Skip to content

[WIP] feat(workflow): Add support for failing workflow execution on some error types #1732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion packages/core-bridge/src/helpers/try_from_js.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::HashMap, net::SocketAddr, time::Duration};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
time::Duration,
};

use neon::{
handle::Handle,
Expand Down Expand Up @@ -163,6 +167,24 @@ impl<T: TryFromJs> TryFromJs for Vec<T> {
}
}

#[allow(clippy::implicit_hasher)]
impl<T: TryFromJs + std::hash::Hash + Eq> TryFromJs for HashSet<T> {
fn try_from_js<'cx, 'b>(
cx: &mut impl Context<'cx>,
js_value: Handle<'b, JsValue>,
) -> BridgeResult<Self> {
let array = js_value.downcast::<JsArray, _>(cx)?;
let len = array.len(cx);
let mut result = Self::with_capacity(len as usize);

for i in 0..len {
let value = array.get_value(cx, i)?;
result.insert(T::try_from_js(cx, value)?);
}
Ok(result)
}
}

#[allow(clippy::implicit_hasher)]
impl<T: TryFromJs> TryFromJs for HashMap<String, T> {
fn try_from_js<'cx, 'b>(
Expand Down
52 changes: 46 additions & 6 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,16 +404,23 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {}
////////////////////////////////////////////////////////////////////////////////////////////////////

mod config {
use std::{sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use temporal_sdk_core::{
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder,
api::worker::{
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior,
SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError,
WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
api::{
errors::WorkflowErrorType as CoreWorkflowErrorType,
worker::{
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior,
SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError,
WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
},
},
protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior,
};
Expand Down Expand Up @@ -447,6 +454,8 @@ mod config {
max_activities_per_second: Option<f64>,
max_task_queue_activities_per_second: Option<f64>,
shutdown_grace_time: Option<Duration>,
workflow_failure_errors: HashSet<WorkflowErrorType>,
workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
}

#[derive(TryFromJs)]
Expand Down Expand Up @@ -513,6 +522,10 @@ mod config {
.max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
.max_worker_activities_per_second(self.max_activities_per_second)
.graceful_shutdown_period(self.shutdown_grace_time)
.workflow_failure_errors(into_core_workflow_error_set(self.workflow_failure_errors))
.workflow_types_to_failure_errors(into_core_workflow_error_map_of_sets(
self.workflow_types_to_failure_errors,
))
.build()
}
}
Expand Down Expand Up @@ -584,6 +597,33 @@ mod config {
}
}

#[derive(TryFromJs, Hash, Eq, PartialEq)]
pub enum WorkflowErrorType {
Nondeterminism,
}

impl From<WorkflowErrorType> for CoreWorkflowErrorType {
fn from(val: WorkflowErrorType) -> Self {
match val {
WorkflowErrorType::Nondeterminism => Self::Nondeterminism,
}
}
}

fn into_core_workflow_error_set(
val: HashSet<WorkflowErrorType>,
) -> HashSet<CoreWorkflowErrorType> {
val.into_iter().map(Into::into).collect()
}

fn into_core_workflow_error_map_of_sets(
val: HashMap<String, HashSet<WorkflowErrorType>>,
) -> HashMap<String, HashSet<CoreWorkflowErrorType>> {
val.into_iter()
.map(|(k, v)| (k, into_core_workflow_error_set(v)))
.collect()
}

#[derive(TryFromJs)]
#[allow(clippy::struct_field_names)]
pub(super) struct WorkerTuner {
Expand Down
4 changes: 4 additions & 0 deletions packages/core-bridge/ts/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ export interface WorkerOptions {
maxTaskQueueActivitiesPerSecond: Option<number>;
maxActivitiesPerSecond: Option<number>;
shutdownGraceTime: number;
workflowFailureErrors: WorkflowErrorType[];
workflowTypesToFailureErrors: Record<string, WorkflowErrorType[]>;
}

export type PollerBehavior =
Expand Down Expand Up @@ -227,6 +229,8 @@ export type WorkerDeploymentVersion = {

export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' };

export type WorkflowErrorType = { type: 'nondeterminism' };

////////////////////////////////////////////////////////////////////////////////////////////////////
// Worker Tuner
////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
39 changes: 38 additions & 1 deletion packages/worker/src/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,23 @@ export interface WorkerOptions {
*/
sinks?: InjectedSinks<any>;

/**
* The types of exceptions that, if a Workflow-thrown error extends, will cause the Workflow
* Execution or the Update to fail instead of suspending the Workflow via task failure.
*
* This property expects a record of Workflow-type names to the list of error types that will
* cause that type of Workflow to fail. Uses the `'*'` key to specify a list of error types that
* applies to all Workflow types.
*
* If either list of error types includes `NondeterminismError`, then non-determinism errors
* will cause the Workflow Excution to fail. If the list of error types includes `Error`, it
* effectively will fail a workflow/update in all user exception cases, including non-determinism
* errors.
*
* @experimental
*/
workflowTypesToFailureErrors?: Record<'*' | string, (string | 'NondeterminismError' | 'Error')[]>;

/**
* @deprecated SDK tracing is no longer supported. This option is ignored.
*/
Expand Down Expand Up @@ -972,6 +989,24 @@ export function compileWorkerOptions(
}

export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions {
const workflowFailureErrors: native.WorkflowErrorType[] = [];
const workflowTypesToFailureErrors: Record<string, native.WorkflowErrorType[]> = {};

for (const [k, v] of Object.entries(opts.workflowTypesToFailureErrors ?? {})) {
const errorTypes: native.WorkflowErrorType[] = [];

// Core only cares about Non-Determinism Error; other error types are handled by lang side
if (v.includes('NondeterminismError') || v.includes('Error')) {
errorTypes.push({ type: 'nondeterminism' });
}

if (k === '*') {
workflowFailureErrors.push(...errorTypes);
} else {
workflowTypesToFailureErrors[k] = errorTypes;
}
}

return {
identity: opts.identity,
buildId: opts.buildId, // eslint-disable-line deprecation/deprecation
Expand All @@ -983,14 +1018,16 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n
nonStickyToStickyPollRatio: opts.nonStickyToStickyPollRatio,
workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior),
activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior),
enableNonLocalActivities: opts.enableNonLocalActivities,
enableNonLocalActivities: opts.enableNonLocalActivities && opts.activities.size > 0,
stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout),
maxCachedWorkflows: opts.maxCachedWorkflows,
maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval),
defaultHeartbeatThrottleInterval: msToNumber(opts.defaultHeartbeatThrottleInterval),
maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null,
maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null,
shutdownGraceTime: msToNumber(opts.shutdownGraceTime),
workflowFailureErrors,
workflowTypesToFailureErrors,
};
}

Expand Down
Loading