From 93c2b6c86b9aec703a8ea524ecccd239fc3cdf24 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 9 Feb 2023 15:39:05 -0800 Subject: [PATCH 1/4] patch failure propagation --- .../Commands/WaitDurableTaskCommand.cs | 4 +- src/DurableSDK/DurableTaskHandler.cs | 60 ++++++++++++++++++- .../Tasks/ActivityInvocationTask.cs | 2 +- test/Unit/Durable/CurrentUtcDateTimeTests.cs | 4 +- test/Unit/Durable/DurableTaskHandlerTests.cs | 14 ++--- 5 files changed, 70 insertions(+), 14 deletions(-) diff --git a/src/DurableSDK/Commands/WaitDurableTaskCommand.cs b/src/DurableSDK/Commands/WaitDurableTaskCommand.cs index fc0b3b48..e46c1a19 100644 --- a/src/DurableSDK/Commands/WaitDurableTaskCommand.cs +++ b/src/DurableSDK/Commands/WaitDurableTaskCommand.cs @@ -30,11 +30,11 @@ protected override void EndProcessing() if (Any.IsPresent) { - _durableTaskHandler.WaitAny(Task, context, WriteObject); + _durableTaskHandler.WaitAny(Task, context, WriteObject, onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason)); } else { - _durableTaskHandler.WaitAll(Task, context, WriteObject); + _durableTaskHandler.WaitAll(Task, context, WriteObject, onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason)); } } diff --git a/src/DurableSDK/DurableTaskHandler.cs b/src/DurableSDK/DurableTaskHandler.cs index c65b1275..62921215 100644 --- a/src/DurableSDK/DurableTaskHandler.cs +++ b/src/DurableSDK/DurableTaskHandler.cs @@ -102,11 +102,49 @@ public void StopAndInitiateDurableTaskOrReplay( } } + public static bool IsFinalTaskFailedEvent( + DurableTask task, + OrchestrationContext context, + HistoryEvent scheduledHistoryEvent, + HistoryEvent completedHistoryEvent + ) + { + + if (task is ActivityInvocationTask activity && completedHistoryEvent.EventType == HistoryEventType.TaskFailed) + { + if (activity.RetryOptions == null) + { + return true; + } + else + { + Action NoOp = _ => { }; + // RetryProcessor assumes events have not been processed, + // it will re-assign the `IsProcessed` flag for these events + // it its execution + scheduledHistoryEvent.IsProcessed = false; + completedHistoryEvent.IsProcessed = false; + + var isFinalFailureEvent = + RetryProcessor.Process( + context.History, + scheduledHistoryEvent, + activity.RetryOptions.MaxNumberOfAttempts, + onSuccess: NoOp, + onFinalFailure: NoOp); + return isFinalFailureEvent; + } + } + return false; + } + // Waits for all of the given DurableTasks to complete public void WaitAll( IReadOnlyCollection tasksToWaitFor, OrchestrationContext context, - Action output) + Action output, + Action onFailure + ) { context.OrchestrationActionCollector.NextBatch(); @@ -127,6 +165,13 @@ public void WaitAll( } completedHistoryEvent.IsProcessed = true; + + if (!IsFinalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) + { + // do not count this as a terminal event for this task + continue; + } + completedEvents.Add(completedHistoryEvent); } @@ -142,6 +187,10 @@ public void WaitAll( { output(GetEventResult(completedHistoryEvent)); } + if (completedHistoryEvent.EventType is HistoryEventType.TaskFailed) + { + onFailure(completedHistoryEvent.Reason); + } } } else @@ -154,7 +203,8 @@ public void WaitAll( public void WaitAny( IReadOnlyCollection tasksToWaitFor, OrchestrationContext context, - Action output) + Action output, + Action onFailure) { context.OrchestrationActionCollector.NextBatch(); @@ -175,6 +225,12 @@ public void WaitAny( scheduledHistoryEvent.IsPlayed = true; } + if (!IsFinalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) + { + // do not count this as a terminal event for this task + completedHistoryEvent = null; + } + if (completedHistoryEvent != null) { completedTasks.Add(task); diff --git a/src/DurableSDK/Tasks/ActivityInvocationTask.cs b/src/DurableSDK/Tasks/ActivityInvocationTask.cs index 6f54182c..cb2a3744 100644 --- a/src/DurableSDK/Tasks/ActivityInvocationTask.cs +++ b/src/DurableSDK/Tasks/ActivityInvocationTask.cs @@ -23,7 +23,7 @@ public class ActivityInvocationTask : DurableTask private object Input { get; } - private RetryOptions RetryOptions { get; } + internal RetryOptions RetryOptions { get; } internal ActivityInvocationTask(string functionName, object functionInput, RetryOptions retryOptions) { diff --git a/test/Unit/Durable/CurrentUtcDateTimeTests.cs b/test/Unit/Durable/CurrentUtcDateTimeTests.cs index fca39599..d79b706e 100644 --- a/test/Unit/Durable/CurrentUtcDateTimeTests.cs +++ b/test/Unit/Durable/CurrentUtcDateTimeTests.cs @@ -189,7 +189,7 @@ public void CurrentUtcDateTime_UpdatesToNextOrchestratorStartedTimestamp_IfAllAc DurableTestUtilities.EmulateStop(durableTaskHandler); } - durableTaskHandler.WaitAll(tasksToWaitFor, context, output => allOutput.Add(output)); + durableTaskHandler.WaitAll(tasksToWaitFor, context, output => allOutput.Add(output), _ => { }); if (allCompleted) { @@ -226,7 +226,7 @@ public void CurrentUtcDateTime_UpdatesToNextOrchestratorStartedTimestamp_IfAnyAc DurableTestUtilities.EmulateStop(durableTaskHandler); } - durableTaskHandler.WaitAny(tasksToWaitFor, context, output => allOutput.Add(output)); + durableTaskHandler.WaitAny(tasksToWaitFor, context, output => allOutput.Add(output), _ => { }); if (anyCompleted) { diff --git a/test/Unit/Durable/DurableTaskHandlerTests.cs b/test/Unit/Durable/DurableTaskHandlerTests.cs index ef9ab809..0bdffaf8 100644 --- a/test/Unit/Durable/DurableTaskHandlerTests.cs +++ b/test/Unit/Durable/DurableTaskHandlerTests.cs @@ -55,7 +55,7 @@ public void WaitAll_OutputsTaskResults_WhenAllTasksCompleted( var allOutput = new List(); var durableTaskHandler = new DurableTaskHandler(); - durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }); + durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }, _ => { }); Assert.Equal(new[] { "Result1", "Result2", "Result3" }, allOutput); VerifyNoOrchestrationActionAdded(orchestrationContext); @@ -88,7 +88,7 @@ public void WaitAll_OutputsNothing_WhenAnyTaskIsNotCompleted( DurableTestUtilities.EmulateStop(durableTaskHandler); durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, - _ => { Assert.True(false, "Unexpected output"); }); + _ => { Assert.True(false, "Unexpected output"); }, _ => { }); VerifyNoOrchestrationActionAdded(orchestrationContext); } @@ -122,7 +122,7 @@ public void WaitAll_WaitsForStop_WhenAnyTaskIsNotCompleted(bool scheduledAndComp expectedWaitForStop, () => { - durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, _ => { }); + durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, _ => { }, _ => { }); }); } @@ -151,7 +151,7 @@ public void WaitAny_OutputsEarliestCompletedTask_WhenAnyTaskCompleted(bool compl var allOutput = new List(); var durableTaskHandler = new DurableTaskHandler(); - durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }); + durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }, _ => { }); if (completed) { @@ -190,7 +190,7 @@ public void WaitAny_WaitsForStop_WhenAllTasksAreNotCompleted(bool completed, boo expectedWaitForStop, () => { - durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, _ => { }); + durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, _ => { }, _ => { }); }); } @@ -242,13 +242,13 @@ public void WaitAll_And_WaitAny_StartNewActivityBatch(bool invokeWaitAll, bool i if (invokeWaitAll) { durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event - durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {}); + durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {}, _ => { }); } if (invokeWaitAny) { durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event - durableTaskHandler.WaitAny(new DurableTask[0], orchestrationContext, output: _ => {}); + durableTaskHandler.WaitAny(new DurableTask[0], orchestrationContext, output: _ => {}, _ => { }); } durableTaskHandler.StopAndInitiateDurableTaskOrReplay( From ba2c81a9befdc365435a0a9a113bb7c9ddce2a47 Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 10 Feb 2023 11:15:23 -0800 Subject: [PATCH 2/4] improve naming and remove nullPtr exception --- src/DurableSDK/DurableTaskHandler.cs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/DurableSDK/DurableTaskHandler.cs b/src/DurableSDK/DurableTaskHandler.cs index 62921215..bf59825d 100644 --- a/src/DurableSDK/DurableTaskHandler.cs +++ b/src/DurableSDK/DurableTaskHandler.cs @@ -102,7 +102,7 @@ public void StopAndInitiateDurableTaskOrReplay( } } - public static bool IsFinalTaskFailedEvent( + public static bool IsNonTerminalTaskFailedEvent( DurableTask task, OrchestrationContext context, HistoryEvent scheduledHistoryEvent, @@ -114,7 +114,7 @@ HistoryEvent completedHistoryEvent { if (activity.RetryOptions == null) { - return true; + return false; } else { @@ -132,10 +132,10 @@ HistoryEvent completedHistoryEvent activity.RetryOptions.MaxNumberOfAttempts, onSuccess: NoOp, onFinalFailure: NoOp); - return isFinalFailureEvent; + return !isFinalFailureEvent; } } - return false; + return true; } // Waits for all of the given DurableTasks to complete @@ -166,7 +166,7 @@ Action onFailure completedHistoryEvent.IsProcessed = true; - if (!IsFinalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) + if (IsNonTerminalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) { // do not count this as a terminal event for this task continue; @@ -225,7 +225,12 @@ public void WaitAny( scheduledHistoryEvent.IsPlayed = true; } - if (!IsFinalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) + if (completedHistoryEvent == null) + { + continue; + } + + if (IsNonTerminalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent)) { // do not count this as a terminal event for this task completedHistoryEvent = null; From 40857ec73fc9f9f8b822f957278623482db9df36 Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 10 Feb 2023 13:28:55 -0800 Subject: [PATCH 3/4] patch check --- src/DurableSDK/DurableTaskHandler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableSDK/DurableTaskHandler.cs b/src/DurableSDK/DurableTaskHandler.cs index bf59825d..cf1116bc 100644 --- a/src/DurableSDK/DurableTaskHandler.cs +++ b/src/DurableSDK/DurableTaskHandler.cs @@ -135,7 +135,7 @@ HistoryEvent completedHistoryEvent return !isFinalFailureEvent; } } - return true; + return false; } // Waits for all of the given DurableTasks to complete From 17da9fb18acddbb48ce12f643e271d24e60ca98a Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 14 Feb 2023 10:34:30 -0800 Subject: [PATCH 4/4] add envvar control, remove unused param in waitany --- src/DurableSDK/Commands/WaitDurableTaskCommand.cs | 2 +- src/DurableSDK/DurableTaskHandler.cs | 7 ++++--- src/Utility/Utils.cs | 1 + test/Unit/Durable/CurrentUtcDateTimeTests.cs | 2 +- test/Unit/Durable/DurableTaskHandlerTests.cs | 6 +++--- 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/DurableSDK/Commands/WaitDurableTaskCommand.cs b/src/DurableSDK/Commands/WaitDurableTaskCommand.cs index e46c1a19..9aad44da 100644 --- a/src/DurableSDK/Commands/WaitDurableTaskCommand.cs +++ b/src/DurableSDK/Commands/WaitDurableTaskCommand.cs @@ -30,7 +30,7 @@ protected override void EndProcessing() if (Any.IsPresent) { - _durableTaskHandler.WaitAny(Task, context, WriteObject, onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason)); + _durableTaskHandler.WaitAny(Task, context, WriteObject); } else { diff --git a/src/DurableSDK/DurableTaskHandler.cs b/src/DurableSDK/DurableTaskHandler.cs index cf1116bc..be0e9543 100644 --- a/src/DurableSDK/DurableTaskHandler.cs +++ b/src/DurableSDK/DurableTaskHandler.cs @@ -14,6 +14,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable internal class DurableTaskHandler { private readonly ManualResetEvent _waitForStop = new ManualResetEvent(initialState: false); + private bool enabledCompoundErrorPropagation { get; } = + PowerShellWorkerConfiguration.GetBoolean(Utils.PropagateCompoundErrorsEnvVariable) ?? false; public void StopAndInitiateDurableTaskOrReplay( DurableTask task, @@ -187,7 +189,7 @@ Action onFailure { output(GetEventResult(completedHistoryEvent)); } - if (completedHistoryEvent.EventType is HistoryEventType.TaskFailed) + if (completedHistoryEvent.EventType is HistoryEventType.TaskFailed && enabledCompoundErrorPropagation) { onFailure(completedHistoryEvent.Reason); } @@ -203,8 +205,7 @@ Action onFailure public void WaitAny( IReadOnlyCollection tasksToWaitFor, OrchestrationContext context, - Action output, - Action onFailure) + Action output) { context.OrchestrationActionCollector.NextBatch(); diff --git a/src/Utility/Utils.cs b/src/Utility/Utils.cs index e8d3a3d1..b34b1820 100644 --- a/src/Utility/Utils.cs +++ b/src/Utility/Utils.cs @@ -29,6 +29,7 @@ internal class Utils internal const string IsOrchestrationFailureKey = "IsOrchestrationFailure"; internal const string TracePipelineObjectCmdlet = "Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject"; internal const string ExternalDurableSdkEnvVariable = "ExternalDurablePowerShellSDK"; + internal const string PropagateCompoundErrorsEnvVariable = "PropagateCompoundErrors"; internal readonly static object BoxedTrue = (object)true; internal readonly static object BoxedFalse = (object)false; diff --git a/test/Unit/Durable/CurrentUtcDateTimeTests.cs b/test/Unit/Durable/CurrentUtcDateTimeTests.cs index d79b706e..096ce0a2 100644 --- a/test/Unit/Durable/CurrentUtcDateTimeTests.cs +++ b/test/Unit/Durable/CurrentUtcDateTimeTests.cs @@ -226,7 +226,7 @@ public void CurrentUtcDateTime_UpdatesToNextOrchestratorStartedTimestamp_IfAnyAc DurableTestUtilities.EmulateStop(durableTaskHandler); } - durableTaskHandler.WaitAny(tasksToWaitFor, context, output => allOutput.Add(output), _ => { }); + durableTaskHandler.WaitAny(tasksToWaitFor, context, output => allOutput.Add(output)); if (anyCompleted) { diff --git a/test/Unit/Durable/DurableTaskHandlerTests.cs b/test/Unit/Durable/DurableTaskHandlerTests.cs index 0bdffaf8..f7f6d4f4 100644 --- a/test/Unit/Durable/DurableTaskHandlerTests.cs +++ b/test/Unit/Durable/DurableTaskHandlerTests.cs @@ -151,7 +151,7 @@ public void WaitAny_OutputsEarliestCompletedTask_WhenAnyTaskCompleted(bool compl var allOutput = new List(); var durableTaskHandler = new DurableTaskHandler(); - durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }, _ => { }); + durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }); if (completed) { @@ -190,7 +190,7 @@ public void WaitAny_WaitsForStop_WhenAllTasksAreNotCompleted(bool completed, boo expectedWaitForStop, () => { - durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, _ => { }, _ => { }); + durableTaskHandler.WaitAny(tasksToWaitFor, orchestrationContext, _ => { }); }); } @@ -248,7 +248,7 @@ public void WaitAll_And_WaitAny_StartNewActivityBatch(bool invokeWaitAll, bool i if (invokeWaitAny) { durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event - durableTaskHandler.WaitAny(new DurableTask[0], orchestrationContext, output: _ => {}, _ => { }); + durableTaskHandler.WaitAny(new DurableTask[0], orchestrationContext, output: _ => {}); } durableTaskHandler.StopAndInitiateDurableTaskOrReplay(