Skip to content

Commit 16321d3

Browse files
authored
Add support for Actor Reentrancy (#708)
* Add support for Actor Reentrancy This commit allows for the Dapr-Reentrancy-Id header to be propogated through an actor's call chain which lets the dapr runtime allow reentrant calls. #661 * Add unit/e2e tests for Actor Reentrancy * Introduce state isolation for actor reentrancy * Move state context to be fully AsyncLocal
1 parent 9c8e24a commit 16321d3

24 files changed

+591
-56
lines changed

.github/workflows/itests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ jobs:
3737
GOOS: linux
3838
GOARCH: amd64
3939
GOPROXY: https://proxy.golang.org
40-
DAPR_CLI_VER: 1.0.0
41-
DAPR_RUNTIME_VER: 1.0.0
40+
DAPR_CLI_VER: 1.2.0
41+
DAPR_RUNTIME_VER: 1.2.1
4242
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
4343
DAPR_CLI_REF: ''
4444
DAPR_REF: ''

src/Dapr.Actors.AspNetCore/ActorsEndpointRouteBuilderExtensions.cs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,24 +78,45 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR
7878
var actorId = (string)routeValues["actorId"];
7979
var methodName = (string)routeValues["methodName"];
8080

81+
if (context.Request.Headers.ContainsKey(Constants.ReentrancyRequestHeaderName))
82+
{
83+
var daprReentrancyHeader = context.Request.Headers[Constants.ReentrancyRequestHeaderName];
84+
ActorReentrancyContextAccessor.ReentrancyContext = daprReentrancyHeader;
85+
}
86+
8187
// If Header is present, call is made using Remoting, use Remoting dispatcher.
8288
if (context.Request.Headers.ContainsKey(Constants.RequestHeaderName))
8389
{
8490
var daprActorheader = context.Request.Headers[Constants.RequestHeaderName];
85-
var (header, body) = await runtime.DispatchWithRemotingAsync(actorTypeName, actorId, methodName, daprActorheader, context.Request.Body);
8691

87-
// Item 1 is header , Item 2 is body
88-
if (header != string.Empty)
92+
try
8993
{
90-
// exception case
91-
context.Response.Headers.Add(Constants.ErrorResponseHeaderName, header); // add error header
92-
}
93-
94-
await context.Response.Body.WriteAsync(body, 0, body.Length); // add response message body
94+
var (header, body) = await runtime.DispatchWithRemotingAsync(actorTypeName, actorId, methodName, daprActorheader, context.Request.Body);
95+
96+
// Item 1 is header , Item 2 is body
97+
if (header != string.Empty)
98+
{
99+
// exception case
100+
context.Response.Headers.Add(Constants.ErrorResponseHeaderName, header); // add error header
101+
}
102+
103+
await context.Response.Body.WriteAsync(body, 0, body.Length); // add response message body
104+
}
105+
finally
106+
{
107+
ActorReentrancyContextAccessor.ReentrancyContext = null;
108+
}
95109
}
96110
else
97111
{
98-
await runtime.DispatchWithoutRemotingAsync(actorTypeName, actorId, methodName, context.Request.Body, context.Response.Body);
112+
try
113+
{
114+
await runtime.DispatchWithoutRemotingAsync(actorTypeName, actorId, methodName, context.Request.Body, context.Response.Body);
115+
}
116+
finally
117+
{
118+
ActorReentrancyContextAccessor.ReentrancyContext = null;
119+
}
99120
}
100121
}).WithDisplayName(b => "Dapr Actors Invoke");
101122
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation.
3+
// Licensed under the MIT License.
4+
// ------------------------------------------------------------
5+
6+
namespace Dapr.Actors
7+
{
8+
/// <summary>
9+
/// Represents the configuration required for Actor Reentrancy.
10+
///
11+
/// See: https://docs.dapr.io/developing-applications/building-blocks/actors/actor-reentrancy/
12+
/// </summary>
13+
public sealed class ActorReentrancyConfig
14+
{
15+
private bool enabled;
16+
private int? maxStackDepth;
17+
18+
/// <summary>
19+
/// Determines if Actor Reentrancy is enabled or disabled.
20+
/// </summary>
21+
public bool Enabled
22+
{
23+
get
24+
{
25+
return this.enabled;
26+
}
27+
28+
set
29+
{
30+
this.enabled = value;
31+
}
32+
}
33+
34+
/// <summary>
35+
/// Optional parameter that will stop a reentrant call from progressing past the defined
36+
/// limit. This is a safety measure against infinite reentrant calls.
37+
/// </summary>
38+
public int? MaxStackDepth
39+
{
40+
get
41+
{
42+
return this.maxStackDepth;
43+
}
44+
45+
set
46+
{
47+
this.maxStackDepth = value;
48+
}
49+
}
50+
}
51+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation.
3+
// Licensed under the MIT License.
4+
// ------------------------------------------------------------
5+
6+
using System.Threading;
7+
8+
namespace Dapr.Actors
9+
{
10+
/// <summary>
11+
/// Accessor for the reentrancy context. This provides the necessary ID to continue a reentrant request
12+
/// across actor invocations.
13+
/// </summary>
14+
internal static class ActorReentrancyContextAccessor
15+
{
16+
private static readonly AsyncLocal<ActorReentrancyContextHolder> state = new AsyncLocal<ActorReentrancyContextHolder>();
17+
18+
/// <summary>
19+
/// The reentrancy context for a given request, if one is present.
20+
/// </summary>
21+
public static string ReentrancyContext
22+
{
23+
get
24+
{
25+
return state.Value?.Context;
26+
}
27+
set
28+
{
29+
var holder = state.Value;
30+
// Reset the current state if it exists.
31+
if (holder != null)
32+
{
33+
holder.Context = null;
34+
}
35+
36+
if (value != null)
37+
{
38+
state.Value = new ActorReentrancyContextHolder { Context = value };
39+
}
40+
}
41+
}
42+
43+
private class ActorReentrancyContextHolder
44+
{
45+
public string Context;
46+
}
47+
}
48+
}

src/Dapr.Actors/Constants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ internal static class Constants
1313
public const string RequestIdHeaderName = "X-DaprRequestId";
1414
public const string RequestHeaderName = "X-DaprRequestHeader";
1515
public const string ErrorResponseHeaderName = "X-DaprErrorResponseHeader";
16+
public const string ReentrancyRequestHeaderName = "Dapr-Reentrancy-Id";
1617
public const string Dapr = "dapr";
1718
public const string Config = "config";
1819
public const string State = "state";

src/Dapr.Actors/DaprHttpInteractor.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ HttpRequestMessage RequestFunc()
113113

114114
request.Headers.Add(Constants.RequestHeaderName, Encoding.UTF8.GetString(serializedHeader, 0, serializedHeader.Length));
115115

116+
var reentrancyId = ActorReentrancyContextAccessor.ReentrancyContext;
117+
if (reentrancyId != null)
118+
{
119+
request.Headers.Add(Constants.ReentrancyRequestHeaderName, reentrancyId);
120+
}
121+
116122
return request;
117123
}
118124

@@ -189,6 +195,12 @@ HttpRequestMessage RequestFunc()
189195
request.Content.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json; charset=utf-8");
190196
}
191197

198+
var reentrancyId = ActorReentrancyContextAccessor.ReentrancyContext;
199+
if (reentrancyId != null)
200+
{
201+
request.Headers.Add(Constants.ReentrancyRequestHeaderName, reentrancyId);
202+
}
203+
192204
return request;
193205
}
194206

src/Dapr.Actors/Runtime/Actor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ protected Actor(ActorHost host)
6262
/// <summary>
6363
/// Gets the StateManager for the actor.
6464
/// </summary>
65-
protected IActorStateManager StateManager { get; set; }
65+
public IActorStateManager StateManager { get; protected set; }
6666

6767
internal async Task OnActivateInternalAsync()
6868
{

src/Dapr.Actors/Runtime/ActorManager.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ namespace Dapr.Actors.Runtime
88
using System;
99
using System.Collections.Concurrent;
1010
using System.IO;
11-
using System.Reflection;
1211
using System.Text;
1312
using System.Text.Json;
1413
using System.Threading;
@@ -340,6 +339,12 @@ private async Task<T> DispatchInternalAsync<T>(ActorId actorId, ActorMethodConte
340339
T retval;
341340
try
342341
{
342+
// Set the state context of the request, if possible.
343+
if (state.Actor.StateManager is IActorContextualState contextualStateManager)
344+
{
345+
await contextualStateManager.SetStateContext(Guid.NewGuid().ToString());
346+
}
347+
343348
// invoke the function of the actor
344349
await actor.OnPreActorMethodAsyncInternal(actorMethodContext);
345350
retval = await actorFunc.Invoke(actor, cancellationToken);
@@ -352,6 +357,14 @@ private async Task<T> DispatchInternalAsync<T>(ActorId actorId, ActorMethodConte
352357
await actor.OnInvokeFailedAsync();
353358
throw;
354359
}
360+
finally
361+
{
362+
// Set the state context of the request, if possible.
363+
if (state.Actor.StateManager is IActorContextualState contextualStateManager)
364+
{
365+
await contextualStateManager.SetStateContext(null);
366+
}
367+
}
355368

356369
return retval;
357370
}

src/Dapr.Actors/Runtime/ActorRuntime.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ internal Task SerializeSettingsAndRegisteredTypes(IBufferWriter<byte> output)
9999
writer.WriteNumber("remindersStoragePartitions", this.options.RemindersStoragePartitions.Value);
100100
}
101101

102+
// Reentrancy has a default value so it is always included.
103+
writer.WriteStartObject("reentrancy");
104+
writer.WriteBoolean("enabled", this.options.ReentrancyConfig.Enabled);
105+
if (this.options.ReentrancyConfig.MaxStackDepth != null)
106+
{
107+
writer.WriteNumber("maxStackDepth", this.options.ReentrancyConfig.MaxStackDepth.Value);
108+
}
109+
writer.WriteEndObject();
110+
102111
writer.WriteEndObject();
103112
return writer.FlushAsync();
104113
}

src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ public sealed class ActorRuntimeOptions
2020
private TimeSpan? actorScanInterval;
2121
private TimeSpan? drainOngoingCallTimeout;
2222
private bool drainRebalancedActors;
23+
private ActorReentrancyConfig reentrancyConfig = new ActorReentrancyConfig
24+
{
25+
Enabled = false,
26+
};
2327
private JsonSerializerOptions jsonSerializerOptions = JsonSerializerDefaults.Web;
2428
private string daprApiToken = null;
2529
private int? remindersStoragePartitions = null;
@@ -120,6 +124,25 @@ public bool DrainRebalancedActors
120124
}
121125
}
122126

127+
/// <summary>
128+
/// A configuration that defines the Actor Reentrancy parameters. If set and enabled, Actors
129+
/// will be able to perform reentrant calls to themselves/others. If not set or false, Actors
130+
/// will continue to lock for every request.
131+
/// See https://docs.dapr.io/developing-applications/building-blocks/actors/actor-reentrancy/
132+
/// </summary>
133+
public ActorReentrancyConfig ReentrancyConfig
134+
{
135+
get
136+
{
137+
return this.reentrancyConfig;
138+
}
139+
140+
set
141+
{
142+
this.reentrancyConfig = value;
143+
}
144+
}
145+
123146

124147
/// <summary>
125148
/// The <see cref="JsonSerializerOptions"/> to use for actor state persistence and message deserialization

0 commit comments

Comments
 (0)