Skip to content

Adds IExternalInvoker #776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/DurableSDK/ExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Management.Automation;

internal class ExternalInvoker : IExternalInvoker
{
private readonly Func<PowerShell, object> _externalSDKInvokerFunction;
private readonly IPowerShellServices _powerShellServices;

public ExternalInvoker(Func<PowerShell, object> invokerFunction, IPowerShellServices powerShellServices)
{
_externalSDKInvokerFunction = invokerFunction;
_powerShellServices = powerShellServices;
}

public void Invoke()
{
_externalSDKInvokerFunction.Invoke(_powerShellServices.GetPowerShell());
}
}
}
14 changes: 14 additions & 0 deletions src/DurableSDK/IExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
// Represents a contract for the
internal interface IExternalInvoker
{
// Method to invoke an orchestration using the external Durable SDK
void Invoke();
}
}
4 changes: 1 addition & 3 deletions src/DurableSDK/IOrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Management.Automation;

internal interface IOrchestrationInvoker
{
Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh);
void SetExternalInvoker(Action<PowerShell> externalInvoker);
void SetExternalInvoker(IExternalInvoker externalInvoker);
}
}
2 changes: 1 addition & 1 deletion src/DurableSDK/IPowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal interface IPowerShellServices

void SetDurableClient(object durableClient);

OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding orchestrationContext, out Action<object> externalInvoker);
OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding context, out IExternalInvoker externalInvoker);

void ClearOrchestrationContext();

Expand Down
9 changes: 5 additions & 4 deletions src/DurableSDK/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ public class OrchestrationContext

internal OrchestrationActionCollector OrchestrationActionCollector { get; } = new OrchestrationActionCollector();

internal object ExternalResult;
internal bool ExternalIsError;
internal object ExternalSDKResult;

internal bool ExternalSDKIsError;

// Called by the External DF SDK to communicate its orchestration result
// back to the worker.
internal void SetExternalResult(object result, bool isError)
{
this.ExternalResult = result;
this.ExternalIsError = isError;
this.ExternalSDKResult = result;
this.ExternalSDKIsError = isError;
}

internal object CustomStatus { get; set; }
Expand Down
123 changes: 69 additions & 54 deletions src/DurableSDK/OrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,80 +11,95 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Linq;
using System.Management.Automation;

// using PowerShellWorker.Utility;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;

internal class OrchestrationInvoker : IOrchestrationInvoker
{
private Action<PowerShell> externalInvoker = null;
private IExternalInvoker _externalInvoker;

public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh)
public Hashtable Invoke(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{

try
{
if (pwsh.UseExternalDurableSDK())
if (powerShellServices.UseExternalDurableSDK())
{
externalInvoker.Invoke(pwsh.GetPowerShell());
var result = orchestrationBindingInfo.Context.ExternalResult;
var isError = orchestrationBindingInfo.Context.ExternalIsError;
if (isError)
{
throw (Exception)result;
}
else
{
return (Hashtable)result;
}
return InvokeExternalDurableSDK(orchestrationBindingInfo, powerShellServices);
}
return InvokeInternalDurableSDK(orchestrationBindingInfo, powerShellServices);
}
finally
{
powerShellServices.ClearStreamsAndCommands();
}
}

var outputBuffer = new PSDataCollection<object>();
var context = orchestrationBindingInfo.Context;
public Hashtable InvokeExternalDurableSDK(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{

// context.History should never be null when initializing CurrentUtcDateTime
var orchestrationStart = context.History.First(
e => e.EventType == HistoryEventType.OrchestratorStarted);
context.CurrentUtcDateTime = orchestrationStart.Timestamp.ToUniversalTime();
_externalInvoker.Invoke();
var result = orchestrationBindingInfo.Context.ExternalSDKResult;
var isError = orchestrationBindingInfo.Context.ExternalSDKIsError;
if (isError)
{
throw (Exception)result;
}
else
{
return (Hashtable)result;
}
}

public Hashtable InvokeInternalDurableSDK(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{
var outputBuffer = new PSDataCollection<object>();
var context = orchestrationBindingInfo.Context;

// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;
// context.History should never be null when initializing CurrentUtcDateTime
var orchestrationStart = context.History.First(
e => e.EventType == HistoryEventType.OrchestratorStarted);
context.CurrentUtcDateTime = orchestrationStart.Timestamp.ToUniversalTime();

// Finish initializing the Function invocation
pwsh.AddParameter(orchestrationBindingInfo.ParameterName, context);
pwsh.TracePipelineObject();
// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;

var asyncResult = pwsh.BeginInvoke(outputBuffer);
// Finish initializing the Function invocation
powerShellServices.AddParameter(orchestrationBindingInfo.ParameterName, context);
powerShellServices.TracePipelineObject();

var (shouldStop, actions) =
orchestrationBindingInfo.Context.OrchestrationActionCollector.WaitForActions(asyncResult.AsyncWaitHandle);
var asyncResult = powerShellServices.BeginInvoke(outputBuffer);

if (shouldStop)
var (shouldStop, actions) =
orchestrationBindingInfo.Context.OrchestrationActionCollector.WaitForActions(asyncResult.AsyncWaitHandle);

if (shouldStop)
{
// The orchestration function should be stopped and restarted
powerShellServices.StopInvoke();
// return (Hashtable)orchestrationBindingInfo.Context.OrchestrationActionCollector.output;
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
}
else
{
try
{
// The orchestration function should be stopped and restarted
pwsh.StopInvoke();
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
// The orchestration function completed
powerShellServices.EndInvoke(asyncResult);
var result = CreateReturnValueFromFunctionOutput(outputBuffer);
return CreateOrchestrationResult(isDone: true, actions, output: result, context.CustomStatus);
}
else
catch (Exception e)
{
try
{
// The orchestration function completed
pwsh.EndInvoke(asyncResult);
var result = CreateReturnValueFromFunctionOutput(outputBuffer);
return CreateOrchestrationResult(isDone: true, actions, output: result, context.CustomStatus);
}
catch (Exception e)
{
// The orchestrator code has thrown an unhandled exception:
// this should be treated as an entire orchestration failure
throw new OrchestrationFailureException(actions, context.CustomStatus, e);
}
// The orchestrator code has thrown an unhandled exception:
// this should be treated as an entire orchestration failure
throw new OrchestrationFailureException(actions, context.CustomStatus, e);
}
}
finally
{
pwsh.ClearStreamsAndCommands();
}
}

public static object CreateReturnValueFromFunctionOutput(IList<object> pipelineItems)
Expand All @@ -107,9 +122,9 @@ private static Hashtable CreateOrchestrationResult(
return new Hashtable { { "$return", orchestrationMessage } };
}

public void SetExternalInvoker(Action<PowerShell> externalInvoker)
public void SetExternalInvoker(IExternalInvoker externalInvoker)
{
this.externalInvoker = externalInvoker;
_externalInvoker = externalInvoker;
}
}
}
37 changes: 24 additions & 13 deletions src/DurableSDK/PowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ internal class PowerShellServices : IPowerShellServices

public PowerShellServices(PowerShell pwsh)
{
// Attempt to import the external SDK
// We attempt to import the external SDK upon construction of the PowerShellServices object.
// We maintain the boolean member _useExternalDurableSDK in this object rather than
// DurableController because the expected input and functionality of SetFunctionInvocationContextCommand
// may differ between the internal and external implementations.
try
{
pwsh.AddCommand(Utils.ImportModuleCmdletInfo)
Expand All @@ -46,7 +49,7 @@ public PowerShellServices(PowerShell pwsh)
if (availableModules.Count() > 0)
{
// TODO: evaluate if there is a better error message or exception type to be throwing.
// Ideally, this should never happen
// Ideally, this should never happen.
throw new InvalidOperationException("The external Durable SDK was detected, but unable to be imported.", e);
}
_useExternalDurableSDK = false;
Expand Down Expand Up @@ -78,38 +81,46 @@ public void SetDurableClient(object durableClient)
_pwsh.AddCommand(SetFunctionInvocationContextCommand)
.AddParameter("DurableClient", durableClient)
.InvokeAndClearCommands();

// TODO: is _hasSetOrchestrationContext properly named?
_hasSetOrchestrationContext = true;
}

public OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding context, out Action<object> externalInvoker)
public OrchestrationBindingInfo SetOrchestrationContext(
ParameterBinding context,
out IExternalInvoker externalInvoker)
{
externalInvoker = null;
var orchBindingInfo = new OrchestrationBindingInfo(
OrchestrationBindingInfo orchestrationBindingInfo = new OrchestrationBindingInfo(
context.Name,
JsonConvert.DeserializeObject<OrchestrationContext>(context.Data.String));

if (_useExternalDurableSDK)
{
Collection<Action<object>> output = _pwsh.AddCommand(SetFunctionInvocationContextCommand)
Collection<Func<PowerShell, object>> output = _pwsh.AddCommand(SetFunctionInvocationContextCommand)
// The external SetFunctionInvocationContextCommand expects a .json string to deserialize
// and writes an invoker function to the output pipeline.
.AddParameter("OrchestrationContext", context.Data.String)
.AddParameter("SetResult", (Action<object, bool>) orchBindingInfo.Context.SetExternalResult)
.InvokeAndClearCommands<Action<object>>();
.AddParameter("SetResult", (Action<object, bool>) orchestrationBindingInfo.Context.SetExternalResult)
.InvokeAndClearCommands<Func<PowerShell, object>>();
if (output.Count() == 1)
{
externalInvoker = output[0];
externalInvoker = new ExternalInvoker(output[0], this);
}
else
{
throw new InvalidOperationException($"Only a single output was expected for an invocation of {SetFunctionInvocationContextCommand}");
}
}
else
{
_pwsh.AddCommand(SetFunctionInvocationContextCommand)
.AddParameter("OrchestrationContext", orchBindingInfo.Context)
.InvokeAndClearCommands<Action<object>>();
.AddParameter("OrchestrationContext", orchestrationBindingInfo.Context)
.InvokeAndClearCommands();
}

_hasSetOrchestrationContext = true;
return orchBindingInfo;
return orchestrationBindingInfo;
}


public void AddParameter(string name, object value)
{
Expand Down
10 changes: 4 additions & 6 deletions src/DurableWorker/DurableController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ public void InitializeBindings(IList<ParameterBinding> inputData)
}
else if (_durableFunctionInfo.IsOrchestrationFunction)
{
var contextBindingData = inputData[0];
_orchestrationBindingInfo = _powerShellServices.SetOrchestrationContext(contextBindingData, out var externalInvoker);
if (externalInvoker != null)
{
this._orchestrationInvoker.SetExternalInvoker(externalInvoker);
}
_orchestrationBindingInfo = _powerShellServices.SetOrchestrationContext(
inputData[0],
out IExternalInvoker externalInvoker);
_orchestrationInvoker.SetExternalInvoker(externalInvoker);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Set-Alias -Name Start-NewOrchestration -Value Start-DurableOrchestration

function GetDurableClientFromModulePrivateData {
$PrivateData = $PSCmdlet.MyInvocation.MyCommand.Module.PrivateData
if ($PrivateData -eq $null -or $PrivateData['DurableClient'] -eq $null) {
if ($null -eq $PrivateData -or $null -eq $PrivateData['DurableClient']) {
throw "No binding of the type 'durableClient' was defined."
}
else {
Expand Down
Loading