diff --git a/src/DurableSDK/Commands/WaitDurableTaskCommand.cs b/src/DurableSDK/Commands/WaitDurableTaskCommand.cs index fc0b3b48..9aad44da 100644 --- a/src/DurableSDK/Commands/WaitDurableTaskCommand.cs +++ b/src/DurableSDK/Commands/WaitDurableTaskCommand.cs @@ -34,7 +34,7 @@ protected override void EndProcessing() } else { - _durableTaskHandler.WaitAll(Task, context, WriteObject); + _durableTaskHandler.WaitAll(Task, context, WriteObject, onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason)); } } diff --git a/src/DurableSDK/DurableTaskHandler.cs b/src/DurableSDK/DurableTaskHandler.cs index c65b1275..be0e9543 100644 --- a/src/DurableSDK/DurableTaskHandler.cs +++ b/src/DurableSDK/DurableTaskHandler.cs @@ -14,6 +14,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable internal class DurableTaskHandler { private readonly ManualResetEvent _waitForStop = new ManualResetEvent(initialState: false); + private bool enabledCompoundErrorPropagation { get; } = + PowerShellWorkerConfiguration.GetBoolean(Utils.PropagateCompoundErrorsEnvVariable) ?? false; public void StopAndInitiateDurableTaskOrReplay( DurableTask task, @@ -102,11 +104,49 @@ public void StopAndInitiateDurableTaskOrReplay( } } + public static bool IsNonTerminalTaskFailedEvent( + DurableTask task, + OrchestrationContext context, + HistoryEvent scheduledHistoryEvent, + HistoryEvent completedHistoryEvent + ) + { + + if (task is ActivityInvocationTask activity && completedHistoryEvent.EventType == HistoryEventType.TaskFailed) + { + if (activity.RetryOptions == null) + { + return false; + } + else + { + Action NoOp = _ => { }; + // RetryProcessor assumes events have not been processed, + // it will re-assign the `IsProcessed` flag for these events + // it its execution + scheduledHistoryEvent.IsProcessed = false; + completedHistoryEvent.IsProcessed = false; + + var isFinalFailureEvent = + RetryProcessor.Process( + context.History, + scheduledHistoryEvent, + activity.RetryOptions.MaxNumberOfAttempts, + onSuccess: NoOp, + onFinalFailure: NoOp); + return !isFinalFailureEvent; + } + } + return false; + } + // Waits for all of the given DurableTasks to complete public void WaitAll( IReadOnlyCollection tasksToWaitFor, OrchestrationContext context, - Action output) + Action output, + Action onFailure + ) { context.OrchestrationActionCollector.NextBatch(); @@ -127,6 +167,13 @@ public void WaitAll( } completedHistoryEvent.IsProcessed = true; + + if (IsNonTerminalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) + { + // do not count this as a terminal event for this task + continue; + } + completedEvents.Add(completedHistoryEvent); } @@ -142,6 +189,10 @@ public void WaitAll( { output(GetEventResult(completedHistoryEvent)); } + if (completedHistoryEvent.EventType is HistoryEventType.TaskFailed && enabledCompoundErrorPropagation) + { + onFailure(completedHistoryEvent.Reason); + } } } else @@ -175,6 +226,17 @@ public void WaitAny( scheduledHistoryEvent.IsPlayed = true; } + if (completedHistoryEvent == null) + { + continue; + } + + if (IsNonTerminalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) + { + // do not count this as a terminal event for this task + completedHistoryEvent = null; + } + if (completedHistoryEvent != null) { completedTasks.Add(task); diff --git a/src/DurableSDK/Tasks/ActivityInvocationTask.cs b/src/DurableSDK/Tasks/ActivityInvocationTask.cs index 6f54182c..cb2a3744 100644 --- a/src/DurableSDK/Tasks/ActivityInvocationTask.cs +++ b/src/DurableSDK/Tasks/ActivityInvocationTask.cs @@ -23,7 +23,7 @@ public class ActivityInvocationTask : DurableTask private object Input { get; } - private RetryOptions RetryOptions { get; } + internal RetryOptions RetryOptions { get; } internal ActivityInvocationTask(string functionName, object functionInput, RetryOptions retryOptions) { diff --git a/src/Utility/Utils.cs b/src/Utility/Utils.cs index e8d3a3d1..b34b1820 100644 --- a/src/Utility/Utils.cs +++ b/src/Utility/Utils.cs @@ -29,6 +29,7 @@ internal class Utils internal const string IsOrchestrationFailureKey = "IsOrchestrationFailure"; internal const string TracePipelineObjectCmdlet = "Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject"; internal const string ExternalDurableSdkEnvVariable = "ExternalDurablePowerShellSDK"; + internal const string PropagateCompoundErrorsEnvVariable = "PropagateCompoundErrors"; internal readonly static object BoxedTrue = (object)true; internal readonly static object BoxedFalse = (object)false; diff --git a/test/Unit/Durable/CurrentUtcDateTimeTests.cs b/test/Unit/Durable/CurrentUtcDateTimeTests.cs index fca39599..096ce0a2 100644 --- a/test/Unit/Durable/CurrentUtcDateTimeTests.cs +++ b/test/Unit/Durable/CurrentUtcDateTimeTests.cs @@ -189,7 +189,7 @@ public void CurrentUtcDateTime_UpdatesToNextOrchestratorStartedTimestamp_IfAllAc DurableTestUtilities.EmulateStop(durableTaskHandler); } - durableTaskHandler.WaitAll(tasksToWaitFor, context, output => allOutput.Add(output)); + durableTaskHandler.WaitAll(tasksToWaitFor, context, output => allOutput.Add(output), _ => { }); if (allCompleted) { diff --git a/test/Unit/Durable/DurableTaskHandlerTests.cs b/test/Unit/Durable/DurableTaskHandlerTests.cs index ef9ab809..f7f6d4f4 100644 --- a/test/Unit/Durable/DurableTaskHandlerTests.cs +++ b/test/Unit/Durable/DurableTaskHandlerTests.cs @@ -55,7 +55,7 @@ public void WaitAll_OutputsTaskResults_WhenAllTasksCompleted( var allOutput = new List(); var durableTaskHandler = new DurableTaskHandler(); - durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }); + durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }, _ => { }); Assert.Equal(new[] { "Result1", "Result2", "Result3" }, allOutput); VerifyNoOrchestrationActionAdded(orchestrationContext); @@ -88,7 +88,7 @@ public void WaitAll_OutputsNothing_WhenAnyTaskIsNotCompleted( DurableTestUtilities.EmulateStop(durableTaskHandler); durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, - _ => { Assert.True(false, "Unexpected output"); }); + _ => { Assert.True(false, "Unexpected output"); }, _ => { }); VerifyNoOrchestrationActionAdded(orchestrationContext); } @@ -122,7 +122,7 @@ public void WaitAll_WaitsForStop_WhenAnyTaskIsNotCompleted(bool scheduledAndComp expectedWaitForStop, () => { - durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, _ => { }); + durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, _ => { }, _ => { }); }); } @@ -242,7 +242,7 @@ public void WaitAll_And_WaitAny_StartNewActivityBatch(bool invokeWaitAll, bool i if (invokeWaitAll) { durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event - durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {}); + durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {}, _ => { }); } if (invokeWaitAny)