Skip to content

Commit d43de2b

Browse files
halspangrynowak
andauthored
Add support for Query API (#810)
This commit adds methods to the DaprClient to call the new Query API. Currently, a raw query string is provided to the API. The API is still in the alpha stage which may cause shifts in how the API works. This should not effect how the SDK exposes the call, but may change the underlying semantics. #777 Signed-off-by: Hal Spang <[email protected]> Co-authored-by: Ryan Nowak <[email protected]>
1 parent f5169cb commit d43de2b

File tree

5 files changed

+299
-0
lines changed

5 files changed

+299
-0
lines changed

src/Dapr.Client/DaprClient.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,22 @@ public abstract Task<bool> TryDeleteStateAsync(
757757
IReadOnlyDictionary<string, string> metadata = default,
758758
CancellationToken cancellationToken = default);
759759

760+
/// <summary>
761+
/// Queries the specified statestore with the given query. The query is a JSON representation of the query as described by the Dapr QueryState API.
762+
/// Note that the underlying statestore must support queries.
763+
/// </summary>
764+
/// <param name="storeName">The name of the statestore.</param>
765+
/// <param name="jsonQuery">A JSON formatted query string.</param>
766+
/// <param name="metadata">Metadata to send to the statestore.</param>
767+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the operation.</param>
768+
/// <typeparam name="TValue">The data type of the value to read.</typeparam>
769+
/// <returns>A <see cref="StateQueryResponse{TValue}"/> that may be paginated, use <see cref="StateQueryResponse{TValue}.Token"/> to continue the query.</returns>
770+
public abstract Task<StateQueryResponse<TValue>> QueryStateAsync<TValue>(
771+
string storeName,
772+
string jsonQuery,
773+
IReadOnlyDictionary<string, string> metadata = default,
774+
CancellationToken cancellationToken = default);
775+
760776
/// <summary>
761777
/// Gets the secret value from the secret store.
762778
/// </summary>

src/Dapr.Client/DaprClientGrpc.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,65 @@ private async Task<bool> MakeDeleteStateCallAsync(
10151015
throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
10161016
}
10171017
}
1018+
1019+
/// <inheritdoc/>
1020+
public async override Task<StateQueryResponse<TValue>> QueryStateAsync<TValue>(
1021+
string storeName,
1022+
string jsonQuery,
1023+
IReadOnlyDictionary<string, string> metadata = default,
1024+
CancellationToken cancellationToken = default)
1025+
{
1026+
var queryRequest = new Autogenerated.QueryStateRequest()
1027+
{
1028+
StoreName = storeName,
1029+
Query = jsonQuery
1030+
};
1031+
1032+
if (metadata != null)
1033+
{
1034+
foreach (var kvp in metadata)
1035+
{
1036+
queryRequest.Metadata.Add(kvp.Key, kvp.Value);
1037+
}
1038+
}
1039+
1040+
var options = CreateCallOptions(headers: null, cancellationToken);
1041+
1042+
try
1043+
{
1044+
var items = new List<StateQueryItem<TValue>>();
1045+
var failedKeys = new List<string>();
1046+
var queryResponse = await client.QueryStateAlpha1Async(queryRequest, options);
1047+
foreach (var item in queryResponse.Results)
1048+
{
1049+
if (!string.IsNullOrEmpty(item.Error))
1050+
{
1051+
// When we encounter an error, we record the key and prepare to throw an exception at the end of the results.
1052+
failedKeys.Add(item.Key);
1053+
continue;
1054+
}
1055+
1056+
items.Add(new StateQueryItem<TValue>(item.Key, TypeConverters.FromJsonByteString<TValue>(item.Data, this.JsonSerializerOptions), item.Etag, item.Error));
1057+
}
1058+
1059+
var results = new StateQueryResponse<TValue>(items, queryResponse.Token, queryResponse.Metadata);
1060+
if (failedKeys.Count > 0)
1061+
{
1062+
// We encountered some bad keys so we throw instead of returning to alert the user.
1063+
throw new StateQueryException<TValue>($"Encountered an error while processing state query results.", results, failedKeys);
1064+
}
1065+
1066+
return results;
1067+
}
1068+
catch (RpcException ex)
1069+
{
1070+
throw new DaprException("Query state operation failed: the Dapr endpointed indicated a failure. See InnerException for details.", ex);
1071+
}
1072+
catch (JsonException ex)
1073+
{
1074+
throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex);
1075+
}
1076+
}
10181077
#endregion
10191078

10201079
#region Secret Apis
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System.Collections.Generic;
2+
3+
namespace Dapr.Client
4+
{
5+
/// <summary>
6+
/// Exception that is thrown when an erorr is encountered during a call to the Query API.
7+
/// This exception contains the partial results (if any) from that exception.
8+
/// </summary>
9+
public class StateQueryException<TValue> : DaprApiException
10+
{
11+
/// <summary>
12+
/// Constructor.
13+
/// </summary>
14+
/// <param name="message">The description of the exception from the source.</param>
15+
/// <param name="response">The response containing successful items, if any, in a response with errors.</param>
16+
/// <param name="failedKeys">The key(s) that encountered an error during the query.</param>
17+
public StateQueryException(string message, StateQueryResponse<TValue> response, IReadOnlyList<string> failedKeys)
18+
: base(message)
19+
{
20+
Response = response;
21+
FailedKeys = failedKeys;
22+
}
23+
24+
/// <summary>
25+
/// The response containing successful items, if any, in a response with errors.
26+
/// </summary>
27+
public StateQueryResponse<TValue> Response { get; }
28+
29+
/// <summary>
30+
/// The key(s) that encountered an error during the query.
31+
/// </summary>
32+
public IReadOnlyList<string> FailedKeys { get; }
33+
}
34+
}

src/Dapr.Client/StateQueryResponse.cs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
using System.Collections.Generic;
15+
16+
namespace Dapr.Client
17+
{
18+
/// <summary>
19+
/// Represents the response from a state query.
20+
/// </summary>
21+
public class StateQueryResponse<TValue>
22+
{
23+
/// <summary>
24+
/// Constructor.
25+
/// </summary>
26+
/// <param name="results">The results of the query.</param>
27+
/// <param name="token">The pagination token to continue the query.</param>
28+
/// <param name="metadata">The metadata to be passed back to the caller.</param>
29+
public StateQueryResponse(IReadOnlyList<StateQueryItem<TValue>> results, string token, IReadOnlyDictionary<string, string> metadata)
30+
{
31+
Results = new List<StateQueryItem<TValue>>(results);
32+
Token = token;
33+
Metadata = metadata;
34+
}
35+
36+
/// <summary>
37+
/// The results of the query.
38+
/// </summary>
39+
public IReadOnlyList<StateQueryItem<TValue>> Results { get; }
40+
41+
/// <summary>
42+
/// The pagination token to continue the query.
43+
/// </summary>
44+
public string Token { get; }
45+
46+
/// <summary>
47+
/// The metadata to be passed back to the caller.
48+
/// </summary>
49+
public IReadOnlyDictionary<string, string> Metadata { get; }
50+
}
51+
52+
/// <summary>
53+
/// Represents an individual item from the results of a state query.
54+
/// </summary>
55+
public class StateQueryItem<TValue>
56+
{
57+
/// <summary>
58+
/// Constructor.
59+
/// </summary>
60+
/// <param name="key">The key of the returned item.</param>
61+
/// <param name="data">The value of the returned item.</param>
62+
/// <param name="etag">The ETag of the returned item.</param>
63+
/// <param name="error">The error, if one occurred, of the returned item.</param>
64+
public StateQueryItem(string key, TValue data, string etag, string error)
65+
{
66+
Key = key;
67+
Data = data;
68+
ETag = etag;
69+
Error = error;
70+
}
71+
72+
/// <summary>
73+
/// The key from the matched query.
74+
/// </summary>
75+
public string Key { get; }
76+
77+
/// <summary>
78+
/// The data of the the key from the matched query.
79+
/// </summary>
80+
public TValue Data { get; }
81+
82+
/// <summary>
83+
/// The ETag for the key from the matched query.
84+
/// </summary>
85+
public string ETag { get; }
86+
87+
/// <summary>
88+
/// The error from the query, if one occurred.
89+
/// </summary>
90+
public string Error { get; }
91+
}
92+
}

test/Dapr.Client.Test/StateApiTest.cs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace Dapr.Client.Test
2929
using Xunit;
3030
using System.Threading;
3131
using System.Net.Http;
32+
using System.Text;
3233

3334
public class StateApiTest
3435
{
@@ -999,6 +1000,81 @@ public async Task DeleteBulkStateAsync_ValidateRequest()
9991000
envelope.States[0].Metadata.Should().ContainKey("partitionKey");
10001001
}
10011002

1003+
[Fact]
1004+
public async Task QueryStateAsync_ValidateResult()
1005+
{
1006+
await using var client = TestClient.CreateForDaprClient();
1007+
1008+
var queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}";
1009+
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
1010+
{
1011+
return await daprClient.QueryStateAsync<Widget>("testStore", queryJson, new Dictionary<string, string>());
1012+
});
1013+
1014+
// Validate request.
1015+
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.QueryStateRequest>();
1016+
envelope.StoreName.Should().Be("testStore");
1017+
envelope.Query.Should().Be(queryJson);
1018+
envelope.Metadata.Should().BeEmpty();
1019+
1020+
// Validate response.
1021+
var testData = new Widget() { Color = "Green", Size = "Small" };
1022+
var wireResponse = new Autogenerated.QueryStateResponse();
1023+
wireResponse.Results.Add(MakeQueryStateItem("test", testData, "an etag"));
1024+
1025+
var response = await request.CompleteWithMessageAsync(wireResponse);
1026+
response.Results.Count.Should().Be(1);
1027+
response.Results[0].Key.Should().Be("test");
1028+
response.Results[0].Data.Should().Be(testData);
1029+
response.Results[0].ETag.Should().Be("an etag");
1030+
response.Results[0].Error.Should().BeNullOrEmpty();
1031+
}
1032+
1033+
[Fact]
1034+
public async Task QueryStateAsync_EncountersError_ValidatePartialResult()
1035+
{
1036+
await using var client = TestClient.CreateForDaprClient();
1037+
1038+
var queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}";
1039+
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
1040+
{
1041+
return await daprClient.QueryStateAsync<Widget>("testStore", queryJson, new Dictionary<string, string>());
1042+
});
1043+
1044+
// Validate request.
1045+
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.QueryStateRequest>();
1046+
envelope.StoreName.Should().Be("testStore");
1047+
envelope.Query.Should().Be(queryJson);
1048+
envelope.Metadata.Should().BeEmpty();
1049+
1050+
// Validate response, we expect to only get the first object as the 2nd will present an error.
1051+
var testData1 = new Widget() { Color = "Green", Size = "Small" };
1052+
var testData2 = new Widget() { Color = "Green", Size = "Medium" };
1053+
var testData3 = new Widget() { Color = "Green", Size = "Large" };
1054+
var wireResponse = new Autogenerated.QueryStateResponse();
1055+
1056+
wireResponse.Results.Add(MakeQueryStateItem("test1", testData1));
1057+
wireResponse.Results.Add(MakeQueryStateItem("test2", testData2, string.Empty, "An error!"));
1058+
wireResponse.Results.Add(MakeQueryStateItem("test3", testData3));
1059+
1060+
var ex = await Assert.ThrowsAsync<StateQueryException<Widget>>(() => request.CompleteWithMessageAsync(wireResponse));
1061+
ex.Message.Should().Be("Encountered an error while processing state query results.");
1062+
var response = ex.Response;
1063+
response.Results.Count.Should().Be(2);
1064+
response.Results[0].Key.Should().Be("test1");
1065+
response.Results[0].Data.Should().Be(testData1);
1066+
response.Results[0].ETag.Should().BeNullOrEmpty();
1067+
response.Results[0].Error.Should().BeNullOrEmpty();
1068+
response.Results[1].Key.Should().Be("test3");
1069+
response.Results[1].Data.Should().Be(testData3);
1070+
response.Results[1].ETag.Should().BeNullOrEmpty();
1071+
response.Results[1].Error.Should().BeNullOrEmpty();
1072+
1073+
var failedKeys = ex.FailedKeys;
1074+
failedKeys.Count.Should().Be(1);
1075+
failedKeys[0].Should().Be("test2");
1076+
}
1077+
10021078
private Autogenerated.GetStateResponse MakeGetStateResponse<T>(T state, string etag = null)
10031079
{
10041080
var data = TypeConverters.ToJsonByteString(state, new JsonSerializerOptions(JsonSerializerDefaults.Web));
@@ -1033,11 +1109,33 @@ private Autogenerated.GetBulkStateResponse MakeGetBulkStateResponse<T>(string ke
10331109
return response;
10341110
}
10351111

1112+
private Autogenerated.QueryStateItem MakeQueryStateItem<T>(string key, T data, string etag = default, string error = default)
1113+
{
1114+
var wireItem = new Autogenerated.QueryStateItem();
1115+
wireItem.Key = key;
1116+
wireItem.Data = ByteString.CopyFromUtf8(JsonSerializer.Serialize(data));
1117+
wireItem.Etag = etag ?? string.Empty;
1118+
wireItem.Error = error ?? string.Empty;
1119+
return wireItem;
1120+
}
1121+
10361122
private class Widget
10371123
{
10381124
public string Size { get; set; }
10391125

10401126
public string Color { get; set; }
1127+
1128+
public override bool Equals(object obj)
1129+
{
1130+
return obj is Widget widget &&
1131+
Size == widget.Size &&
1132+
Color == widget.Color;
1133+
}
1134+
1135+
public override int GetHashCode()
1136+
{
1137+
return HashCode.Combine(Size, Color);
1138+
}
10411139
}
10421140
}
10431141
}

0 commit comments

Comments
 (0)