Skip to content

Commit 0f1e1bf

Browse files
authored
Initial Bulk Subscribe functionality (#1009)
Signed-off-by: Yash Nisar <[email protected]>
1 parent c863582 commit 0f1e1bf

21 files changed

+852
-63
lines changed

examples/AspNetCore/ControllerSample/Controllers/SampleController.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
namespace ControllerSample.Controllers
1515
{
1616
using System;
17+
using System.Collections.Generic;
1718
using System.Text;
1819
using System.Text.Json;
1920
using System.Threading.Tasks;
2021
using Dapr;
22+
using Dapr.AspNetCore;
2123
using Dapr.Client;
2224
using Microsoft.AspNetCore.Mvc;
2325
using Microsoft.Extensions.Logging;
@@ -86,6 +88,51 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
8688
return state.Value;
8789
}
8890

91+
/// <summary>
92+
/// Method for depositing multiple times to the account as specified in transaction.
93+
/// </summary>
94+
/// <param name="bulkMessage">List of entries of type BulkMessageModel received from dapr.</param>
95+
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
96+
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
97+
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
98+
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
99+
[BulkSubscribe("multideposit", 500, 2000)]
100+
[HttpPost("multideposit")]
101+
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
102+
bulkMessage, [FromServices] DaprClient daprClient)
103+
{
104+
logger.LogInformation("Enter bulk deposit");
105+
106+
List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();
107+
108+
foreach (var entry in bulkMessage.Entries)
109+
{
110+
try
111+
{
112+
var transaction = entry.Event.Data;
113+
114+
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
115+
state.Value ??= new Account() { Id = transaction.Id, };
116+
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
117+
transaction.Id, transaction.Amount);
118+
119+
if (transaction.Amount < 0m)
120+
{
121+
return BadRequest(new { statusCode = 400, message = "bad request" });
122+
}
123+
124+
state.Value.Balance += transaction.Amount;
125+
logger.LogInformation("Balance is {0}", state.Value.Balance);
126+
await state.SaveAsync();
127+
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
128+
} catch (Exception e) {
129+
logger.LogError(e.Message);
130+
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
131+
}
132+
}
133+
return new BulkSubscribeAppResponse(entries);
134+
}
135+
89136
/// <summary>
90137
/// Method for viewing the error message when the deposit/withdrawal amounts
91138
/// are negative.
@@ -190,6 +237,7 @@ public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawT
190237

191238
/// <summary>
192239
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
240+
/// </summary>
193241
[HttpPost("throwException")]
194242
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
195243
{

examples/AspNetCore/ControllerSample/README.md

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ This sample shows using Dapr with ASP.NET Core controllers. This application is
55
It exposes the following endpoints over HTTP:
66
- GET `/{account}`: Get the balance for the account specified by `id`
77
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
8+
- POST `/multideposit`: Accepts a JSON payload to deposit money multiple times to a bulk subscribed topic
89
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
910

10-
The application also registers for pub/sub with the `deposit` and `withdraw` topics.
11+
The application also registers for pub/sub with the `deposit`, `multideposit` and `withdraw` topics.
1112

1213
## Prerequisitess
1314

@@ -57,7 +58,76 @@ Output:
5758
```
5859

5960
---
61+
**Deposit Money multiple times to a bulk subscribed topic**
6062

63+
On Linux, MacOS:
64+
```
65+
curl -X POST http://127.0.0.1:5000/multideposit \
66+
-H 'Content-Type: application/json' \
67+
-d '{
68+
"entries":[
69+
{
70+
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
71+
"event":{
72+
"data":{
73+
"amount":10,
74+
"id":"17"
75+
},
76+
"datacontenttype":"application/json",
77+
"id":"DaprClient",
78+
"pubsubname":"pubsub",
79+
"source":"Dapr",
80+
"specversion":"1.0",
81+
"topic":"multideposit",
82+
"type":"com.dapr.event.sent"
83+
},
84+
"metadata":null,
85+
"contentType":"application/cloudevents+json"
86+
},
87+
{
88+
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
89+
"event":{
90+
"data":{
91+
"amount":20,
92+
"id":"17"
93+
},
94+
"datacontenttype":"application/json",
95+
"id":"DaprClient",
96+
"pubsubname":"pubsub",
97+
"source":"Dapr",
98+
"specversion":"1.0",
99+
"topic":"multideposit",
100+
"type":"com.dapr.event.sent"
101+
},
102+
"metadata":null,
103+
"contentType":"application/cloudevents+json"
104+
}
105+
],
106+
"id":"fa68c580-1b96-40d3-aa2c-04bab05e954e",
107+
"metadata":{
108+
"pubsubName":"pubsub"
109+
},
110+
"pubsubname":"pubsub",
111+
"topic":"multideposit",
112+
"type":"com.dapr.event.sent.bulk"
113+
}'
114+
```
115+
Output:
116+
```
117+
{
118+
"statuses":[
119+
{
120+
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
121+
"status":"SUCCESS"
122+
},
123+
{
124+
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
125+
"status":"SUCCESS"
126+
}
127+
]
128+
}
129+
```
130+
---
61131
**Withdraw Money**
62132
On Linux, MacOS:
63133
```sh
@@ -213,6 +283,20 @@ public async Task<ActionResult<Account>> Deposit(...)
213283

214284
`[Topic(...)]` associates a pub/sub named `pubsub` (this is the default configured by the Dapr CLI) pub/sub topic `deposit` with this endpoint.
215285

286+
---
287+
```C#
288+
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
289+
[BulkSubscribe("multideposit")]
290+
[HttpPost("multideposit")]
291+
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
292+
bulkMessage, [FromServices] DaprClient daprClient)
293+
```
294+
295+
`[BulkSubscribe(...)]` associates a topic with the name mentioned in the attribute with the ability to be bulk subscribed to. It can take additional parameters like `MaxMessagesCount` and `MaxAwaitDurationMs`.
296+
If those parameters are not supplied, the defaults of 100 and 1000ms are set.
297+
298+
However, you need to use `BulkSubscribeMessage<BulkMessageModel<T>>` in the input and that you need to return the `BulkSubscribeAppResponse` as well.
299+
216300
---
217301

218302
```C#

examples/AspNetCore/RoutingSample/README.md

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
This sample shows using Dapr with ASP.NET Core routing. This application is a simple and not-so-secure banking application. The application uses the Dapr state-store for its data storage.
44

55
It exposes the following endpoints over HTTP:
6-
- GET `/{id}`: Get the balance for the account specified by `id`
7-
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
8-
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
6+
- GET `/{id}`: Get the balance for the account specified by `id`
7+
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
8+
- POST `/multideposit`: Accepts a JSON payload to deposit money multiple times to a bulk subscribed topic
9+
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
910

10-
The application also registers for pub/sub with the `deposit` and `withdraw` topics.
11+
The application also registers for pub/sub with the `deposit`, `multideposit`, and `withdraw` topics.
1112

1213
## Prerequisites
1314

@@ -56,6 +57,76 @@ Output:
5657
```
5758

5859
---
60+
**Deposit Money multiple times to a bulk subscribed topic**
61+
62+
On Linux, MacOS:
63+
```
64+
curl -X POST http://127.0.0.1:5000/multideposit \
65+
-H 'Content-Type: application/json' \
66+
-d '{
67+
"entries":[
68+
{
69+
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
70+
"event":{
71+
"data":{
72+
"amount":10,
73+
"id":"17"
74+
},
75+
"datacontenttype":"application/json",
76+
"id":"DaprClient",
77+
"pubsubname":"pubsub",
78+
"source":"Dapr",
79+
"specversion":"1.0",
80+
"topic":"multideposit",
81+
"type":"com.dapr.event.sent"
82+
},
83+
"metadata":null,
84+
"contentType":"application/cloudevents+json"
85+
},
86+
{
87+
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
88+
"event":{
89+
"data":{
90+
"amount":20,
91+
"id":"17"
92+
},
93+
"datacontenttype":"application/json",
94+
"id":"DaprClient",
95+
"pubsubname":"pubsub",
96+
"source":"Dapr",
97+
"specversion":"1.0",
98+
"topic":"multideposit",
99+
"type":"com.dapr.event.sent"
100+
},
101+
"metadata":null,
102+
"contentType":"application/cloudevents+json"
103+
}
104+
],
105+
"id":"fa68c580-1b96-40d3-aa2c-04bab05e954e",
106+
"metadata":{
107+
"pubsubName":"pubsub"
108+
},
109+
"pubsubname":"pubsub",
110+
"topic":"multideposit",
111+
"type":"com.dapr.event.sent.bulk"
112+
}'
113+
```
114+
Output:
115+
```
116+
{
117+
"statuses":[
118+
{
119+
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
120+
"status":"SUCCESS"
121+
},
122+
{
123+
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
124+
"status":"SUCCESS"
125+
}
126+
]
127+
}
128+
```
129+
---
59130

60131
**Withdraw Money**
61132
On Linux, MacOS:
@@ -194,6 +265,7 @@ app.UseEndpoints(endpoints =>
194265

195266
endpoints.MapGet("{id}", Balance);
196267
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
268+
endpoints.MapPost("multideposit", MultiDeposit).WithTopic(multiDepositTopicOptions).WithBulkSubscribe(bulkSubscribeTopicOptions);
197269
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
198270
});
199271
```
@@ -213,9 +285,19 @@ var withdrawTopicOptions = new TopicOptions();
213285
withdrawTopicOptions.PubsubName = PubsubName;
214286
withdrawTopicOptions.Name = "withdraw";
215287
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
288+
289+
var multiDepositTopicOptions = new TopicOptions
290+
{ PubsubName = PubsubName, Name = "multideposit" };
291+
292+
var bulkSubscribeTopicOptions = new BulkSubscribeTopicOptions
293+
{
294+
TopicName = "multideposit", MaxMessagesCount = 250, MaxAwaitDurationMs = 1000
295+
};
216296
```
217297
`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint.
218298

299+
`WithBulkSubscribe(...)` now takes the `BulkSubscribeTopicOptions(..)` instance that defines configurations for the bulk subscribe endpoint.
300+
219301
---
220302

221303
```C#

examples/AspNetCore/RoutingSample/RoutingSample.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk.Web">
22

33
<PropertyGroup>
4-
<TargetFramework>netcoreapp3.1</TargetFramework>
4+
<TargetFramework>net6</TargetFramework>
55
</PropertyGroup>
66

77
<ItemGroup>

0 commit comments

Comments
 (0)