diff --git a/package-lock.json b/package-lock.json index f0222b214..81b98007f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,12 +27,14 @@ "@temporalio/common": "file:packages/common", "@temporalio/create": "file:packages/create-project", "@temporalio/interceptors-opentelemetry": "file:packages/interceptors-opentelemetry", + "@temporalio/nexus": "file:packages/nexus", "@temporalio/nyc-test-coverage": "file:packages/nyc-test-coverage", "@temporalio/proto": "file:packages/proto", "@temporalio/test": "file:packages/test", "@temporalio/testing": "file:packages/testing", "@temporalio/worker": "file:packages/worker", "@temporalio/workflow": "file:packages/workflow", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "temporalio": "file:packages/meta" }, "devDependencies": { @@ -2746,6 +2748,10 @@ "resolved": "packages/interceptors-opentelemetry", "link": true }, + "node_modules/@temporalio/nexus": { + "resolved": "packages/nexus", + "link": true + }, "node_modules/@temporalio/nyc-test-coverage": { "resolved": "packages/nyc-test-coverage", "link": true @@ -8745,7 +8751,6 @@ "version": "5.1.1", "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==", - "dev": true, "engines": { "node": ">=8" }, @@ -12364,6 +12369,14 @@ "resolved": "https://registry.npmjs.org/neo-async/-/neo-async-2.6.2.tgz", "integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==" }, + "node_modules/nexus-rpc": { + "version": "0.0.1", + "resolved": "git+ssh://git@github.com/nexus-rpc/sdk-typescript.git#22cc9ba950879a0b523dab2c86b12b303efcd30e", + "license": "MIT", + "engines": { + "node": ">= 18.0.0" + } + }, "node_modules/nice-try": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/nice-try/-/nice-try-1.0.5.tgz", @@ -19169,6 +19182,7 @@ "@temporalio/proto": "file:../proto", "long": "^5.2.3", "ms": "^3.0.0-canary.1", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "proto3-json-serializer": "^2.0.0" }, "devDependencies": { @@ -19317,6 +19331,19 @@ "@temporalio/workflow": "file:../workflow" } }, + "packages/nexus": { + "name": "@temporalio/nexus", + "version": "1.11.5", + "license": "MIT", + "dependencies": { + "@temporalio/common": "file:../common", + "long": "^5.2.3", + "nexus-rpc": "^0.0.1" + }, + "engines": { + "node": ">= 18.0.0" + } + }, "packages/nyc-test-coverage": { "name": "@temporalio/nyc-test-coverage", "version": "1.12.0-rc.0", @@ -19391,10 +19418,13 @@ "async-retry": "^1.3.3", "ava": "^5.3.1", "dedent": "^1.5.1", + "get-port": "^5.1.1", "glob": "^10.3.10", "istanbul-lib-coverage": "^3.2.2", "long": "^5.2.3", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "node-fetch": "^2.7.0", + "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5", "protobufjs-cli": "^1.1.2", "rxjs": "7.8.1", @@ -19462,11 +19492,15 @@ "@temporalio/client": "file:../client", "@temporalio/common": "file:../common", "@temporalio/core-bridge": "file:../core-bridge", + "@temporalio/nexus": "file:../nexus", "@temporalio/proto": "file:../proto", "@temporalio/workflow": "file:../workflow", "abort-controller": "^3.0.0", "heap-js": "^2.3.0", + "long": "^5.2.3", "memfs": "^4.6.0", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", + "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5", "rxjs": "^7.8.1", "source-map": "^0.7.4", @@ -19497,7 +19531,8 @@ "license": "MIT", "dependencies": { "@temporalio/common": "file:../common", - "@temporalio/proto": "file:../proto" + "@temporalio/proto": "file:../proto", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head" }, "devDependencies": { "source-map": "^0.7.4" @@ -21430,6 +21465,7 @@ "@temporalio/proto": "file:../proto", "long": "^5.2.3", "ms": "^3.0.0-canary.1", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5" }, @@ -21503,6 +21539,14 @@ "@temporalio/workflow": "file:../workflow" } }, + "@temporalio/nexus": { + "version": "file:packages/nexus", + "requires": { + "@temporalio/common": "file:../common", + "long": "^5.2.3", + "nexus-rpc": "^0.0.1" + } + }, "@temporalio/nyc-test-coverage": { "version": "file:packages/nyc-test-coverage", "requires": { @@ -21563,12 +21607,15 @@ "ava": "^5.3.1", "dedent": "^1.5.1", "fs-extra": "^11.2.0", + "get-port": "^5.1.1", "glob": "^10.3.10", "istanbul-lib-coverage": "^3.2.2", "long": "^5.2.3", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "node-fetch": "^2.7.0", "npm-run-all": "^4.1.5", "pidusage": "^3.0.2", + "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5", "protobufjs-cli": "^1.1.2", "rxjs": "7.8.1", @@ -21608,12 +21655,16 @@ "@temporalio/client": "file:../client", "@temporalio/common": "file:../common", "@temporalio/core-bridge": "file:../core-bridge", + "@temporalio/nexus": "file:../nexus", "@temporalio/proto": "file:../proto", "@temporalio/workflow": "file:../workflow", "@types/supports-color": "^8.1.3", "abort-controller": "^3.0.0", "heap-js": "^2.3.0", + "long": "^5.2.3", "memfs": "^4.6.0", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", + "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5", "rxjs": "^7.8.1", "source-map": "^0.7.4", @@ -21636,6 +21687,7 @@ "requires": { "@temporalio/common": "file:../common", "@temporalio/proto": "file:../proto", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "source-map": "^0.7.4" }, "dependencies": { @@ -26000,8 +26052,7 @@ "get-port": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", - "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==", - "dev": true + "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==" }, "get-stream": { "version": "6.0.1", @@ -28676,6 +28727,10 @@ "resolved": "https://registry.npmjs.org/neo-async/-/neo-async-2.6.2.tgz", "integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==" }, + "nexus-rpc": { + "version": "git+ssh://git@github.com/nexus-rpc/sdk-typescript.git#311444405bc6f0158b2db7afb310231eb2c17333", + "from": "nexus-rpc@github:nexus-rpc/sdk-typescript#pull/6/head" + }, "nice-try": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/nice-try/-/nice-try-1.0.5.tgz", diff --git a/package.json b/package.json index 4d6fa627a..1daa0ced2 100644 --- a/package.json +++ b/package.json @@ -43,12 +43,14 @@ "@temporalio/common": "file:packages/common", "@temporalio/create": "file:packages/create-project", "@temporalio/interceptors-opentelemetry": "file:packages/interceptors-opentelemetry", + "@temporalio/nexus": "file:packages/nexus", "@temporalio/nyc-test-coverage": "file:packages/nyc-test-coverage", "@temporalio/proto": "file:packages/proto", "@temporalio/test": "file:packages/test", "@temporalio/testing": "file:packages/testing", "@temporalio/worker": "file:packages/worker", "@temporalio/workflow": "file:packages/workflow", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "temporalio": "file:packages/meta" }, "devDependencies": { @@ -86,6 +88,7 @@ "packages/core-bridge", "packages/create-project", "packages/interceptors-opentelemetry", + "packages/nexus", "packages/nyc-test-coverage", "packages/proto", "packages/test", diff --git a/packages/client/src/internal.ts b/packages/client/src/internal.ts new file mode 100644 index 000000000..2d4924d1d --- /dev/null +++ b/packages/client/src/internal.ts @@ -0,0 +1,28 @@ +import { temporal } from '@temporalio/proto'; + +// A key used internally to pass "hidden options to the WorkflowClient.start() call. +export const InternalWorkflowStartOptionsKey = Symbol.for('__temporal_client_internal_workflow_start_options'); + +// Hidden internal workflow start options, used by the temporal nexus helpers. +export interface InternalWorkflowStartOptions { + requestId?: string; + /** + * Callbacks to be called by the server when this workflow reaches a terminal state. + * If the workflow continues-as-new, these callbacks will be carried over to the new execution. + * Callback addresses must be whitelisted in the server's dynamic configuration. + */ + completionCallbacks?: temporal.api.common.v1.ICallback[]; + /** Links to be associated with the workflow. */ + links?: temporal.api.common.v1.ILink[]; + /** + * Backlink copied by the client from the StartWorkflowExecutionResponse. Only populated in servers newer than 1.27. + */ + backLink?: temporal.api.common.v1.ILink; + + /** + * Conflict options for when USE_EXISTING is specified. + * + * Used by the nexus WorkflowRunOperations to attach to a callback to a running workflow. + */ + onConflictOptions?: temporal.api.workflow.v1.IOnConflictOptions; +} diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 42899800d..e43828aa0 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -91,6 +91,7 @@ import { } from './base-client'; import { mapAsyncIterable } from './iterators-utils'; import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage'; +import { InternalWorkflowStartOptions, InternalWorkflowStartOptionsKey } from './internal'; const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; @@ -1252,7 +1253,14 @@ export class WorkflowClient extends BaseClient { const req = await this.createStartWorkflowRequest(input); const { options: opts, workflowType } = input; try { - return (await this.workflowService.startWorkflowExecution(req)).runId; + const response = await this.workflowService.startWorkflowExecution(req); + const internalOptions = (opts as any)[InternalWorkflowStartOptionsKey] as + | InternalWorkflowStartOptions + | undefined; + if (internalOptions != null) { + internalOptions.backLink = response.link ?? undefined; + } + return response.runId; } catch (err: any) { if (err.code === grpcStatus.ALREADY_EXISTS) { throw new WorkflowExecutionAlreadyStartedError( @@ -1268,11 +1276,12 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; + const internalOptions = (opts as any)[InternalWorkflowStartOptionsKey] as InternalWorkflowStartOptions | undefined; return { namespace, identity, - requestId: uuid4(), + requestId: internalOptions?.requestId ?? uuid4(), workflowId: opts.workflowId, workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy), workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy), @@ -1298,6 +1307,7 @@ export class WorkflowClient extends BaseClient { header: { fields: headers }, priority: opts.priority ? compilePriority(opts.priority) : undefined, versioningOverride: opts.versioningOverride ?? undefined, + ...internalOptions, }; } diff --git a/packages/common/package.json b/packages/common/package.json index c170d04a2..ca0f01ffe 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -15,6 +15,7 @@ "@temporalio/proto": "file:../proto", "long": "^5.2.3", "ms": "^3.0.0-canary.1", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "proto3-json-serializer": "^2.0.0" }, "devDependencies": { diff --git a/packages/common/src/converter/failure-converter.ts b/packages/common/src/converter/failure-converter.ts index fe1837054..58a2ba82e 100644 --- a/packages/common/src/converter/failure-converter.ts +++ b/packages/common/src/converter/failure-converter.ts @@ -1,3 +1,6 @@ +import * as nexus from 'nexus-rpc'; +import Long from 'long'; +import type { temporal } from '@temporalio/proto'; import { ActivityFailure, ApplicationFailure, @@ -10,16 +13,42 @@ import { encodeRetryState, encodeTimeoutType, FAILURE_SOURCE, + NexusOperationFailure, ProtoFailure, ServerFailure, TemporalFailure, TerminatedFailure, TimeoutFailure, } from '../failure'; +import { makeProtoEnumConverters } from '../internal-workflow'; import { isError } from '../type-helpers'; import { msOptionalToTs } from '../time'; import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from './payload-converter'; +// Can't import enums into the workflow sandbox, use this helper type and enum converter instead. +const NexusHandlerErrorRetryBehavior = { + RETRYABLE: 'RETRYABLE', + NON_RETRYABLE: 'NON_RETRYABLE', +} as const; + +type NexusHandlerErrorRetryBehavior = + (typeof NexusHandlerErrorRetryBehavior)[keyof typeof NexusHandlerErrorRetryBehavior]; + +const [encodeNexusHandlerErrorRetryBehavior, decodeNexusHandlerErrorRetryBehavior] = makeProtoEnumConverters< + temporal.api.enums.v1.NexusHandlerErrorRetryBehavior, + typeof temporal.api.enums.v1.NexusHandlerErrorRetryBehavior, + keyof typeof temporal.api.enums.v1.NexusHandlerErrorRetryBehavior, + typeof NexusHandlerErrorRetryBehavior, + 'NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_' +>( + { + UNSPECIFIED: 0, + [NexusHandlerErrorRetryBehavior.RETRYABLE]: 1, + [NexusHandlerErrorRetryBehavior.NON_RETRYABLE]: 2, + } as const, + 'NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_' +); + function combineRegExp(...regexps: RegExp[]): RegExp { return new RegExp(regexps.map((x) => `(?:${x.source})`).join('|')); } @@ -30,6 +59,8 @@ function combineRegExp(...regexps: RegExp[]): RegExp { const CUTOFF_STACK_PATTERNS = combineRegExp( /** Activity execution */ /\s+at Activity\.execute \(.*[\\/]worker[\\/](?:src|lib)[\\/]activity\.[jt]s:\d+:\d+\)/, + /** Nexus execution */ + /\s+at( async)? NexusHandler\.invokeUserCode \(.*[\\/]worker[\\/](?:src|lib)[\\/]nexus[\\/]index\.[jt]s:\d+:\d+\)/, /** Workflow activation */ /\s+at Activator\.\S+NextHandler \(.*[\\/]workflow[\\/](?:src|lib)[\\/]internals\.[jt]s:\d+:\d+\)/, /** Workflow run anything in context */ @@ -122,7 +153,7 @@ export class DefaultFailureConverter implements FailureConverter { * * Does not set common properties, that is done in {@link failureToError}. */ - failureToErrorInner(failure: ProtoFailure, payloadConverter: PayloadConverter): TemporalFailure { + failureToErrorInner(failure: ProtoFailure, payloadConverter: PayloadConverter): Error { if (failure.applicationFailureInfo) { return new ApplicationFailure( failure.message ?? undefined, @@ -196,6 +227,38 @@ export class DefaultFailureConverter implements FailureConverter { this.optionalFailureToOptionalError(failure.cause, payloadConverter) ); } + if (failure.nexusHandlerFailureInfo) { + if (failure.cause == null) { + throw new TypeError('Missing failure cause on nexusHandlerFailureInfo'); + } + let retryable: boolean | undefined = undefined; + const retryBehavior = decodeNexusHandlerErrorRetryBehavior(failure.nexusHandlerFailureInfo.retryBehavior); + switch (retryBehavior) { + case 'RETRYABLE': + retryable = true; + break; + case 'NON_RETRYABLE': + retryable = false; + break; + } + + return new nexus.HandlerError({ + type: (failure.nexusHandlerFailureInfo.type as nexus.HandlerErrorType) ?? 'INTERNAL', + cause: this.failureToError(failure.cause, payloadConverter), + retryable, + }); + } + if (failure.nexusOperationExecutionFailureInfo) { + return new NexusOperationFailure( + failure.nexusOperationExecutionFailureInfo.scheduledEventId?.toNumber(), + // We assume these will always be set or gracefully set to empty strings. + failure.nexusOperationExecutionFailureInfo.endpoint ?? '', + failure.nexusOperationExecutionFailureInfo.service ?? '', + failure.nexusOperationExecutionFailureInfo.operation ?? '', + failure.nexusOperationExecutionFailureInfo.operationToken ?? undefined, + this.optionalFailureToOptionalError(failure.cause, payloadConverter) + ); + } return new TemporalFailure( failure.message ?? undefined, this.optionalFailureToOptionalError(failure.cause, payloadConverter) @@ -220,7 +283,9 @@ export class DefaultFailureConverter implements FailureConverter { } const err = this.failureToErrorInner(failure, payloadConverter); err.stack = failure.stackTrace ?? ''; - err.failure = failure; + if (err instanceof TemporalFailure) { + err.failure = failure; + } return err; } @@ -236,8 +301,8 @@ export class DefaultFailureConverter implements FailureConverter { } errorToFailureInner(err: unknown, payloadConverter: PayloadConverter): ProtoFailure { - if (err instanceof TemporalFailure) { - if (err.failure) return err.failure; + if (err instanceof TemporalFailure || err instanceof nexus.HandlerError) { + if (err instanceof TemporalFailure && err.failure) return err.failure; const base = { message: err.message, stackTrace: cutoffStackTrace(err.stack), @@ -315,6 +380,34 @@ export class DefaultFailureConverter implements FailureConverter { terminatedFailureInfo: {}, }; } + if (err instanceof nexus.HandlerError) { + let retryBehavior: temporal.api.enums.v1.NexusHandlerErrorRetryBehavior | undefined = undefined; + if (err.retryable === true) { + retryBehavior = encodeNexusHandlerErrorRetryBehavior('RETRYABLE'); + } else if (err.retryable === false) { + retryBehavior = encodeNexusHandlerErrorRetryBehavior('NON_RETRYABLE'); + } + + return { + ...base, + nexusHandlerFailureInfo: { + type: err.type, + retryBehavior, + }, + }; + } + if (err instanceof NexusOperationFailure) { + return { + ...base, + nexusOperationExecutionFailureInfo: { + scheduledEventId: err.scheduledEventId ? Long.fromNumber(err.scheduledEventId) : undefined, + endpoint: err.endpoint, + service: err.service, + operation: err.operation, + operationToken: err.operationToken, + }, + }; + } // Just a TemporalFailure return base; } diff --git a/packages/common/src/failure.ts b/packages/common/src/failure.ts index f5f0e48c2..6652aeb25 100644 --- a/packages/common/src/failure.ts +++ b/packages/common/src/failure.ts @@ -379,6 +379,20 @@ export class ChildWorkflowFailure extends TemporalFailure { } } +@SymbolBasedInstanceOfError('NexusOperationFailure') +export class NexusOperationFailure extends TemporalFailure { + public constructor( + public readonly scheduledEventId: number | undefined, + public readonly endpoint: string, + public readonly service: string, + public readonly operation: string, + public readonly operationToken: string | undefined, + cause?: Error + ) { + super('Nexus Operation completed unsuccessfully', cause); + } +} + /** * This exception is thrown in the following cases: * - Workflow with the same Workflow ID is currently running and the {@link WorkflowOptions.workflowIdConflictPolicy} is `WORKFLOW_ID_CONFLICT_POLICY_FAIL` diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index 57afc7ce8..93b9a1fbf 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -91,6 +91,17 @@ export async function decodeArrayFromPayloads( return arrayFromPayloads(payloadConverter, await decodeOptional(payloadCodecs, payloads)); } +/** + * Decode `payloads` and then return {@link arrayFromPayloads}`. + */ +export async function decodeFromPayload(converter: LoadedDataConverter, payloads?: Payload | null): Promise { + if (payloads == null) { + return undefined; + } + const { payloadConverter, payloadCodecs } = converter; + return payloadConverter.fromPayload(await decodeSingle(payloadCodecs, payloads)); +} + /** * Decode `payloads` and then return {@link fromPayloadsAtIndex}. */ diff --git a/packages/common/src/logger.ts b/packages/common/src/logger.ts index f1e1ff2bd..97481036d 100644 --- a/packages/common/src/logger.ts +++ b/packages/common/src/logger.ts @@ -38,6 +38,12 @@ export enum SdkComponent { */ activity = 'activity', + /** + * Component name for messages emited from a nexus operation handler, using the nexus context logger. + * The SDK itself never publishes messages with this component name. + */ + nexus = 'nexus', + /** * Component name for messages emited from a Temporal Worker instance. * diff --git a/packages/common/src/proto-utils.ts b/packages/common/src/proto-utils.ts index 83b7b394c..f54d1708d 100644 --- a/packages/common/src/proto-utils.ts +++ b/packages/common/src/proto-utils.ts @@ -113,29 +113,32 @@ export function historyFromJSON(history: unknown): History { * string that adheres to the same norm as JSON history files produced by other Temporal tools. */ export function historyToJSON(history: History): string { - // toProto3JSON doesn't correctly handle some of our "bytes" fields, passing them untouched to the - // output, after which JSON.stringify() would convert them to an array of numbers. As a workaround, - // recursively walk the object and convert all Buffer instances to base64 strings. Note this only - // works on proto3-json-serializer v2.0.0. v2.0.2 throws an error before we even get the chance - // to fix the buffers. See https://github.com/googleapis/proto3-json-serializer-nodejs/issues/103. - function fixBuffers(e: T): T { - if (e && typeof e === 'object') { - if (e instanceof Buffer) return e.toString('base64') as any; - if (Array.isArray(e)) return e.map(fixBuffers) as T; - return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixBuffers(v)])) as T; - } - return e; - } - const protoJson = toProto3JSON(proto.temporal.api.history.v1.History.fromObject(history) as any); return JSON.stringify(fixBuffers(protoJson), null, 2); } +/** + * toProto3JSON doesn't correctly handle some of our "bytes" fields, passing them untouched to the + * output, after which JSON.stringify() would convert them to an array of numbers. As a workaround, + * recursively walk the object and convert all Buffer instances to base64 strings. Note this only + * works on proto3-json-serializer v2.0.0. v2.0.2 throws an error before we even get the chance + * to fix the buffers. See https://github.com/googleapis/proto3-json-serializer-nodejs/issues/103. + */ +export function fixBuffers(e: T): T { + if (e && typeof e === 'object') { + if (e instanceof Buffer) return e.toString('base64') as any; + if (e instanceof Uint8Array) return Buffer.from(e).toString('base64') as any; + if (Array.isArray(e)) return e.map(fixBuffers) as T; + return Object.fromEntries(Object.entries(e as object).map(([k, v]) => [k, fixBuffers(v)])) as T; + } + return e; +} + /** * Convert from protobuf payload to JSON */ export function payloadToJSON(payload: Payload): JSONPayload { - return toProto3JSON(patched.temporal.api.common.v1.Payload.create(payload)) as any; + return fixBuffers(toProto3JSON(patched.temporal.api.common.v1.Payload.create(payload))) as any; } /** diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 1a3b41b3c..7f98ad85f 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 1a3b41b3c4ec5d800c84995748e9f16a67a9d4d9 +Subproject commit 7f98ad85f28617ad198dc02c102dcd2d3eafb8a7 diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 872bfd60e..dc34a3823 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -10,12 +10,12 @@ use temporal_sdk_core::{ CoreRuntime, api::{ Worker as CoreWorkerTrait, - errors::{CompleteActivityError, CompleteWfError, PollError}, + errors::{CompleteActivityError, CompleteNexusError, CompleteWfError, PollError}, }, init_replay_worker, init_worker, protos::{ coresdk::{ - ActivityHeartbeat, ActivityTaskCompletion, + ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion, workflow_completion::WorkflowActivationCompletion, }, temporal::api::history::v1::History, @@ -52,6 +52,9 @@ pub fn init(cx: &mut ModuleContext) -> NeonResult<()> { worker_record_activity_heartbeat, )?; + cx.export_function("workerPollNexusTask", worker_poll_nexus_task)?; + cx.export_function("workerCompleteNexusTask", worker_complete_nexus_task)?; + cx.export_function("workerInitiateShutdown", worker_initiate_shutdown)?; cx.export_function("workerFinalizeShutdown", worker_finalize_shutdown)?; @@ -247,6 +250,63 @@ pub fn worker_record_activity_heartbeat( Ok(()) } +/// Initiate a single nexus task poll request. +/// There should be only one concurrent poll request for this type. +#[js_function] +pub fn worker_poll_nexus_task( + worker: OpaqueInboundHandle, +) -> BridgeResult>> { + let worker_ref = worker.borrow()?; + let worker = worker_ref.core_worker.clone(); + let runtime = worker_ref.core_runtime.clone(); + + runtime.future_to_promise(async move { + let result = worker.poll_nexus_task().await; + + match result { + Ok(task) => Ok(task.encode_to_vec()), + Err(err) => match err { + PollError::ShutDown => Err(BridgeError::WorkerShutdown)?, + PollError::TonicError(status) => { + Err(BridgeError::TransportError(status.message().to_string()))? + } + }, + } + }) +} + +/// Submit an nexus task completion to core. +#[js_function] +pub fn worker_complete_nexus_task( + worker: OpaqueInboundHandle, + completion: Vec, +) -> BridgeResult> { + let nexus_completion = NexusTaskCompletion::decode_length_delimited(completion.as_slice()) + .map_err(|err| BridgeError::TypeError { + field: None, + message: format!("Cannot decode Completion from buffer: {err:?}"), + })?; + + let worker_ref = worker.borrow()?; + let worker = worker_ref.core_worker.clone(); + let runtime = worker_ref.core_runtime.clone(); + + runtime.future_to_promise(async move { + worker + .complete_nexus_task(nexus_completion) + .await + .map_err(|err| match err { + CompleteNexusError::NexusNotEnabled {} => { + BridgeError::UnexpectedError(format!("{err}")) + } + CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError { + field: None, + message: format!("Malformed nexus Completion: {reason:?}"), + }, + }) + }) +} + /// Request shutdown of the worker. /// Once complete Core will stop polling on new tasks and activations on worker's task queue. /// Caller should drain any pending tasks and activations and call worker_finalize_shutdown before breaking from @@ -405,14 +465,18 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {} mod config { use std::{sync::Arc, time::Duration}; + use temporal_sdk_core::ResourceBasedSlotsOptions; + use temporal_sdk_core::ResourceBasedSlotsOptionsBuilder; + use temporal_sdk_core::ResourceSlotOptions; + use temporal_sdk_core::SlotSupplierOptions as CoreSlotSupplierOptions; + use temporal_sdk_core::TunerHolder; + use temporal_sdk_core::TunerHolderOptionsBuilder; use temporal_sdk_core::{ - ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, - SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, api::worker::{ - ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior, - SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, - WorkerDeploymentOptions as CoreWorkerDeploymentOptions, + ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, + PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, + WorkerConfigBuilderError, WorkerDeploymentOptions as CoreWorkerDeploymentOptions, WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind, }, protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior, @@ -439,6 +503,7 @@ mod config { non_sticky_to_sticky_poll_ratio: f32, workflow_task_poller_behavior: PollerBehavior, activity_task_poller_behavior: PollerBehavior, + nexus_task_poller_behavior: PollerBehavior, enable_non_local_activities: bool, sticky_queue_schedule_to_start_timeout: Duration, max_cached_workflows: usize, @@ -485,6 +550,9 @@ mod config { // Set all other options let mut builder = WorkerConfigBuilder::default(); builder + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "".to_owned(), + }) .client_identity_override(Some(self.identity)) .versioning_strategy({ if let Some(dopts) = self.worker_deployment_options { @@ -505,6 +573,7 @@ mod config { .nonsticky_to_sticky_poll_ratio(self.non_sticky_to_sticky_poll_ratio) .workflow_task_poller_behavior(self.workflow_task_poller_behavior) .activity_task_poller_behavior(self.activity_task_poller_behavior) + .nexus_task_poller_behavior(self.nexus_task_poller_behavior) .no_remote_activities(!self.enable_non_local_activities) .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout) .max_cached_workflows(self.max_cached_workflows) @@ -590,6 +659,7 @@ mod config { workflow_task_slot_supplier: SlotSupplier, activity_task_slot_supplier: SlotSupplier, local_activity_task_slot_supplier: SlotSupplier, + nexus_task_slot_supplier: SlotSupplier, } impl WorkerTuner { @@ -609,6 +679,8 @@ mod config { self.local_activity_task_slot_supplier .into_slot_supplier(&mut rbo), ); + tuner_holder + .nexus_slot_options(self.nexus_task_slot_supplier.into_slot_supplier(&mut rbo)); if let Some(rbo) = rbo { tuner_holder.resource_based_options(rbo); } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index bc365fc77..21b5dc293 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -173,6 +173,9 @@ export declare function workerCompleteActivityTask(worker: Worker, result: Buffe export declare function workerRecordActivityHeartbeat(worker: Worker, heartbeat: Buffer): void; +export declare function workerPollNexusTask(worker: Worker): Promise; +export declare function workerCompleteNexusTask(worker: Worker, result: Buffer): Promise; + export declare function workerInitiateShutdown(worker: Worker): void; export declare function workerFinalizeShutdown(worker: Worker): Promise; @@ -192,6 +195,7 @@ export interface WorkerOptions { nonStickyToStickyPollRatio: number; workflowTaskPollerBehavior: PollerBehavior; activityTaskPollerBehavior: PollerBehavior; + nexusTaskPollerBehavior: PollerBehavior; enableNonLocalActivities: boolean; stickyQueueScheduleToStartTimeout: number; maxCachedWorkflows: number; @@ -235,6 +239,7 @@ export interface WorkerTunerOptions { workflowTaskSlotSupplier: SlotSupplierOptions; activityTaskSlotSupplier: SlotSupplierOptions; localActivityTaskSlotSupplier: SlotSupplierOptions; + nexusTaskSlotSupplier: SlotSupplierOptions; } export type SlotSupplierOptions = diff --git a/packages/nexus/README.md b/packages/nexus/README.md new file mode 100644 index 000000000..be9466cbe --- /dev/null +++ b/packages/nexus/README.md @@ -0,0 +1,8 @@ +# `@temporalio/nexus` + +[![NPM](https://img.shields.io/npm/v/@temporalio/nexus?style=for-the-badge)](https://www.npmjs.com/package/@temporalio/nexus) + +Part of [Temporal](https://temporal.io)'s [TypeScript SDK](https://docs.temporal.io/typescript/introduction/). + +- [API reference](https://typescript.temporal.io/api/namespaces/nexus) +- [Sample projects](https://github.com/temporalio/samples-typescript) diff --git a/packages/nexus/package.json b/packages/nexus/package.json new file mode 100644 index 000000000..8abb7066b --- /dev/null +++ b/packages/nexus/package.json @@ -0,0 +1,39 @@ +{ + "name": "@temporalio/nexus", + "version": "1.11.5", + "description": "Temporal.io SDK Nexus sub-package", + "main": "lib/index.js", + "types": "./lib/index.d.ts", + "keywords": [ + "temporal", + "workflow", + "worker", + "activity" + ], + "author": "Temporal Technologies Inc. ", + "license": "MIT", + "dependencies": { + "@temporalio/common": "file:../common", + "long": "^5.2.3", + "nexus-rpc": "^0.0.1" + }, + "engines": { + "node": ">= 18.0.0" + }, + "bugs": { + "url": "https://github.com/temporalio/sdk-typescript/issues" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/temporalio/sdk-typescript.git", + "directory": "packages/nexus" + }, + "homepage": "https://github.com/temporalio/sdk-typescript/tree/main/packages/nexus", + "publishConfig": { + "access": "public" + }, + "files": [ + "src", + "lib" + ] +} diff --git a/packages/nexus/src/context.ts b/packages/nexus/src/context.ts new file mode 100644 index 000000000..761b622c9 --- /dev/null +++ b/packages/nexus/src/context.ts @@ -0,0 +1,178 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; +import * as nexus from 'nexus-rpc'; +import { Logger, LogLevel, LogMetadata, Workflow } from '@temporalio/common'; +import { Replace } from '@temporalio/common/lib/type-helpers'; +import { Client, WorkflowStartOptions as ClientWorkflowStartOptions } from '@temporalio/client'; +import { temporal } from '@temporalio/proto'; +import { InternalWorkflowStartOptionsKey, InternalWorkflowStartOptions } from '@temporalio/client/lib/internal'; +import { generateWorkflowRunOperationToken, loadWorkflowRunOperationToken } from './token'; +import { convertNexusLinkToWorkflowEventLink, convertWorkflowEventLinkToNexusLink } from './link-converter'; + +// Context used internally in the SDK to propagate information from the worker to the Temporal Nexus helpers. +export interface HandlerContext { + log: Logger; + client: Client; + namespace: string; + taskQueue: string; +} + +// Make it safe to use @temporalio/nexus with multiple versions installed. +const asyncLocalStorageSymbol = Symbol.for('__temporal_nexus_context_storage__'); +if (!(globalThis as any)[asyncLocalStorageSymbol]) { + (globalThis as any)[asyncLocalStorageSymbol] = new AsyncLocalStorage(); +} + +export const asyncLocalStorage: AsyncLocalStorage = (globalThis as any)[asyncLocalStorageSymbol]; + +function getHandlerContext(): HandlerContext { + const ctx = asyncLocalStorage.getStore(); + if (ctx == null) { + throw new ReferenceError('Not in a Nexus handler context'); + } + return ctx; +} + +function getLogger() { + return getHandlerContext().log; +} + +/** + * A logger for use in Nexus Handler scope. + */ +export const log: Logger = { + log(level: LogLevel, message: string, meta?: LogMetadata): any { + return getLogger().log(level, message, meta); + }, + trace(message: string, meta?: LogMetadata): any { + return getLogger().trace(message, meta); + }, + debug(message: string, meta?: LogMetadata): any { + return getLogger().debug(message, meta); + }, + info(message: string, meta?: LogMetadata): any { + return getLogger().info(message, meta); + }, + warn(message: string, meta?: LogMetadata): any { + return getLogger().warn(message, meta); + }, + error(message: string, meta?: LogMetadata): any { + return getLogger().error(message, meta); + }, +}; + +// TODO: also support getting a metrics handler. + +/** + * Returns a client to be used in a Nexus Operation's context, this Client is powered by the same NativeConnection that + * the worker was created with. + */ +export function getClient(): Client { + return getHandlerContext().client; +} + +/** + * A handle to a running workflow that is returned by the {@link startWorkflow} helper. + * This handle should be returned by {@link WorkflowRunOperationHandler} implementations. + */ +export interface WorkflowHandle<_T> { + readonly workflowId: string; + readonly runId: string; +} + +/** + * Options for starting a workflow using {@link startWorkflow}, this type is identical to the client's + * `WorkflowStartOptions` with the exception that `taskQueue` is optional and defaults to the current worker's task + * queue. + */ +export type WorkflowStartOptions = Replace, { taskQueue?: string }>; + +/** + * Starts a workflow run for a {@link WorkflowRunOperationHandler}, linking the execution chain to a Nexus Operation + * (subsequent runs started from continue-as-new and retries). Automatically propagates the callback, request ID, and + * back and forward links from the Nexus options to the Workflow. + */ +export async function startWorkflow( + ctx: nexus.StartOperationContext, + workflowTypeOrFunc: string | T, + workflowOptions: WorkflowStartOptions +): Promise> { + const { client, taskQueue } = getHandlerContext(); + const links = Array(); + if (ctx.inboundLinks?.length > 0) { + for (const l of ctx.inboundLinks) { + try { + links.push({ + workflowEvent: convertNexusLinkToWorkflowEventLink(l), + }); + } catch (error) { + log.warn('failed to convert Nexus link to Workflow event link', { error }); + } + } + } + const internalOptions: InternalWorkflowStartOptions = { links, requestId: ctx.requestId }; + + if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') { + internalOptions.onConflictOptions = { + attachLinks: true, + attachCompletionCallbacks: true, + attachRequestId: true, + }; + } + + if (ctx.callbackURL) { + internalOptions.completionCallbacks = [ + { + nexus: { url: ctx.callbackURL, header: ctx.callbackHeaders }, + links, // pass in links here as well for older servers, newer servers dedupe them. + }, + ]; + } + const { taskQueue: userSpeficiedTaskQueue, ...rest } = workflowOptions; + const startOptions: ClientWorkflowStartOptions = { + ...rest, + taskQueue: userSpeficiedTaskQueue || taskQueue, + [InternalWorkflowStartOptionsKey]: internalOptions, + }; + const handle = await client.workflow.start(workflowTypeOrFunc, startOptions); + if (internalOptions.backLink?.workflowEvent != null) { + try { + ctx.outboundLinks.push(convertWorkflowEventLinkToNexusLink(internalOptions.backLink.workflowEvent)); + } catch (error) { + log.warn('failed to convert Workflow event link to Nexus link', { error }); + } + } + return { workflowId: handle.workflowId, runId: handle.firstExecutionRunId }; +} + +/** + * A handler function for the {@link WorkflowRunOperation} constructor. + */ +export type WorkflowRunOperationHandler = ( + ctx: nexus.StartOperationContext, + input: I +) => Promise>; + +/** + * A Nexus Operation implementation that is backed by a Workflow run. + */ +export class WorkflowRunOperation implements nexus.OperationHandler { + constructor(readonly handler: WorkflowRunOperationHandler) {} + + async start(ctx: nexus.StartOperationContext, input: I): Promise> { + const { namespace } = getHandlerContext(); + const handle = await this.handler(ctx, input); + return { token: generateWorkflowRunOperationToken(namespace, handle.workflowId) }; + } + getResult(_ctx: nexus.GetOperationResultContext, _token: string): Promise { + // Not implemented in Temporal yet. + throw new nexus.HandlerError({ type: 'NOT_IMPLEMENTED', message: 'Method not implemented' }); + } + getInfo(_ctx: nexus.GetOperationInfoContext, _token: string): Promise { + // Not implemented in Temporal yet. + throw new nexus.HandlerError({ type: 'NOT_IMPLEMENTED', message: 'Method not implemented' }); + } + async cancel(_ctx: nexus.CancelOperationContext, token: string): Promise { + const decoded = loadWorkflowRunOperationToken(token); + await getClient().workflow.getHandle(decoded.wid).cancel(); + } +} diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts new file mode 100644 index 000000000..a8df439d9 --- /dev/null +++ b/packages/nexus/src/index.ts @@ -0,0 +1,9 @@ +export { + log, + getClient, + startWorkflow, + WorkflowHandle, + WorkflowRunOperation, + WorkflowRunOperationHandler, + WorkflowStartOptions, +} from './context'; diff --git a/packages/nexus/src/link-converter.ts b/packages/nexus/src/link-converter.ts new file mode 100644 index 000000000..d4f03f4ed --- /dev/null +++ b/packages/nexus/src/link-converter.ts @@ -0,0 +1,151 @@ +import Long from 'long'; +import { Link as NexusLink } from 'nexus-rpc'; +import { temporal } from '@temporalio/proto'; + +const { EventType } = temporal.api.enums.v1; +type WorkflowEventLink = temporal.api.common.v1.Link.IWorkflowEvent; +type EventReference = temporal.api.common.v1.Link.WorkflowEvent.IEventReference; +type RequestIdReference = temporal.api.common.v1.Link.WorkflowEvent.IRequestIdReference; + +const LINK_EVENT_ID_PARAM = 'eventID'; +const LINK_EVENT_TYPE_PARAM = 'eventType'; +const LINK_REQUEST_ID_PARAM = 'requestID'; + +const EVENT_REFERENCE_TYPE = 'EventReference'; +const REQUEST_ID_REFERENCE_TYPE = 'RequestIdReference'; + +// fullName isn't part of the generated typed unfortunately. +const WORKFLOW_EVENT_TYPE: string = (temporal.api.common.v1.Link.WorkflowEvent as any).fullName.slice(1); + +function pascalCaseToConstantCase(s: string) { + return s.replace(/[^\b][A-Z]/g, (m) => `${m[0]}_${m[1]}`).toUpperCase(); +} + +function constantCaseToPascalCase(s: string) { + return s.replace(/[A-Z]+_?/g, (m) => `${m[0]}${m.slice(1).toLocaleLowerCase()}`.replace(/_/, '')); +} + +function normalizeEnumValue(value: string, prefix: string) { + value = pascalCaseToConstantCase(value); + if (!value.startsWith(prefix)) { + value = `${prefix}_${value}`; + } + return value; +} + +export function convertWorkflowEventLinkToNexusLink(we: WorkflowEventLink): NexusLink { + if (!we.namespace || !we.workflowId || !we.runId) { + throw new TypeError('Missing required fields: namespace, workflowId, or runId'); + } + const url = new URL( + `temporal:///namespaces/${encodeURIComponent(we.namespace)}/workflows/${encodeURIComponent( + we.workflowId + )}/${encodeURIComponent(we.runId)}/history` + ); + + if (we.eventRef != null) { + url.search = convertLinkWorkflowEventEventReferenceToURLQuery(we.eventRef); + } else if (we.requestIdRef != null) { + url.search = convertLinkWorkflowEventRequestIdReferenceToURLQuery(we.requestIdRef); + } + + return { + url, + type: WORKFLOW_EVENT_TYPE, + }; +} + +export function convertNexusLinkToWorkflowEventLink(link: NexusLink): WorkflowEventLink { + if (link.url.protocol !== 'temporal:') { + throw new TypeError(`Invalid URL scheme: ${link.url}, expected 'temporal:', got '${link.url.protocol}'`); + } + + // /namespaces/:namespace/workflows/:workflowId/:runId/history + const parts = link.url.pathname.split('/'); + if (parts.length !== 7 || parts[1] !== 'namespaces' || parts[3] !== 'workflows' || parts[6] !== 'history') { + throw new TypeError(`Invalid URL path: ${link.url}`); + } + const namespace = decodeURIComponent(parts[2]); + const workflowId = decodeURIComponent(parts[4]); + const runId = decodeURIComponent(parts[5]); + + const query = link.url.searchParams; + const refType = query.get('referenceType'); + + const workflowEventLink: WorkflowEventLink = { + namespace, + workflowId, + runId, + }; + + switch (refType) { + case EVENT_REFERENCE_TYPE: + workflowEventLink.eventRef = convertURLQueryToLinkWorkflowEventEventReference(query); + break; + case REQUEST_ID_REFERENCE_TYPE: + workflowEventLink.requestIdRef = convertURLQueryToLinkWorkflowEventRequestIdReference(query); + break; + default: + throw new TypeError(`Unknown reference type: ${refType}`); + } + return workflowEventLink; +} + +function convertLinkWorkflowEventEventReferenceToURLQuery(eventRef: EventReference): string { + const params = new URLSearchParams(); + params.set('referenceType', EVENT_REFERENCE_TYPE); + if (eventRef.eventId != null) { + const eventId = eventRef.eventId.toNumber(); + if (eventId > 0) { + params.set(LINK_EVENT_ID_PARAM, `${eventId}`); + } + } + if (eventRef.eventType != null) { + const eventType = constantCaseToPascalCase(EventType[eventRef.eventType].replace('EVENT_TYPE_', '')); + params.set(LINK_EVENT_TYPE_PARAM, eventType); + } + return params.toString(); +} + +function convertURLQueryToLinkWorkflowEventEventReference(query: URLSearchParams): EventReference { + let eventId = 0; + const eventIdParam = query.get(LINK_EVENT_ID_PARAM); + if (eventIdParam && /^\d+$/.test(eventIdParam)) { + eventId = parseInt(eventIdParam, 10); + } + const eventTypeParam = query.get(LINK_EVENT_TYPE_PARAM); + if (!eventTypeParam) { + throw new TypeError(`Missing eventType parameter`); + } + const eventType = EventType[normalizeEnumValue(eventTypeParam, 'EVENT_TYPE') as keyof typeof EventType]; + if (eventType == null) { + throw new TypeError(`Unknown eventType parameter: ${eventTypeParam}`); + } + return { eventId: Long.fromNumber(eventId), eventType }; +} + +function convertLinkWorkflowEventRequestIdReferenceToURLQuery(requestIdRef: RequestIdReference): string { + const params = new URLSearchParams(); + params.set('referenceType', REQUEST_ID_REFERENCE_TYPE); + if (requestIdRef.requestId != null) { + params.set(LINK_REQUEST_ID_PARAM, requestIdRef.requestId); + } + if (requestIdRef.eventType != null) { + const eventType = constantCaseToPascalCase(EventType[requestIdRef.eventType].replace('EVENT_TYPE_', '')); + params.set(LINK_EVENT_TYPE_PARAM, eventType); + } + return params.toString(); +} + +function convertURLQueryToLinkWorkflowEventRequestIdReference(query: URLSearchParams): RequestIdReference { + const requestId = query.get(LINK_REQUEST_ID_PARAM); + const eventTypeParam = query.get(LINK_EVENT_TYPE_PARAM); + if (!eventTypeParam) { + throw new TypeError(`Missing eventType parameter`); + } + const eventType = EventType[normalizeEnumValue(eventTypeParam, 'EVENT_TYPE') as keyof typeof EventType]; + if (eventType == null) { + throw new TypeError(`Unknown eventType parameter: ${eventTypeParam}`); + } + return { requestId, eventType }; +} diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts new file mode 100644 index 000000000..7b11bee45 --- /dev/null +++ b/packages/nexus/src/token.ts @@ -0,0 +1,78 @@ +const OPERATION_TOKEN_TYPE_WORKFLOW_RUN = 1; + +// OperationTokenType is used to identify the type of operation token. +// Currently, we only have one type of operation token: WorkflowRun. +type OperationTokenType = typeof OPERATION_TOKEN_TYPE_WORKFLOW_RUN; + +interface WorkflowRunOperationToken { + // Version of the token, by default we assume we're on version 1, this field is not emitted as part of the output, + // it's only used to reject newer token versions on load. + v?: number; + // Type of the operation. Must be OPERATION_TOKEN_TYPE_WORKFLOW_RUN. + t: OperationTokenType; + ns: string; + wid: string; +} + +/** + * Generate a workflow run operation token. + */ +export function generateWorkflowRunOperationToken(namespace: string, workflowID: string): string { + const token: WorkflowRunOperationToken = { + t: OPERATION_TOKEN_TYPE_WORKFLOW_RUN, + ns: namespace, + wid: workflowID, + }; + return base64URLEncodeNoPadding(JSON.stringify(token)); +} + +/** + * Load and validate a workflow run operation token. + */ +export function loadWorkflowRunOperationToken(data: string): WorkflowRunOperationToken { + if (!data) { + throw new TypeError('invalid workflow run token: token is empty'); + } + let decoded: string; + try { + decoded = base64URLDecodeNoPadding(data); + } catch (err) { + throw new TypeError('failed to decode token', { cause: err }); + } + let token: WorkflowRunOperationToken; + try { + token = JSON.parse(decoded); + } catch (err) { + throw new TypeError('failed to unmarshal workflow run operation token', { cause: err }); + } + if (token.t !== OPERATION_TOKEN_TYPE_WORKFLOW_RUN) { + throw new TypeError(`invalid workflow token type: ${token.t}, expected: ${OPERATION_TOKEN_TYPE_WORKFLOW_RUN}`); + } + if (token.v !== undefined && token.v !== 0) { + throw new TypeError('invalid workflow run token: "v" field should not be present'); + } + if (!token.wid) { + throw new TypeError('invalid workflow run token: missing workflow ID (wid)'); + } + return token; +} + +// Exported for use in tests. +export function base64URLEncodeNoPadding(str: string): string { + const base64 = Buffer.from(str).toString('base64url'); + return base64.replace(/=+$/, ''); +} + +function base64URLDecodeNoPadding(str: string): string { + // Validate the string contains only valid base64URL characters + if (!/^[-A-Za-z0-9_]*$/.test(str)) { + throw new TypeError('invalid base64URL encoded string: contains invalid characters'); + } + + const paddingLength = str.length % 4; + if (paddingLength > 0) { + str += '='.repeat(4 - paddingLength); + } + + return Buffer.from(str, 'base64url').toString('utf-8'); +} diff --git a/packages/nexus/tsconfig.json b/packages/nexus/tsconfig.json new file mode 100644 index 000000000..328970e2d --- /dev/null +++ b/packages/nexus/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./lib", + "rootDir": "./src" + }, + "references": [{ "path": "../common" }], + "include": ["./src/**/*.ts"] +} diff --git a/packages/nyc-test-coverage/src/index.ts b/packages/nyc-test-coverage/src/index.ts index b4e5b4d9e..f465f0fae 100644 --- a/packages/nyc-test-coverage/src/index.ts +++ b/packages/nyc-test-coverage/src/index.ts @@ -176,6 +176,8 @@ export class WorkflowCoverage { path.dirname(require.resolve('@temporalio/common')), path.dirname(require.resolve('@temporalio/workflow')), path.dirname(require.resolve('@temporalio/nyc-test-coverage')), + path.dirname(require.resolve('@temporalio/nexus')), + path.dirname(require.resolve('nexus-rpc')), ], }); diff --git a/packages/test/package.json b/packages/test/package.json index 4499978eb..0e17376d4 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -47,10 +47,13 @@ "async-retry": "^1.3.3", "ava": "^5.3.1", "dedent": "^1.5.1", + "get-port": "^5.1.1", "glob": "^10.3.10", "istanbul-lib-coverage": "^3.2.2", "long": "^5.2.3", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", "node-fetch": "^2.7.0", + "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5", "protobufjs-cli": "^1.1.2", "rxjs": "7.8.1", diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index bbaad5ff8..6477b8a13 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -133,35 +133,43 @@ export function makeConfigurableEnvironmentTestFn(opts: { return test; } -export function makeTestFunction(opts: { +export interface TestFunctionOptions { workflowsPath: string; workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions; workflowInterceptorModules?: string[]; recordedLogs?: { [workflowId: string]: LogEntry[] }; runtimeOpts?: Partial | (() => Promise<[Partial, Partial]>) | undefined; -}): TestFn { +} + +export function makeDefaultTestContextFunction(opts: TestFunctionOptions) { + return async (_t: ExecutionContext): Promise => { + let env: TestWorkflowEnvironment; + if (process.env.TEMPORAL_SERVICE_ADDRESS) { + env = await TestWorkflowEnvironment.createFromExistingServer({ + address: process.env.TEMPORAL_SERVICE_ADDRESS, + }); + } else { + env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts); + } + return { + workflowBundle: await createTestWorkflowBundle({ + workflowsPath: opts.workflowsPath, + workflowInterceptorModules: opts.workflowInterceptorModules, + }), + env, + } as unknown as C; + }; +} + +export function makeTestFunction(opts: TestFunctionOptions): TestFn { return makeConfigurableEnvironmentTestFn({ recordedLogs: opts.recordedLogs, runtimeOpts: opts.runtimeOpts, - createTestContext: async (_t: ExecutionContext): Promise => { - let env: TestWorkflowEnvironment; - if (process.env.TEMPORAL_SERVICE_ADDRESS) { - env = await TestWorkflowEnvironment.createFromExistingServer({ - address: process.env.TEMPORAL_SERVICE_ADDRESS, - }); - } else { - env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts); - } - return { - workflowBundle: await createTestWorkflowBundle({ - workflowsPath: opts.workflowsPath, - workflowInterceptorModules: opts.workflowInterceptorModules, - }), - env, - } as unknown as C; - }, + createTestContext: makeDefaultTestContextFunction(opts), teardown: async (c: C) => { - await c.env.teardown(); + if (c.env) { + await c.env.teardown(); + } }, }); } diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index 218be7169..44bd46339 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -81,7 +81,7 @@ export function cleanStackTrace(ostack: string): string { const su = new StackUtils({ cwd: path.join(__dirname, '../..') }); const firstLine = stack.split('\n')[0]; const cleanedStack = su.clean(stack).trimEnd(); - const normalizedStack = + let normalizedStack = cleanedStack && cleanedStack .replace(/:\d+:\d+/g, '') @@ -89,7 +89,11 @@ export function cleanStackTrace(ostack: string): string { .replace(/\[as fn\] /, '') // Avoid https://github.com/nodejs/node/issues/42417 .replace(/at null\./g, 'at ') - .replace(/\\/g, '/'); + .replace(/\\/g, '/') + .replaceAll(/\([^() ]*\/node_modules\//g, '('); + + // FIXME: Find a better way to handle package vendoring; this will come back again. + normalizedStack = normalizedStack.replaceAll(/\([^() ]*\/nexus-sdk-typescript\/src/g, '(nexus-rpc/src'); return normalizedStack ? `${firstLine}\n${normalizedStack}` : firstLine; } @@ -122,6 +126,7 @@ export const bundlerOptions = { '@temporalio/activity', '@temporalio/client', '@temporalio/testing', + '@temporalio/nexus', '@temporalio/worker', 'ava', 'crypto', diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 5b536046d..e10f92550 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -94,11 +94,21 @@ export class MockNativeWorker implements NativeWorkerLike { this.workflowCompletionCallback = undefined; } + public async pollNexusTask(): Promise { + // Not implementing this in the mock worker, testing with real worker instead. + throw new Error('not implemented'); + } + public async completeActivityTask(result: Buffer): Promise { this.activityCompletionCallback!(result); this.activityCompletionCallback = undefined; } + public async completeNexusTask(_result: Buffer): Promise { + // Not implementing this in the mock worker, testing with real worker instead. + throw new Error('not implemented'); + } + public emit(task: Task): void { if ('workflow' in task) { const arr = coresdk.workflow_activation.WorkflowActivation.encode(task.workflow).finish(); diff --git a/packages/test/src/run-a-worker.ts b/packages/test/src/run-a-worker.ts index 7728f03e8..b0afc6fc8 100644 --- a/packages/test/src/run-a-worker.ts +++ b/packages/test/src/run-a-worker.ts @@ -1,4 +1,5 @@ import arg from 'arg'; +import * as nexus from 'nexus-rpc'; import { Worker, Runtime, DefaultLogger, LogLevel, makeTelemetryFilterString } from '@temporalio/worker'; import * as activities from './activities'; @@ -21,6 +22,18 @@ async function main() { const worker = await Worker.create({ activities, workflowsPath: require.resolve('./workflows'), + nexusServices: [ + nexus.serviceHandler( + nexus.service('foo', { + bar: nexus.operation(), + }), + { + async bar(_ctx, input) { + return input; + }, + } + ), + ], taskQueue: 'test', nonStickyToStickyPollRatio: 0.5, }); diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index 04d930084..3f637a7bb 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -276,6 +276,10 @@ const GenericConfigs = { type: 'fixed-size', numSlots: 1, }, + nexusTaskSlotSupplier: { + type: 'fixed-size', + numSlots: 1, + }, }, nonStickyToStickyPollRatio: 0.5, workflowTaskPollerBehavior: { @@ -288,6 +292,12 @@ const GenericConfigs = { initial: 5, maximum: 100, }, + nexusTaskPollerBehavior: { + type: 'autoscaling', + minimum: 1, + initial: 5, + maximum: 100, + }, enableNonLocalActivities: false, stickyQueueScheduleToStartTimeout: 1000, maxCachedWorkflows: 1000, diff --git a/packages/test/src/test-nexus-handler.ts b/packages/test/src/test-nexus-handler.ts new file mode 100644 index 000000000..cbc32279d --- /dev/null +++ b/packages/test/src/test-nexus-handler.ts @@ -0,0 +1,716 @@ +import { randomUUID } from 'node:crypto'; +import anyTest, { TestFn } from 'ava'; +import getPort from 'get-port'; +import Long from 'long'; +import * as nexus from 'nexus-rpc'; +import * as protoJsonSerializer from 'proto3-json-serializer'; +import * as temporalnexus from '@temporalio/nexus'; +import * as root from '@temporalio/proto'; +import * as testing from '@temporalio/testing'; +import { DefaultLogger, LogEntry, Runtime, Worker } from '@temporalio/worker'; +import { + ApplicationFailure, + CancelledFailure, + defaultFailureConverter, + defaultPayloadConverter, + SdkComponent, +} from '@temporalio/common'; +import { + convertWorkflowEventLinkToNexusLink, + convertNexusLinkToWorkflowEventLink, +} from '@temporalio/nexus/lib/link-converter'; +import { cleanStackTrace } from './helpers'; + +export interface Context { + httpPort: number; + taskQueue: string; + endpointId: string; + env: testing.TestWorkflowEnvironment; + logEntries: LogEntry[]; +} + +const test = anyTest as TestFn; + +test.before(async (t) => { + const logEntries = new Array(); + + const logger = new DefaultLogger('INFO', (entry) => { + logEntries.push(entry); + }); + + Runtime.install({ logger }); + t.context.httpPort = await getPort(); + t.context.env = await testing.TestWorkflowEnvironment.createLocal({ + server: { + extraArgs: [ + '--http-port', + `${t.context.httpPort}`, + // SDK tests use arbitrary callback URLs, permit that on the server. + '--dynamic-config-value', + 'component.callbacks.allowedAddresses=[{"Pattern":"*","AllowInsecure":true}]', + // TODO: remove this config when it becomes the default on the server. + '--dynamic-config-value', + 'history.enableRequestIdRefLinks=true', + ], + executable: { + type: 'cached-download', + // TODO: remove this version override when CLI 1.4.0 is out. + version: 'v1.3.1-nexus-links.0', + }, + }, + }); + t.context.logEntries = logEntries; +}); + +test.after.always(async (t) => { + await t.context.env.teardown(); +}); + +test.beforeEach(async (t) => { + const taskQueue = t.title + randomUUID(); + const { env } = t.context; + const response = await env.connection.operatorService.createNexusEndpoint({ + spec: { + name: t.title.replaceAll(/[\s,]/g, '-'), + target: { + worker: { + namespace: 'default', + taskQueue, + }, + }, + }, + }); + t.context.taskQueue = taskQueue; + t.context.endpointId = response.endpoint!.id!; + t.truthy(t.context.endpointId); +}); + +test('sync operation handler happy path', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + testSyncOp: nexus.operation(), + }), + { + async testSyncOp(ctx, input) { + // Testing headers normalization to lower case. + if (ctx.headers.Test !== 'true') { + throw new nexus.HandlerError({ message: 'expected test header to be set to true', type: 'BAD_REQUEST' }); + } + // Echo links back to the caller. + ctx.outboundLinks.push(...ctx.inboundLinks); + return input; + }, + } + ), + ], + }); + + await w.runUntil(async () => { + const res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/testSyncOp`, + { + method: 'POST', + body: JSON.stringify('hello'), + headers: { + 'Content-Type': 'application/json', + Test: 'true', + 'Nexus-Link': '; type="test"', + }, + } + ); + t.true(res.ok); + const output = await res.json(); + t.is(output, 'hello'); + t.is(res.headers.get('Nexus-Link'), '; type="test"'); + }); +}); + +test('operation handler cancelation', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + let p: Promise | undefined; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + testSyncOp: nexus.operation(), + }), + { + async testSyncOp(ctx) { + p = new Promise((_, reject) => { + ctx.abortSignal.onabort = () => { + reject(ctx.abortSignal.reason); + }; + // never resolve this promise. + }); + return await p; + }, + } + ), + ], + }); + + await w.runUntil(async () => { + const res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/testSyncOp`, + { + method: 'POST', + headers: { + 'Request-Timeout': '2s', + }, + } + ); + t.is(res.status, 520 /* UPSTREAM_TIMEOUT */); + // Give time for the worker to actually process the timeout; otherwise the operation + // may end up being cancelled because of the worker shutdown rather than the timeout. + await Promise.race([ + p?.catch(() => undefined), + new Promise((resolve) => { + setTimeout(resolve, 2000).unref(); + }), + ]); + }); + t.truthy(p); + await t.throwsAsync(p!, { instanceOf: CancelledFailure, message: 'TIMED_OUT' }); +}); + +test('async operation handler happy path', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + const requestId = 'test-' + randomUUID(); + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + // Also test custom operation name. + testAsyncOp: nexus.operation({ name: 'async-op' }), + }), + { + testAsyncOp: { + async start(ctx, input): Promise> { + if (input !== 'hello') { + throw new nexus.HandlerError({ message: 'expected input to equal "hello"', type: 'BAD_REQUEST' }); + } + if (ctx.headers.test !== 'true') { + throw new nexus.HandlerError({ + message: 'expected test header to be set to true', + type: 'BAD_REQUEST', + }); + } + if (!ctx.requestId) { + throw new nexus.HandlerError({ message: 'expected requestId to be set', type: 'BAD_REQUEST' }); + } + return { token: ctx.requestId }; + }, + async cancel(ctx, token) { + if (ctx.headers.test !== 'true') { + throw new nexus.HandlerError({ + message: 'expected test header to be set to true', + type: 'BAD_REQUEST', + }); + } + if (token !== requestId) { + throw new nexus.HandlerError({ + message: 'expected token to equal original requestId', + type: 'BAD_REQUEST', + }); + } + }, + async getInfo() { + throw new Error('not implemented'); + }, + async getResult() { + throw new Error('not implemented'); + }, + }, + } + ), + ], + }); + + await w.runUntil(async () => { + let res = await fetch(`http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/async-op`, { + method: 'POST', + body: JSON.stringify('hello'), + headers: { + 'Content-Type': 'application/json', + 'Nexus-Request-Id': requestId, + Test: 'true', + }, + }); + t.true(res.ok); + const output = (await res.json()) as { token: string; state: nexus.OperationState }; + + t.is(output.token, requestId); + t.is(output.state, 'running'); + + res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/async-op/cancel`, + { + method: 'POST', + headers: { + 'Nexus-Operation-Token': output.token, + Test: 'true', + }, + } + ); + t.true(res.ok); + }); +}); + +test('start operation handler errors', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + op: nexus.operation(), + }), + { + async op(_ctx, outcome) { + switch (outcome) { + case 'NonRetryableApplicationFailure': + throw ApplicationFailure.create({ + nonRetryable: true, + message: 'deliberate failure', + details: ['details'], + }); + case 'NonRetryableInternalHandlerError': + throw new nexus.HandlerError({ + type: 'INTERNAL', + message: 'deliberate error', + retryable: false, + }); + case 'OperationError': + throw new nexus.OperationError({ + state: 'failed', + message: 'deliberate error', + }); + } + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: 'invalid outcome requested', + }); + }, + } + ), + ], + }); + + await w.runUntil(async () => { + { + const res = await fetch(`http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/op`, { + method: 'POST', + body: JSON.stringify('NonRetryableApplicationFailure'), + headers: { + 'Content-Type': 'application/json', + }, + }); + t.is(res.status, 500); + const failure = (await res.json()) as any; + const failureType = (root as any).lookupType('temporal.api.failure.v1.Failure'); + const temporalFailure = protoJsonSerializer.fromProto3JSON(failureType, failure.details); + const err = defaultFailureConverter.failureToError(temporalFailure as any, defaultPayloadConverter); + delete failure.details; + + t.deepEqual(failure, { + message: 'deliberate failure', + metadata: { + type: 'temporal.api.failure.v1.Failure', + }, + }); + t.true(err instanceof ApplicationFailure); + t.is(err.message, ''); + t.is( + cleanStackTrace(err.stack!), + `ApplicationFailure: deliberate failure + at Function.create (common/src/failure.ts) + at op (test/src/test-nexus-handler.ts) + at ServiceRegistry.start (nexus-rpc/src/handler.ts)` + ); + t.deepEqual((err as ApplicationFailure).details, ['details']); + t.is((err as ApplicationFailure).failure?.source, 'TypeScriptSDK'); + } + { + const res = await fetch(`http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/op`, { + method: 'POST', + body: JSON.stringify('NonRetryableInternalHandlerError'), + headers: { + 'Content-Type': 'application/json', + }, + }); + t.is(res.status, 500); + t.is(res.headers.get('Nexus-Request-Retryable'), 'false'); + const failure = (await res.json()) as any; + const failureType = (root as any).lookupType('temporal.api.failure.v1.Failure'); + const temporalFailure = protoJsonSerializer.fromProto3JSON(failureType, failure.details); + const err = defaultFailureConverter.failureToError(temporalFailure as any, defaultPayloadConverter); + delete failure.details; + t.true(err instanceof Error); + t.is(err.message, ''); + t.deepEqual( + cleanStackTrace(err.stack!), + `HandlerError: deliberate error + at op (test/src/test-nexus-handler.ts) + at ServiceRegistry.start (nexus-rpc/src/handler.ts)` + ); + } + { + const res = await fetch(`http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/op`, { + method: 'POST', + body: JSON.stringify('OperationError'), + headers: { + 'Content-Type': 'application/json', + }, + }); + t.is(res.status, 424 /* As defined in the nexus HTTP spec */); + const failure = (await res.json()) as any; + const failureType = (root as any).lookupType('temporal.api.failure.v1.Failure'); + const temporalFailure = protoJsonSerializer.fromProto3JSON(failureType, failure.details); + const err = defaultFailureConverter.failureToError(temporalFailure as any, defaultPayloadConverter); + delete failure.details; + t.true(err instanceof Error); + t.is(err.message, ''); + t.is(res.headers.get('nexus-operation-state'), 'failed'); + t.deepEqual( + cleanStackTrace(err.stack!), + `OperationError: deliberate error + at op (test/src/test-nexus-handler.ts) + at ServiceRegistry.start (nexus-rpc/src/handler.ts)` + ); + } + { + const res = await fetch(`http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/op`, { + method: 'POST', + body: 'invalid', + headers: { + 'Content-Type': 'application/json', + }, + }); + t.is(res.status, 400); + const { message } = (await res.json()) as { message: string }; + // Exact error message varies between Node versions. + t.regex(message, /Failed to deserialize input: SyntaxError: Unexpected token .* JSON/); + } + }); +}); + +test('cancel operation handler errors', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + op: nexus.operation(), + }), + { + op: { + async start() { + throw new Error('not implemented'); + }, + async cancel(ctx, _token) { + switch (ctx.headers.outcome) { + case 'NonRetryableApplicationFailure': + throw ApplicationFailure.create({ + nonRetryable: true, + message: 'deliberate failure', + details: ['details'], + }); + case 'NonRetryableInternalHandlerError': + throw new nexus.HandlerError({ + type: 'INTERNAL', + message: 'deliberate error', + retryable: false, + }); + } + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: 'invalid outcome requested', + }); + }, + async getInfo() { + throw new Error('not implemented'); + }, + async getResult() { + throw new Error('not implemented'); + }, + }, + } + ), + ], + }); + + await w.runUntil(async () => { + { + const res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/op/cancel`, + { + method: 'POST', + headers: { + 'Nexus-Operation-Token': 'token', + Outcome: 'NonRetryableApplicationFailure', + }, + } + ); + t.is(res.status, 500); + const failure = (await res.json()) as any; + const failureType = (root as any).lookupType('temporal.api.failure.v1.Failure'); + const temporalFailure = protoJsonSerializer.fromProto3JSON(failureType, failure.details); + const err = defaultFailureConverter.failureToError(temporalFailure as any, defaultPayloadConverter); + delete failure.details; + + t.deepEqual(failure, { + message: 'deliberate failure', + metadata: { + type: 'temporal.api.failure.v1.Failure', + }, + }); + t.true(err instanceof ApplicationFailure); + t.is(err.message, ''); + t.is( + cleanStackTrace(err.stack!), + `ApplicationFailure: deliberate failure + at Function.create (common/src/failure.ts) + at Object.cancel (test/src/test-nexus-handler.ts) + at ServiceRegistry.cancel (nexus-rpc/src/handler.ts)` + ); + t.deepEqual((err as ApplicationFailure).details, ['details']); + t.is((err as ApplicationFailure).failure?.source, 'TypeScriptSDK'); + } + { + const res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/op/cancel`, + { + method: 'POST', + headers: { + 'Nexus-Operation-Token': 'token', + Outcome: 'NonRetryableInternalHandlerError', + }, + } + ); + t.is(res.status, 500); + t.is(res.headers.get('Nexus-Request-Retryable'), 'false'); + const failure = (await res.json()) as any; + const failureType = (root as any).lookupType('temporal.api.failure.v1.Failure'); + const temporalFailure = protoJsonSerializer.fromProto3JSON(failureType, failure.details); + const err = defaultFailureConverter.failureToError(temporalFailure as any, defaultPayloadConverter); + delete failure.details; + t.true(err instanceof Error); + t.is(err.message, ''); + t.deepEqual( + cleanStackTrace(err.stack!), + `HandlerError: deliberate error + at Object.cancel (test/src/test-nexus-handler.ts) + at ServiceRegistry.cancel (nexus-rpc/src/handler.ts)` + ); + } + }); +}); + +test('logger is available in handler context', async (t) => { + const { env, taskQueue, httpPort, endpointId, logEntries } = t.context; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + testSyncOp: nexus.operation(), + }), + { + async testSyncOp(_ctx, input) { + temporalnexus.log.info('handler ran', { input }); + return input; + }, + } + ), + ], + }); + + await w.runUntil(async () => { + const res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/testSyncOp`, + { + method: 'POST', + body: JSON.stringify('hello'), + headers: { 'Content-Type': 'application/json' }, + } + ); + t.true(res.ok); + const output = await res.json(); + t.is(output, 'hello'); + }); + + const entries = logEntries.filter(({ meta }) => meta?.sdkComponent === SdkComponent.nexus); + t.is(entries.length, 1); + t.is(entries[0].message, 'handler ran'); + t.deepEqual(entries[0].meta, { + sdkComponent: SdkComponent.nexus, + service: 'testService', + operation: 'testSyncOp', + taskQueue, + input: 'hello', + }); +}); + +test('getClient is available in handler context', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + testSyncOp: nexus.operation(), + }), + { + async testSyncOp() { + const systemInfo = await temporalnexus + .getClient() + .connection.workflowService.getSystemInfo({ namespace: 'default' }); + return systemInfo.capabilities?.nexus ?? false; + }, + } + ), + ], + }); + + await w.runUntil(async () => { + const res = await fetch( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/testSyncOp`, + { + method: 'POST', + } + ); + t.true(res.ok); + const output = await res.json(); + t.is(output, true); + }); +}); + +test('WorkflowRunOperation attaches callback, link, and request ID', async (t) => { + const { env, taskQueue, httpPort, endpointId } = t.context; + const requestId1 = randomUUID(); + const requestId2 = randomUUID(); + const workflowId = t.title; + + const w = await Worker.create({ + connection: env.nativeConnection, + namespace: env.namespace, + taskQueue, + nexusServices: [ + nexus.serviceHandler( + nexus.service('testService', { + testOp: nexus.operation(), + }), + { + testOp: new temporalnexus.WorkflowRunOperation(async (ctx) => { + return await temporalnexus.startWorkflow(ctx, 'some-workflow', { + workflowId, + // To test attaching multiple callers to the same operation. + workflowIdConflictPolicy: 'USE_EXISTING', + }); + }), + } + ), + ], + }); + + const callbackURL = 'http://not-found'; + const workflowLink = { + namespace: 'default', + workflowId: 'wid', + runId: 'runId', + eventRef: { + eventId: Long.fromNumber(5), + eventType: root.temporal.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }; + const nexusLink = convertWorkflowEventLinkToNexusLink(workflowLink); + + await w.runUntil(async () => { + const backlinks = []; + for (const requestId of [requestId1, requestId2]) { + const endpointUrl = new URL( + `http://127.0.0.1:${httpPort}/nexus/endpoints/${endpointId}/services/testService/testOp` + ); + endpointUrl.searchParams.set('callback', callbackURL); + const res = await fetch(endpointUrl.toString(), { + method: 'POST', + body: JSON.stringify('hello'), + headers: { + 'Content-Type': 'application/json', + 'Nexus-Request-Id': requestId, + 'Nexus-Callback-Token': 'token', + 'Nexus-Link': `<${nexusLink.url}>; type="${nexusLink.type}"`, + }, + }); + t.true(res.ok); + const output = (await res.json()) as { token: string; state: nexus.OperationState }; + t.is(output.state, 'running'); + console.log(res.headers.get('Nexus-Link')); + const m = /<([^>]+)>; type="([^"]+)"/.exec(res.headers.get('Nexus-Link') ?? ''); + t.truthy(m); + const [_, url, type] = m!; + const backlink = convertNexusLinkToWorkflowEventLink({ url: new URL(url), type }); + backlinks.push(backlink); + } + + t.is(backlinks[0].eventRef?.eventType, root.temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED); + t.deepEqual(backlinks[0].eventRef?.eventId, Long.fromNumber(1)); + t.is(backlinks[0].workflowId, workflowId); + + console.log(backlinks[1]); + t.is( + backlinks[1].requestIdRef?.eventType, + root.temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED + ); + t.deepEqual(backlinks[1].requestIdRef?.requestId, requestId2); + t.is(backlinks[1].workflowId, workflowId); + }); + + const description = await env.client.workflow.getHandle(workflowId).describe(); + // Ensure that request IDs are propagated. + t.truthy(description.raw.workflowExtendedInfo?.requestIdInfos?.[requestId1]); + t.truthy(description.raw.workflowExtendedInfo?.requestIdInfos?.[requestId2]); + + // Ensure that callbacks are attached. + t.is(description.raw.callbacks?.length, 2); + // Don't bother verifying the second callback. + const callback = description.raw.callbacks?.[0].callback; + t.is(callback?.nexus?.url, callbackURL); + t.deepEqual(callback?.nexus?.header, { token: 'token' }); + t.is(callback?.links?.length, 1); + const actualLink = callback!.links![0]!.workflowEvent; + + t.deepEqual(actualLink?.namespace, workflowLink.namespace); + t.deepEqual(actualLink?.workflowId, workflowLink.workflowId); + t.deepEqual(actualLink?.runId, workflowLink.runId); + t.deepEqual(actualLink?.eventRef?.eventType, workflowLink.eventRef.eventType); + t.deepEqual(actualLink?.eventRef?.eventId, workflowLink.eventRef.eventId); +}); diff --git a/packages/test/src/test-nexus-link-converter.ts b/packages/test/src/test-nexus-link-converter.ts new file mode 100644 index 000000000..6763af1cf --- /dev/null +++ b/packages/test/src/test-nexus-link-converter.ts @@ -0,0 +1,163 @@ +import test from 'ava'; +import Long from 'long'; +import { temporal } from '@temporalio/proto'; +import { + convertWorkflowEventLinkToNexusLink, + convertNexusLinkToWorkflowEventLink, +} from '@temporalio/nexus/lib/link-converter'; + +const { EventType } = temporal.api.enums.v1; +const WORKFLOW_EVENT_TYPE = (temporal.api.common.v1.Link.WorkflowEvent as any).fullName.slice(1); + +function makeEventRef(eventId: number, eventType: keyof typeof EventType) { + return { + eventId: Long.fromNumber(eventId), + eventType: EventType[eventType], + }; +} + +function makeRequestIdRef(requestId: string, eventType: keyof typeof EventType) { + return { + requestId, + eventType: EventType[eventType], + }; +} + +test('convertWorkflowEventLinkToNexusLink and back with eventRef', (t) => { + const we = { + namespace: 'ns', + workflowId: 'wid', + runId: 'rid', + eventRef: makeEventRef(42, 'EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED'), + }; + const nexusLink = convertWorkflowEventLinkToNexusLink(we); + t.is(nexusLink.type, WORKFLOW_EVENT_TYPE); + t.regex(nexusLink.url.toString(), /^temporal:\/\/\/namespaces\/ns\/workflows\/wid\/rid\/history\?/); + + const roundTrip = convertNexusLinkToWorkflowEventLink(nexusLink); + t.deepEqual(roundTrip, we); +}); + +test('convertWorkflowEventLinkToNexusLink and back with requestIdRef', (t) => { + const we = { + namespace: 'ns2', + workflowId: 'wid2', + runId: 'rid2', + requestIdRef: makeRequestIdRef('req-123', 'EVENT_TYPE_WORKFLOW_TASK_COMPLETED'), + }; + const nexusLink = convertWorkflowEventLinkToNexusLink(we); + t.is(nexusLink.type, WORKFLOW_EVENT_TYPE); + t.regex(nexusLink.url.toString(), /^temporal:\/\/\/namespaces\/ns2\/workflows\/wid2\/rid2\/history\?/); + + const roundTrip = convertNexusLinkToWorkflowEventLink(nexusLink); + t.deepEqual(roundTrip, we); +}); + +test('convertNexusLinkToLinkWorkflowEvent with an event type in PascalCase', (t) => { + const nexusLink = { + url: new URL( + 'temporal:///namespaces/ns2/workflows/wid2/rid2/history?referenceType=RequestIdReference&requestID=req-123&eventType=WorkflowTaskCompleted' + ), + type: WORKFLOW_EVENT_TYPE, + }; + + const workflowEventLink = convertNexusLinkToWorkflowEventLink(nexusLink); + t.is(workflowEventLink.requestIdRef?.eventType, EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED); +}); + +test('throws on missing required fields', (t) => { + t.throws( + () => + convertWorkflowEventLinkToNexusLink({ + namespace: '', + workflowId: 'wid', + runId: 'rid', + }), + { instanceOf: TypeError } + ); + t.throws( + () => + convertWorkflowEventLinkToNexusLink({ + namespace: 'ns', + workflowId: '', + runId: 'rid', + }), + { instanceOf: TypeError } + ); + t.throws( + () => + convertWorkflowEventLinkToNexusLink({ + namespace: 'ns', + workflowId: 'wid', + runId: '', + }), + { instanceOf: TypeError } + ); +}); + +test('throws on invalid URL scheme', (t) => { + const fakeLink = { + url: new URL('http://example.com'), + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { instanceOf: TypeError }); +}); + +test('throws on invalid URL path', (t) => { + const fakeLink = { + url: new URL('temporal:///badpath'), + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { instanceOf: TypeError }); +}); + +test('throws on unknown reference type', (t) => { + const url = new URL('temporal:///namespaces/ns/workflows/wid/rid/history?referenceType=UnknownType'); + const fakeLink = { + url, + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { instanceOf: TypeError }); +}); + +test('throws on missing eventType in eventRef', (t) => { + const url = new URL('temporal:///namespaces/ns/workflows/wid/rid/history?referenceType=EventReference&eventID=1'); + const fakeLink = { + url, + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { message: /Missing eventType parameter/ }); +}); + +test('throws on unknown eventType in eventRef', (t) => { + const url = new URL( + 'temporal:///namespaces/ns/workflows/wid/rid/history?referenceType=EventReference&eventID=1&eventType=NotAType' + ); + const fakeLink = { + url, + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { message: /Unknown eventType parameter/ }); +}); + +test('throws on missing eventType in requestIdRef', (t) => { + const url = new URL( + 'temporal:///namespaces/ns/workflows/wid/rid/history?referenceType=RequestIdReference&requestID=req' + ); + const fakeLink = { + url, + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { message: /Missing eventType parameter/ }); +}); + +test('throws on unknown eventType in requestIdRef', (t) => { + const url = new URL( + 'temporal:///namespaces/ns/workflows/wid/rid/history?referenceType=RequestIdReference&requestID=req&eventType=NotAType' + ); + const fakeLink = { + url, + type: WORKFLOW_EVENT_TYPE, + }; + t.throws(() => convertNexusLinkToWorkflowEventLink(fakeLink as any), { message: /Unknown eventType parameter/ }); +}); diff --git a/packages/test/src/test-nexus-token-helpers.ts b/packages/test/src/test-nexus-token-helpers.ts new file mode 100644 index 000000000..bb29c51a5 --- /dev/null +++ b/packages/test/src/test-nexus-token-helpers.ts @@ -0,0 +1,43 @@ +import test from 'ava'; +import { + base64URLEncodeNoPadding, + generateWorkflowRunOperationToken, + loadWorkflowRunOperationToken, +} from '@temporalio/nexus/lib/token'; + +test('encode and decode workflow run operation token', (t) => { + const expected = { + t: 1, + ns: 'ns', + wid: 'w', + }; + const token = generateWorkflowRunOperationToken('ns', 'w'); + const decoded = loadWorkflowRunOperationToken(token); + t.deepEqual(decoded, expected); +}); + +test('decode workflow run operation token errors', (t) => { + t.throws(() => loadWorkflowRunOperationToken(''), { message: /invalid workflow run token: token is empty/ }); + + t.throws(() => loadWorkflowRunOperationToken('not-base64!@#$'), { message: /failed to decode token/ }); + + const invalidJSONToken = base64URLEncodeNoPadding('invalid json'); + t.throws(() => loadWorkflowRunOperationToken(invalidJSONToken), { + message: /failed to unmarshal workflow run operation token/, + }); + + const invalidTypeToken = base64URLEncodeNoPadding('{"t":2}'); + t.throws(() => loadWorkflowRunOperationToken(invalidTypeToken), { + message: /invalid workflow token type: 2, expected: 1/, + }); + + const missingWIDToken = base64URLEncodeNoPadding('{"t":1}'); + t.throws(() => loadWorkflowRunOperationToken(missingWIDToken), { + message: /invalid workflow run token: missing workflow ID \(wid\)/, + }); + + const versionedToken = base64URLEncodeNoPadding('{"v":1, "t":1,"wid": "workflow-id"}'); + t.throws(() => loadWorkflowRunOperationToken(versionedToken), { + message: /invalid workflow run token: "v" field should not be present/, + }); +}); diff --git a/packages/test/src/test-nexus-workflow-caller.ts b/packages/test/src/test-nexus-workflow-caller.ts new file mode 100644 index 000000000..1fa98b8f3 --- /dev/null +++ b/packages/test/src/test-nexus-workflow-caller.ts @@ -0,0 +1,196 @@ +import { randomUUID } from 'crypto'; +import * as nexus from 'nexus-rpc'; +import { ApplicationFailure, CancelledFailure, NexusOperationFailure } from '@temporalio/common'; +import { WorkflowFailedError } from '@temporalio/client'; +import * as temporalnexus from '@temporalio/nexus'; +import * as workflow from '@temporalio/workflow'; +import { helpers, makeTestFunction } from './helpers-integration'; + +const service = nexus.service('test', { + syncOp: nexus.operation({ name: 'my-sync-op' }), + asyncOp: nexus.operation(), +}); + +const test = makeTestFunction({ + workflowsPath: __filename, + workflowInterceptorModules: [__filename], +}); + +export async function caller(endpoint: string, op: keyof typeof service.operations, action: string): Promise { + const client = workflow.createNexusClient({ + endpoint, + service, + }); + return await workflow.CancellationScope.cancellable(async () => { + const handle = await client.startOperation(op, action); + if (action === 'waitForCancel') { + workflow.CancellationScope.current().cancel(); + } + return await handle.result(); + }); +} + +export async function handler(action: string): Promise { + if (action === 'failWorkflow') { + throw ApplicationFailure.create({ + nonRetryable: true, + message: 'test asked to fail', + type: 'IntentionalError', + details: ['a detail'], + }); + } + if (action === 'waitForCancel') { + await workflow.CancellationScope.current().cancelRequested; + } + return action; +} + +test('Nexus Operation from a Workflow', async (t) => { + const { createWorker, executeWorkflow, taskQueue } = helpers(t); + const endpoint = t.title.replaceAll(/[\s,]/g, '-') + '-' + randomUUID(); + await t.context.env.connection.operatorService.createNexusEndpoint({ + spec: { + name: endpoint, + target: { + worker: { + namespace: 'default', + taskQueue, + }, + }, + }, + }); + const worker = await createWorker({ + nexusServices: [ + nexus.serviceHandler(service, { + async syncOp(_ctx, action) { + if (action === 'pass') { + return action; + } + if (action === 'throwHandlerError') { + throw new nexus.HandlerError({ + type: 'INTERNAL', + retryable: false, + message: 'test asked to fail', + }); + } + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: 'invalid action', + }); + }, + asyncOp: new temporalnexus.WorkflowRunOperation(async (ctx, action) => { + if (action === 'throwOperationError') { + throw new nexus.OperationError({ + state: 'failed', + message: 'some message', + }); + } + if (action === 'throwApplicationFailure') { + throw ApplicationFailure.create({ + nonRetryable: true, + message: 'test asked to fail', + type: 'IntentionalError', + details: ['a detail'], + }); + } + return await temporalnexus.startWorkflow(ctx, handler, { + workflowId: randomUUID(), + args: [action], + }); + }), + }), + ], + }); + await worker.runUntil(async () => { + let res = await executeWorkflow(caller, { + args: [endpoint, 'syncOp', 'pass'], + }); + t.is(res, 'pass'); + let err = await t.throwsAsync( + () => + executeWorkflow(caller, { + args: [endpoint, 'syncOp', 'throwHandlerError'], + }), + { + instanceOf: WorkflowFailedError, + } + ); + t.true( + err instanceof WorkflowFailedError && + err.cause instanceof NexusOperationFailure && + err.cause.cause instanceof nexus.HandlerError && + err.cause.cause.type === 'INTERNAL' + ); + + res = await executeWorkflow(caller, { + args: [endpoint, 'asyncOp', 'pass'], + }); + t.is(res, 'pass'); + err = await t.throwsAsync( + () => + executeWorkflow(caller, { + args: [endpoint, 'asyncOp', 'waitForCancel'], + }), + { + instanceOf: WorkflowFailedError, + } + ); + t.true( + err instanceof WorkflowFailedError && + err.cause instanceof NexusOperationFailure && + err.cause.cause instanceof CancelledFailure + ); + + err = await t.throwsAsync( + () => + executeWorkflow(caller, { + args: [endpoint, 'asyncOp', 'throwOperationError'], + }), + { + instanceOf: WorkflowFailedError, + } + ); + t.true( + err instanceof WorkflowFailedError && + err.cause instanceof NexusOperationFailure && + err.cause.cause instanceof ApplicationFailure + ); + + err = await t.throwsAsync( + () => + executeWorkflow(caller, { + args: [endpoint, 'asyncOp', 'throwApplicationFailure'], + }), + { + instanceOf: WorkflowFailedError, + } + ); + t.true( + err instanceof WorkflowFailedError && + err.cause instanceof NexusOperationFailure && + err.cause.cause instanceof nexus.HandlerError && + err.cause.cause.cause instanceof ApplicationFailure && + err.cause.cause.cause.message === 'test asked to fail' && + err.cause.cause.cause.details?.length === 1 && + err.cause.cause.cause.details[0] === 'a detail' + ); + + err = await t.throwsAsync( + () => + executeWorkflow(caller, { + args: [endpoint, 'asyncOp', 'failWorkflow'], + }), + { + instanceOf: WorkflowFailedError, + } + ); + t.true( + err instanceof WorkflowFailedError && + err.cause instanceof NexusOperationFailure && + err.cause.cause instanceof ApplicationFailure && + err.cause.cause.message === 'test asked to fail' && + err.cause.cause.details?.length === 1 && + err.cause.cause.details[0] === 'a detail' + ); + }); +}); diff --git a/packages/test/src/test-worker-tuner.ts b/packages/test/src/test-worker-tuner.ts index f4390f7ed..bec5f6532 100644 --- a/packages/test/src/test-worker-tuner.ts +++ b/packages/test/src/test-worker-tuner.ts @@ -75,6 +75,10 @@ test('Worker can run with mixed slot suppliers in tuner', async (t) => { type: 'fixed-size', numSlots: 10, }, + nexusTaskSlotSupplier: { + type: 'fixed-size', + numSlots: 10, + }, }, }); const result = await worker.runUntil(executeWorkflow(successString)); @@ -122,6 +126,11 @@ test('Can assume defaults for resource based options', async (t) => { tunerOptions: resourceBasedTunerOptions, maximumSlots: 50, }, + nexusTaskSlotSupplier: { + type: 'resource-based', + tunerOptions: resourceBasedTunerOptions, + maximumSlots: 50, + }, }, }); await worker2.runUntil(Promise.resolve()); @@ -154,6 +163,10 @@ test('Cannot construct worker tuner with multiple different tuner options', asyn type: 'fixed-size', numSlots: 10, }, + nexusTaskSlotSupplier: { + type: 'fixed-size', + numSlots: 10, + }, }, }) ); @@ -225,6 +238,7 @@ test('Custom slot supplier works', async (t) => { workflowTaskSlotSupplier: slotSupplier, activityTaskSlotSupplier: slotSupplier, localActivityTaskSlotSupplier: slotSupplier, + nexusTaskSlotSupplier: slotSupplier, }, }); const result = await worker.runUntil(executeWorkflow(doesActivity)); @@ -278,6 +292,7 @@ test('Custom slot supplier sees aborts', async (t) => { workflowTaskSlotSupplier: slotSupplier, activityTaskSlotSupplier: slotSupplier, localActivityTaskSlotSupplier: slotSupplier, + nexusTaskSlotSupplier: slotSupplier, }, }); const runprom = worker.run(); @@ -324,6 +339,7 @@ test('Throwing slot supplier avoids blowing everything up', async (t) => { workflowTaskSlotSupplier: slotSupplier, activityTaskSlotSupplier: slotSupplier, localActivityTaskSlotSupplier: slotSupplier, + nexusTaskSlotSupplier: slotSupplier, }, }); const result = await worker.runUntil(executeWorkflow(successString)); @@ -357,6 +373,7 @@ test('Undefined slot supplier avoids blowing everything up', async (t) => { workflowTaskSlotSupplier: slotSupplier, activityTaskSlotSupplier: slotSupplier, localActivityTaskSlotSupplier: slotSupplier, + nexusTaskSlotSupplier: slotSupplier, }, }); const result = await worker.runUntil(executeWorkflow(successString)); diff --git a/packages/worker/package.json b/packages/worker/package.json index 3be7602bc..880ddd924 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -19,11 +19,15 @@ "@temporalio/client": "file:../client", "@temporalio/common": "file:../common", "@temporalio/core-bridge": "file:../core-bridge", + "@temporalio/nexus": "file:../nexus", "@temporalio/proto": "file:../proto", "@temporalio/workflow": "file:../workflow", "abort-controller": "^3.0.0", "heap-js": "^2.3.0", + "long": "^5.2.3", "memfs": "^4.6.0", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head", + "proto3-json-serializer": "^2.0.0", "protobufjs": "^7.2.5", "rxjs": "^7.8.1", "source-map": "^0.7.4", diff --git a/packages/worker/src/nexus/index.ts b/packages/worker/src/nexus/index.ts new file mode 100644 index 000000000..3c2c11917 --- /dev/null +++ b/packages/worker/src/nexus/index.ts @@ -0,0 +1,475 @@ +import * as nexus from 'nexus-rpc'; +import { status } from '@grpc/grpc-js'; +import * as protobuf from 'protobufjs'; +import * as protoJsonSerializer from 'proto3-json-serializer'; + +import { + ApplicationFailure, + CancelledFailure, + IllegalStateError, + LoadedDataConverter, + Payload, + PayloadConverter, + SdkComponent, + LoggerWithComposedMetadata, +} from '@temporalio/common'; +import { temporal, coresdk } from '@temporalio/proto'; +import { asyncLocalStorage } from '@temporalio/nexus/lib/context'; +import { + encodeToPayload, + encodeErrorToFailure, + decodeOptionalSingle, +} from '@temporalio/common/lib/internal-non-workflow'; +import { fixBuffers } from '@temporalio/common/lib/proto-utils'; +import { isAbortError } from '@temporalio/common/lib/type-helpers'; +import { Client, isGrpcServiceError, ServiceError } from '@temporalio/client'; +import { Logger } from '../logger'; + +const UNINITIALIZED = Symbol(); +// fullName isn't part of the generated typed unfortunately. +const TEMPORAL_FAILURE_METADATA = { type: (temporal.api.failure.v1.Failure as any).fullName.slice(1) }; + +async function errorToNexusFailure( + dataConverter: LoadedDataConverter, + err: unknown +): Promise { + const failure = await encodeErrorToFailure(dataConverter, err); + const { message } = failure; + delete failure.message; + // TODO: there must be a more graceful way of passing this object to this function. + const pbj = protoJsonSerializer.toProto3JSON( + temporal.api.failure.v1.Failure.fromObject(failure) as any as protobuf.Message + ); + return { + message, + metadata: TEMPORAL_FAILURE_METADATA, + details: Buffer.from(JSON.stringify(fixBuffers(pbj))), + }; +} + +export async function handlerErrorToProto( + dataConverter: LoadedDataConverter, + err: nexus.HandlerError +): Promise { + let retryBehavior: temporal.api.enums.v1.NexusHandlerErrorRetryBehavior = + temporal.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; + if (err.retryable === true) { + retryBehavior = temporal.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE; + } else if (err.retryable === false) { + retryBehavior = + temporal.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE; + } + let { cause } = err; + if (cause == null) { + // TODO: this messes up the call stack and creates unnecessary nesting. + // + // Create an error without capturing a stack trace. + const wrapped = Object.create(ApplicationFailure.prototype); + wrapped.message = err.message; + wrapped.stack = err.stack; + cause = wrapped; + } + return { + errorType: err.type, + failure: await errorToNexusFailure(dataConverter, cause), + retryBehavior, + }; +} + +export class NexusHandler { + constructor( + public readonly taskToken: Uint8Array, + public readonly namespace: string, + public readonly taskQueue: string, + public readonly context: nexus.OperationContext, + public readonly client: Client, + public readonly abortController: AbortController, + public readonly serviceRegistry: nexus.ServiceRegistry, + public readonly dataConverter: LoadedDataConverter, + /** + * Logger bound to `sdkComponent: worker`, with metadata from this Nexus task. + * This is the logger to use for all log messages emitted by the Nexus + * worker. Note this is not exactly the same thing as the Nexus context + * logger, which is bound to `sdkComponent: nexus`. + */ + private readonly workerLogger: Logger + ) { + this.workerLogger = LoggerWithComposedMetadata.compose(workerLogger, () => nexusLogAttributes(this.context)); + } + + protected async operationErrorToProto( + err: nexus.OperationError + ): Promise { + let { cause } = err; + if (cause == null) { + // Create an error without capturing a stack trace. + const wrapped = Object.create(ApplicationFailure.prototype); + wrapped.message = err.message; + wrapped.stack = err.stack; + wrapped.nonRetryable = true; + cause = wrapped; + } + return { + operationState: err.state, + failure: await errorToNexusFailure(this.dataConverter, cause), + }; + } + + protected async startOperation( + ctx: nexus.StartOperationContext, + payload: Payload | undefined + ): Promise { + try { + let decoded: Payload | undefined | null; + try { + decoded = await decodeOptionalSingle(this.dataConverter.payloadCodecs, payload); + } catch (err) { + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: `Failed to decode payload: ${err}`, + }); + } + // Nexus headers have string values and Temporal Payloads have binary values. Instead of converting Payload + // instances into Content instances, we embed the Payload in the serializer and pretend we are deserializing an + // empty Content. + const input = new nexus.LazyValue( + new PayloadSerializer(this.dataConverter.payloadConverter, decoded ?? undefined), + {} + ); + const result = await this.invokeUserCode( + 'startOperation', + this.serviceRegistry.start.bind(this.serviceRegistry, ctx, input) + ); + if (isAsyncResult(result)) { + return { + taskToken: this.taskToken, + completed: { + startOperation: { + asyncSuccess: { + operationToken: result.token, + links: ctx.outboundLinks.map(nexusLinkToProtoLink), + }, + }, + }, + }; + } else { + return { + taskToken: this.taskToken, + completed: { + startOperation: { + syncSuccess: { + payload: await encodeToPayload(this.dataConverter, result.value), + links: ctx.outboundLinks.map(nexusLinkToProtoLink), + }, + }, + }, + }; + } + } catch (err) { + if (err instanceof nexus.OperationError) { + return { + taskToken: this.taskToken, + completed: { + startOperation: { + operationError: await this.operationErrorToProto(err), + }, + }, + }; + } + let handlerErr: nexus.HandlerError; + if (err instanceof nexus.HandlerError) { + handlerErr = err; + } else { + handlerErr = convertKnownErrors(err); + } + + return { + taskToken: this.taskToken, + error: await handlerErrorToProto(this.dataConverter, handlerErr), + }; + } + } + + protected async cancelOperation( + ctx: nexus.CancelOperationContext, + token: string + ): Promise { + try { + await this.invokeUserCode('cancelOperation', this.serviceRegistry.cancel.bind(this.serviceRegistry, ctx, token)); + return { + taskToken: this.taskToken, + completed: { + cancelOperation: {}, + }, + }; + } catch (err) { + let handlerErr: nexus.HandlerError; + if (err instanceof nexus.HandlerError) { + handlerErr = err; + } else { + handlerErr = convertKnownErrors(err); + } + + return { + taskToken: this.taskToken, + error: await handlerErrorToProto(this.dataConverter, handlerErr), + }; + } + } + + protected async invokeUserCode(method: string, fn: () => Promise): Promise { + let error: any = UNINITIALIZED; // In case someone decides to throw undefined... + const startTime = process.hrtime.bigint(); + this.workerLogger.debug('Nexus handler started', { method }); + try { + return await fn(); + } catch (err: any) { + error = err; + throw err; + } finally { + const durationNanos = process.hrtime.bigint() - startTime; + const durationMs = Number(durationNanos / 1_000_000n); + + if (error === UNINITIALIZED) { + this.workerLogger.debug('Nexus handler invocation completed', { method, durationMs }); + } else if ((error instanceof CancelledFailure || isAbortError(error)) && this.abortController.signal.aborted) { + this.workerLogger.debug('Nexus handler invocation completed as cancelled', { method, durationMs }); + } else { + this.workerLogger.warn('Nexus handler invocation failed', { method, error, durationMs }); + } + } + } + + /** + * Actually executes the operation. + * + * Any call up to this function and including this one will be trimmed out of stack traces. + */ + protected async execute( + task: temporal.api.workflowservice.v1.IPollNexusTaskQueueResponse + ): Promise { + if (task.request?.startOperation != null) { + const variant = task.request?.startOperation; + return await this.startOperation( + { + ...this.context, + requestId: variant.requestId ?? undefined, + inboundLinks: (variant.links ?? []).map(protoLinkToNexusLink), + callbackURL: variant.callback ?? undefined, + callbackHeaders: variant.callbackHeader ?? undefined, + outboundLinks: [], + }, + variant.payload ?? undefined + ); + } else if (task.request?.cancelOperation != null) { + const variant = task.request?.cancelOperation; + if (variant.operationToken == null) { + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: 'Request missing operation token', + }); + } + return await this.cancelOperation( + { + ...this.context, + }, + variant.operationToken + ); + } else { + throw new nexus.HandlerError({ + type: 'NOT_IMPLEMENTED', + message: 'Request method not implemented', + }); + } + } + + public async run( + task: temporal.api.workflowservice.v1.IPollNexusTaskQueueResponse + ): Promise { + let execute = this.execute.bind(this, task); + // Ensure that client calls made with the worker's client in this handler's context are tied to the abort signal. + // TODO: Actually support canceling requests backed by NativeConnection. Once it does, this functionality should be + // tested. + // TS can't infer this and typing out the types is redundant and hard to read. + execute = this.client.withAbortSignal.bind(this.client, this.abortController.signal, execute) as any; + return await asyncLocalStorage.run( + { + client: this.client, + namespace: this.namespace, + taskQueue: this.taskQueue, + log: LoggerWithComposedMetadata.compose(this.workerLogger, { sdkComponent: SdkComponent.nexus }), + }, + execute + ); + } +} + +export function constructNexusOperationContext( + request: temporal.api.nexus.v1.IRequest | null | undefined, + abortSignal: AbortSignal +): nexus.OperationContext { + const base = { + abortSignal, + headers: headersProxy(request?.header), + }; + + if (request?.startOperation != null) { + const op = request.startOperation; + if (op?.service == null) { + throw new IllegalStateError('expected request service to not be empty'); + } + if (op?.operation == null) { + throw new IllegalStateError('expected request service to not be empty'); + } + return { ...base, service: op.service, operation: op.operation }; + } + if (request?.cancelOperation != null) { + const op = request.cancelOperation; + if (op?.service == null) { + throw new IllegalStateError('expected request service to not be empty'); + } + if (op?.operation == null) { + throw new IllegalStateError('expected request service to not be empty'); + } + return { ...base, service: op.service, operation: op.operation }; + } + throw new nexus.HandlerError({ + type: 'NOT_IMPLEMENTED', + message: 'Request method not implemented', + }); +} + +export function nexusLogAttributes(ctx: nexus.OperationContext): Record { + return { + service: ctx.service, + operation: ctx.operation, + }; +} + +function isAsyncResult( + result: nexus.HandlerStartOperationResult +): result is nexus.HandlerStartOperationResultAsync { + return Object.hasOwnProperty.call(result, 'token') && typeof (result as any).token === 'string'; +} + +function convertKnownErrors(err: unknown): nexus.HandlerError { + if (err instanceof ApplicationFailure && err.nonRetryable) { + return new nexus.HandlerError({ + type: 'INTERNAL', + cause: err, + retryable: false, + }); + } + + if (err instanceof ServiceError) { + if (isGrpcServiceError(err.cause)) { + switch (err.cause.code) { + case status.INVALID_ARGUMENT: + return new nexus.HandlerError({ type: 'BAD_REQUEST', cause: err }); + case (status.ALREADY_EXISTS, status.FAILED_PRECONDITION, status.OUT_OF_RANGE): + return new nexus.HandlerError({ type: 'INTERNAL', cause: err, retryable: false }); + case (status.ABORTED, status.UNAVAILABLE): + return new nexus.HandlerError({ type: 'UNAVAILABLE', cause: err }); + case (status.CANCELLED, + status.DATA_LOSS, + status.INTERNAL, + status.UNKNOWN, + status.UNAUTHENTICATED, + status.PERMISSION_DENIED): + // Note that UNAUTHENTICATED and PERMISSION_DENIED have Nexus error types but we convert to internal because + // this is not a client auth error and happens when the handler fails to auth with Temporal and should be + // considered retryable. + return new nexus.HandlerError({ type: 'INTERNAL', cause: err }); + case status.NOT_FOUND: + return new nexus.HandlerError({ type: 'NOT_FOUND', cause: err }); + case status.RESOURCE_EXHAUSTED: + return new nexus.HandlerError({ type: 'RESOURCE_EXHAUSTED', cause: err }); + case status.UNIMPLEMENTED: + return new nexus.HandlerError({ type: 'NOT_IMPLEMENTED', cause: err }); + case status.DEADLINE_EXCEEDED: + return new nexus.HandlerError({ type: 'UPSTREAM_TIMEOUT', cause: err }); + } + } + } + + return new nexus.HandlerError({ cause: err, type: 'INTERNAL' }); +} + +function headersProxy(initializer?: Record | null): Record { + const headers: Record = initializer + ? Object.fromEntries(Object.entries(initializer).map(([k, v]) => [k.toLowerCase(), v])) + : {}; + return new Proxy(headers, { + get(target, p) { + if (typeof p !== 'string') { + throw new TypeError('header keys must be strings'); + } + return target[p.toLowerCase()]; + }, + set(target, p, newValue) { + if (typeof p !== 'string') { + throw new TypeError('header keys must be strings'); + } + if (typeof newValue !== 'string') { + throw new TypeError('header values must be strings'); + } + target[p.toLowerCase()] = newValue; + return true; + }, + }); +} + +function protoLinkToNexusLink(plink: temporal.api.nexus.v1.ILink): nexus.Link { + if (!plink.url) { + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: 'empty link URL', + }); + } + if (!plink.type) { + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: 'empty link type', + }); + } + return { + url: new URL(plink.url), + type: plink.type, + }; +} + +function nexusLinkToProtoLink(nlink: nexus.Link): temporal.api.nexus.v1.ILink { + return { + url: nlink.url.toString(), + type: nlink.type, + }; +} + +/** + * An adapter from a Temporal PayloadConverer and a Nexus Serializer. + */ +class PayloadSerializer implements nexus.Serializer { + constructor( + readonly payloadConverter: PayloadConverter, + readonly payload?: Payload + ) {} + + deserialize(): T { + if (this.payload == null) { + return undefined as T; + } + try { + return this.payloadConverter.fromPayload(this.payload); + } catch (err) { + throw new nexus.HandlerError({ + type: 'BAD_REQUEST', + message: `Failed to deserialize input: ${err}`, + }); + } + } + + /** Not used in this path */ + serialize(): nexus.Content { + throw new Error('not implemented'); + } +} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 6b27f0be0..6bd95c9d9 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1,6 +1,7 @@ import * as os from 'node:os'; import * as v8 from 'node:v8'; import type { Configuration as WebpackConfiguration } from 'webpack'; +import * as nexus from 'nexus-rpc'; import { ActivityFunction, DataConverter, @@ -109,6 +110,11 @@ export interface WorkerOptions { */ activities?: object; + /** + * An array of nexus services + */ + nexusServices?: nexus.ServiceHandler[]; + /** * Path to look up workflows in, any function exported in this path will be registered as a Workflows in this Worker. * @@ -197,6 +203,16 @@ export interface WorkerOptions { */ maxConcurrentLocalActivityExecutions?: number; + /** + * Maximum number of Nexus tasks to execute concurrently. + * Adjust this to improve Worker resource consumption. + * + * Mutually exclusive with the {@link tuner} option. + * + * @default 100 if no {@link tuner} is set + */ + maxConcurrentNexusTaskExecutions?: number; + /** * Whether or not to poll on the Activity task queue. * @@ -313,6 +329,13 @@ export interface WorkerOptions { */ activityTaskPollerBehavior?: PollerBehavior; + /** + * Specify the behavior of Nexus task polling. + * + * @default A fixed maximum whose value is min(10, maxConcurrentNexusTaskExecutions). + */ + nexusTaskPollerBehavior?: PollerBehavior; + /** * Maximum number of Activity tasks to poll concurrently. * @@ -324,6 +347,17 @@ export interface WorkerOptions { */ maxConcurrentActivityTaskPolls?: number; + /** + * Maximum number of Nexus tasks to poll concurrently. + * + * Increase this setting if your Worker is failing to fill in all of its + * `maxConcurrentNexusTaskExecutions` slots despite a low match rate of Nexus + * Tasks in the Task Queue (ie. due to network latency). Can't be higher than + * `maxConcurrentNexusTaskExecutions`. + * @default min(10, maxConcurrentNexusTaskExecutions) + */ + maxConcurrentNexusTaskPolls?: number; + /** * How long a workflow task is allowed to sit on the sticky queue before it is timed out * and moved to the non-sticky queue where it may be picked up by any worker. @@ -566,8 +600,9 @@ export interface PollerBehaviorSimpleMaximum { type: 'simple-maximum'; /** * The maximum poller number, assumes the same default as described in - * {@link WorkerOptions.maxConcurrentWorkflowTaskPolls} or - * {@link WorkerOptions.maxConcurrentActivityTaskPolls}. + * {@link WorkerOptions.maxConcurrentWorkflowTaskPolls}, + * {@link WorkerOptions.maxConcurrentActivityTaskPolls}, or + * {@link WorkerOptions.maxConcurrentNexusTaskPolls} . */ maximum?: number; } @@ -608,14 +643,18 @@ export interface ReplayWorkerOptions | 'namespace' | 'taskQueue' | 'activities' + | 'nexusServices' | 'tuner' | 'maxConcurrentActivityTaskExecutions' | 'maxConcurrentLocalActivityExecutions' | 'maxConcurrentWorkflowTaskExecutions' + | 'maxConcurrentNexusTaskExecutions' | 'maxConcurrentActivityTaskPolls' | 'maxConcurrentWorkflowTaskPolls' + | 'maxConcurrentNexusTaskPolls' | 'workflowTaskPollerBehavior' | 'activityTaskPollerBehavior' + | 'nexusTaskPollerBehavior' | 'nonStickyToStickyPollRatio' | 'maxHeartbeatThrottleInterval' | 'defaultHeartbeatThrottleInterval' @@ -778,6 +817,7 @@ export type WorkerOptionsWithDefaults = WorkerOptions & workflowTaskPollerBehavior: Required; activityTaskPollerBehavior: Required; + nexusTaskPollerBehavior: Required; }; /** @@ -785,7 +825,7 @@ export type WorkerOptionsWithDefaults = WorkerOptions & * formatted strings to numbers. */ export interface CompiledWorkerOptions - extends Omit { + extends Omit { interceptors: CompiledWorkerInterceptors; shutdownGraceTimeMs: number; shutdownForceTimeMs?: number; @@ -795,6 +835,7 @@ export interface CompiledWorkerOptions defaultHeartbeatThrottleIntervalMs: number; loadedDataConverter: LoadedDataConverter; activities: Map; + nexusServiceRegistry?: nexus.ServiceRegistry; tuner: native.WorkerTunerOptions; } @@ -818,9 +859,11 @@ function addDefaultWorkerOptions( interceptors, maxConcurrentActivityTaskExecutions, maxConcurrentLocalActivityExecutions, + maxConcurrentNexusTaskExecutions, maxConcurrentWorkflowTaskExecutions, workflowTaskPollerBehavior, activityTaskPollerBehavior, + nexusTaskPollerBehavior, ...rest } = options; const debugMode = options.debugMode || isSet(process.env.TEMPORAL_DEBUG); @@ -839,6 +882,7 @@ function addDefaultWorkerOptions( // Difficult to predict appropriate poll numbers for resource based slots let maxWFTPolls = 10; let maxATPolls = 10; + let maxNexusTaskPolls = 10; let setTuner: WorkerTuner; if (rest.tuner !== undefined) { if (maxConcurrentActivityTaskExecutions !== undefined) { @@ -853,10 +897,12 @@ function addDefaultWorkerOptions( setTuner = rest.tuner; } else { const maxWft = maxConcurrentWorkflowTaskExecutions ?? 40; - maxWFTPolls = Math.min(10, maxWft); + maxWFTPolls = Math.min(maxWFTPolls, maxWft); const maxAT = maxConcurrentActivityTaskExecutions ?? 100; - maxATPolls = Math.min(10, maxAT); + maxATPolls = Math.min(maxATPolls, maxAT); const maxLAT = maxConcurrentLocalActivityExecutions ?? 100; + const maxNexusTasks = maxConcurrentNexusTaskExecutions ?? 100; + maxNexusTaskPolls = Math.min(maxNexusTaskPolls, maxNexusTasks); setTuner = { workflowTaskSlotSupplier: { type: 'fixed-size', @@ -870,6 +916,10 @@ function addDefaultWorkerOptions( type: 'fixed-size', numSlots: maxLAT, }, + nexusTaskSlotSupplier: { + type: 'fixed-size', + numSlots: maxNexusTasks, + }, }; } @@ -887,6 +937,7 @@ function addDefaultWorkerOptions( const wftPollerBehavior = createPollerBehavior(maxWFTPolls, workflowTaskPollerBehavior); const atPollerBehavior = createPollerBehavior(maxATPolls, activityTaskPollerBehavior); + const nexusPollerBehavior = createPollerBehavior(maxNexusTaskPolls, nexusTaskPollerBehavior); return { namespace: namespace ?? 'default', @@ -897,6 +948,7 @@ function addDefaultWorkerOptions( enableNonLocalActivities: true, workflowTaskPollerBehavior: wftPollerBehavior, activityTaskPollerBehavior: atPollerBehavior, + nexusTaskPollerBehavior: nexusPollerBehavior, stickyQueueScheduleToStartTimeout: '10s', maxHeartbeatThrottleInterval: '60s', defaultHeartbeatThrottleInterval: '30s', @@ -966,11 +1018,19 @@ export function compileWorkerOptions( defaultHeartbeatThrottleIntervalMs: msToNumber(opts.defaultHeartbeatThrottleInterval), loadedDataConverter: loadDataConverter(opts.dataConverter), activities, + nexusServiceRegistry: nexusServiceRegistryFromOptions(opts), enableNonLocalActivities: opts.enableNonLocalActivities && activities.size > 0, tuner, }; } +function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegistry | undefined { + if (opts.nexusServices == null || opts.nexusServices.length === 0) { + return undefined; + } + return new nexus.ServiceRegistry(opts.nexusServices); +} + export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { return { identity: opts.identity, @@ -983,6 +1043,7 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n nonStickyToStickyPollRatio: opts.nonStickyToStickyPollRatio, workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior), activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), + nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior), enableNonLocalActivities: opts.enableNonLocalActivities, stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout), maxCachedWorkflows: opts.maxCachedWorkflows, diff --git a/packages/worker/src/worker-tuner.ts b/packages/worker/src/worker-tuner.ts index 7b8c5a5a7..ecbaf9877 100644 --- a/packages/worker/src/worker-tuner.ts +++ b/packages/worker/src/worker-tuner.ts @@ -20,6 +20,7 @@ export interface TunerHolder { workflowTaskSlotSupplier: SlotSupplier; activityTaskSlotSupplier: SlotSupplier; localActivityTaskSlotSupplier: SlotSupplier; + nexusTaskSlotSupplier: SlotSupplier; } /** @@ -60,6 +61,11 @@ export interface ResourceBasedTuner { * slots with 50ms ramp throttle */ localActivityTaskSlotOptions?: ResourceBasedSlotOptions; + /** + * Options for Nexus task slots. Defaults to a minimum of 1 slots and a maximum of 2000 + * slots with 50ms ramp throttle + */ + nexusTaskSlotOptions?: ResourceBasedSlotOptions; } /** @@ -184,7 +190,7 @@ export interface CustomSlotSupplier { releaseSlot(ctx: SlotReleaseContext): void; } -export type SlotInfo = WorkflowSlotInfo | ActivitySlotInfo | LocalActivitySlotInfo; +export type SlotInfo = WorkflowSlotInfo | ActivitySlotInfo | LocalActivitySlotInfo | NexusSlotInfo; export interface WorkflowSlotInfo { type: 'workflow'; @@ -202,6 +208,12 @@ export interface LocalActivitySlotInfo { activityType: string; } +export interface NexusSlotInfo { + type: 'nexus'; + service: string; + operation: string; +} + /** * A permit to use a slot. * @@ -294,6 +306,7 @@ export function asNativeTuner(tuner: WorkerTuner, logger: Logger): native.Worker workflowTaskSlotSupplier: nativeifySupplier(tuner.workflowTaskSlotSupplier, 'workflow', logger), activityTaskSlotSupplier: nativeifySupplier(tuner.activityTaskSlotSupplier, 'activity', logger), localActivityTaskSlotSupplier: nativeifySupplier(tuner.localActivityTaskSlotSupplier, 'activity', logger), + nexusTaskSlotSupplier: nativeifySupplier(tuner.nexusTaskSlotSupplier, 'nexus', logger), }; for (const supplier of [ retme.workflowTaskSlotSupplier, @@ -318,6 +331,7 @@ export function asNativeTuner(tuner: WorkerTuner, logger: Logger): native.Worker const wftSO = addResourceBasedSlotDefaults(tuner.workflowTaskSlotOptions ?? {}, 'workflow'); const atSO = addResourceBasedSlotDefaults(tuner.activityTaskSlotOptions ?? {}, 'activity'); const latSO = addResourceBasedSlotDefaults(tuner.localActivityTaskSlotOptions ?? {}, 'activity'); + const nexusSO = addResourceBasedSlotDefaults(tuner.nexusTaskSlotOptions ?? {}, 'nexus'); return { workflowTaskSlotSupplier: { type: 'resource-based', @@ -337,6 +351,12 @@ export function asNativeTuner(tuner: WorkerTuner, logger: Logger): native.Worker ...latSO, rampThrottle: msToNumber(latSO.rampThrottle), }, + nexusTaskSlotSupplier: { + type: 'resource-based', + tunerOptions: tuner.tunerOptions, + ...nexusSO, + rampThrottle: msToNumber(nexusSO.rampThrottle), + }, }; } else { throw new TypeError('Invalid worker tuner configuration'); @@ -352,13 +372,13 @@ const isResourceBased = (sup: SlotSupplier | native.SlotSupplierOptions): s const isCustom = (sup: SlotSupplier | native.SlotSupplierOptions): sup is CustomSlotSupplier => sup.type === 'custom'; -type ActOrWorkflow = 'activity' | 'workflow'; +type ResourceKind = 'activity' | 'workflow' | 'nexus'; //////////////////////////////////////////////////////////////////////////////////////////////////// function nativeifySupplier( supplier: SlotSupplier, - kind: ActOrWorkflow, + kind: ResourceKind, logger: Logger ): native.SlotSupplierOptions { if (isResourceBased(supplier)) { @@ -388,7 +408,7 @@ function nativeifySupplier( function addResourceBasedSlotDefaults( slotOptions: ResourceBasedSlotOptions, - kind: ActOrWorkflow + kind: ResourceKind ): Required { if (kind === 'workflow') { return { diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 7a1487d72..bef0358dc 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -19,6 +19,7 @@ import { } from 'rxjs'; import { delay, filter, first, ignoreElements, last, map, mergeMap, takeUntil, takeWhile, tap } from 'rxjs/operators'; import type { RawSourceMap } from 'source-map'; +import * as nexus from 'nexus-rpc'; import { Info as ActivityInfo } from '@temporalio/activity'; import { DataConverter, @@ -32,6 +33,7 @@ import { ensureApplicationFailure, TypedSearchAttributes, decodePriority, + CancelledFailure, MetricMeter, } from '@temporalio/common'; import { @@ -55,6 +57,7 @@ import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { errorMessage, NonNullableObject, OmitFirstParam } from '@temporalio/common/lib/type-helpers'; import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; +import { Client } from '@temporalio/client'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; import { Activity, CancelReason, activityLogAttributes } from './activity'; @@ -102,6 +105,7 @@ import { ShutdownError, UnexpectedError, } from './errors'; +import { constructNexusOperationContext, handlerErrorToProto, NexusHandler, nexusLogAttributes } from './nexus'; export { DataConverter, defaultPayloadConverter }; @@ -128,6 +132,15 @@ export type ActivityTaskWithBase64Token = { protobufEncodedTask: Buffer; }; +export type NexusTaskWithBase64Token = { + task: coresdk.nexus.NexusTask; + base64TaskToken: string; + + // The unaltered protobuf-encoded NexusTask; kept so that it can be printed + // out for analysis if decoding fails at a later step. + protobufEncodedTask: Buffer; +}; + interface EvictionWithRunID { runId: string; evictJob: coresdk.workflow_activation.IRemoveFromCache; @@ -140,8 +153,10 @@ export interface NativeWorkerLike { flushCoreLogs(): void; pollWorkflowActivation: OmitFirstParam; pollActivityTask: OmitFirstParam; + pollNexusTask: OmitFirstParam; completeWorkflowActivation: OmitFirstParam; completeActivityTask: OmitFirstParam; + completeNexusTask: OmitFirstParam; recordActivityHeartbeat: OmitFirstParam; } @@ -179,6 +194,8 @@ export class NativeWorker implements NativeWorkerLike { public readonly pollActivityTask: OmitFirstParam; public readonly completeWorkflowActivation: OmitFirstParam; public readonly completeActivityTask: OmitFirstParam; + public readonly pollNexusTask: OmitFirstParam; + public readonly completeNexusTask: OmitFirstParam; public readonly recordActivityHeartbeat: OmitFirstParam; public readonly initiateShutdown: OmitFirstParam; @@ -208,8 +225,10 @@ export class NativeWorker implements NativeWorkerLike { ) { this.pollWorkflowActivation = native.workerPollWorkflowActivation.bind(undefined, nativeWorker); this.pollActivityTask = native.workerPollActivityTask.bind(undefined, nativeWorker); + this.pollNexusTask = native.workerPollNexusTask.bind(undefined, nativeWorker); this.completeWorkflowActivation = native.workerCompleteWorkflowActivation.bind(undefined, nativeWorker); this.completeActivityTask = native.workerCompleteActivityTask.bind(undefined, nativeWorker); + this.completeNexusTask = native.workerCompleteNexusTask.bind(undefined, nativeWorker); this.recordActivityHeartbeat = native.workerRecordActivityHeartbeat.bind(undefined, nativeWorker); this.initiateShutdown = native.workerInitiateShutdown.bind(undefined, nativeWorker); } @@ -436,6 +455,7 @@ export class Worker { protected readonly workflowPollerStateSubject = new BehaviorSubject('POLLING'); protected readonly activityPollerStateSubject = new BehaviorSubject('POLLING'); + protected readonly nexusPollerStateSubject = new BehaviorSubject('POLLING'); /** * Whether or not this worker has an outstanding workflow poll request */ @@ -444,15 +464,24 @@ export class Worker { * Whether or not this worker has an outstanding activity poll request */ protected hasOutstandingActivityPoll = false; + /** + * Whether or not this worker has an outstanding nexus poll request + */ + protected hasOutstandingNexusPoll = false; + + private client?: Client; protected readonly numInFlightActivationsSubject = new BehaviorSubject(0); protected readonly numInFlightActivitiesSubject = new BehaviorSubject(0); protected readonly numInFlightNonLocalActivitiesSubject = new BehaviorSubject(0); protected readonly numInFlightLocalActivitiesSubject = new BehaviorSubject(0); + protected readonly numInFlightNexusOperationsSubject = new BehaviorSubject(0); protected readonly numCachedWorkflowsSubject = new BehaviorSubject(0); protected readonly numHeartbeatingActivitiesSubject = new BehaviorSubject(0); protected readonly evictionsEmitter = new EventEmitter(); + private readonly taskTokenToNexusHandler = new Map(); + protected static nativeWorkerCtor: NativeWorkerConstructor = NativeWorker; // Used to add uniqueness to replay worker task queue names protected static replayWorkerCount = 0; @@ -759,6 +788,15 @@ export class Worker { protected readonly isReplayWorker: boolean = false ) { this.workflowCodecRunner = new WorkflowCodecRunner(options.loadedDataConverter.payloadCodecs); + if (connection !== null) { + this.client = new Client({ + namespace: options.namespace, + connection, + identity: options.identity, + dataConverter: options.dataConverter, + // TODO: support interceptors. + }); + } } /** @@ -1111,6 +1149,113 @@ export class Worker { ); } + /** + * Process Nexus tasks + */ + protected nexusOperator(): OperatorFunction { + return pipe( + mergeMap( + async ({ + task, + base64TaskToken, + protobufEncodedTask, + }): Promise => { + const { variant } = task; + if (!variant) { + throw new TypeError('Got a nexus task without a "variant" attribute'); + } + + switch (variant) { + case 'task': { + if (task.task == null) { + throw new IllegalStateError(`Got empty task for task variant with token: ${base64TaskToken}`); + } + return await this.handleNexusRunTask(task.task, base64TaskToken, protobufEncodedTask); + } + case 'cancelTask': { + const nexusHandler = this.taskTokenToNexusHandler.get(base64TaskToken); + if (nexusHandler == null) { + this.logger.trace('Tried to cancel a non-existing nexus handler', { + taskToken: base64TaskToken, + }); + break; + } + // NOTE: nexus handler will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure) + this.logger.trace('Cancelling nexus handler', nexusLogAttributes(nexusHandler.context)); + let reason = 'unkown'; + if (task.cancelTask?.reason != null) { + reason = coresdk.nexus.NexusTaskCancelReason[task.cancelTask.reason]; + } + nexusHandler.abortController.abort(new CancelledFailure(reason)); + return; + } + } + } + ), + filter((result: T): result is Exclude => result !== undefined), + map((result) => coresdk.nexus.NexusTaskCompletion.encodeDelimited(result).finish()) + ); + } + + private async handleNexusRunTask( + task: temporal.api.workflowservice.v1.IPollNexusTaskQueueResponse, + base64TaskToken: string, + protobufEncodedTask: ArrayBuffer + ) { + let ctx: nexus.OperationContext | undefined = undefined; + if (task.taskToken == null) { + throw new nexus.HandlerError({ + type: 'INTERNAL', + message: 'Task missing request task token', + }); + } + const { taskToken } = task; + try { + const ctrl = new AbortController(); + + ctx = constructNexusOperationContext(task.request, ctrl.signal); + + const nexusHandler = new NexusHandler( + taskToken, + this.options.namespace, + this.options.taskQueue, + ctx, + this.client!, // Must be defined if we are handling Nexus tasks. + ctrl, + this.options.nexusServiceRegistry!, // Must be defined if we are handling Nexus tasks. + this.options.loadedDataConverter, + this.logger + ); + this.taskTokenToNexusHandler.set(base64TaskToken, nexusHandler); + this.numInFlightNexusOperationsSubject.next(this.numInFlightNexusOperationsSubject.value + 1); + try { + return await nexusHandler.run(task); + } finally { + this.numInFlightNexusOperationsSubject.next(this.numInFlightNexusOperationsSubject.value - 1); + this.taskTokenToNexusHandler.delete(base64TaskToken); + } + } catch (e) { + let handlerErr: nexus.HandlerError; + this.logger.error(`Error while processing Nexus task: ${errorMessage(e)}`, { + ...(ctx ? nexusLogAttributes(ctx) : {}), + error: e, + taskEncoded: Buffer.from(protobufEncodedTask).toString('base64'), + }); + if (e instanceof nexus.HandlerError) { + handlerErr = e; + } else { + handlerErr = new nexus.HandlerError({ + type: 'INTERNAL', + cause: e, + }); + } + return { + taskToken, + error: await handlerErrorToProto(this.options.loadedDataConverter, handlerErr), + }; + } + } + /** * Process activations from the same workflow execution to an observable of completions. * @@ -1656,6 +1801,54 @@ export class Worker { ); } + protected nexusPoll$(): Observable { + return this.pollLoop$(async () => { + this.hasOutstandingNexusPoll = true; + let buffer: Buffer; + try { + buffer = await this.nativeWorker.pollNexusTask(); + } finally { + this.hasOutstandingNexusPoll = false; + } + const task = coresdk.nexus.NexusTask.decode(new Uint8Array(buffer)); + const taskToken = task.task?.taskToken || task.cancelTask?.taskToken; + if (taskToken == null) { + throw new TypeError('Got a nexus task without a task token'); + } + const base64TaskToken = formatTaskToken(taskToken); + this.logger.trace('Got nexus task', { + taskToken: base64TaskToken, + ...task, + }); + return { task, base64TaskToken, protobufEncodedTask: buffer }; + }).pipe( + tap({ + complete: () => { + this.nexusPollerStateSubject.next('SHUTDOWN'); + }, + error: () => { + this.nexusPollerStateSubject.next('FAILED'); + }, + }) + ); + } + + protected nexus$(): Observable { + // This Worker did not register any nexus services, return early. + if (this.options.nexusServiceRegistry == null) { + if (!this.isReplayWorker) this.logger.info('No nexus services registered, not polling for nexus tasks'); + this.nexusPollerStateSubject.next('SHUTDOWN'); + return EMPTY; + } + return this.nexusPoll$().pipe( + this.nexusOperator(), + mergeMap(async (completion) => { + await this.nativeWorker.completeNexusTask(Buffer.from(completion.buffer, completion.byteOffset)); + }), + tap({ complete: () => this.logger.debug('Nexus Worker terminated') }) + ); + } + protected takeUntilState(state: State): MonoTypeOperatorFunction { return takeUntil(this.stateSubject.pipe(filter((value) => value === state))); } @@ -1780,7 +1973,7 @@ export class Worker { this.instantTerminateErrorSubject.pipe(this.takeUntilState('DRAINED')), this.forceShutdown$(), this.activityHeartbeat$(), - merge(this.workflow$(), this.activity$()).pipe( + merge(this.workflow$(), this.activity$(), this.nexus$()).pipe( tap({ complete: () => { this.state = 'DRAINED'; diff --git a/packages/worker/tsconfig.json b/packages/worker/tsconfig.json index e286092be..1ca504ae1 100644 --- a/packages/worker/tsconfig.json +++ b/packages/worker/tsconfig.json @@ -10,6 +10,7 @@ { "path": "../core-bridge" }, { "path": "../workflow" }, { "path": "../activity" }, + { "path": "../nexus" }, { "path": "../common" } ], "include": ["./src/**/*.ts"] diff --git a/packages/workflow/package.json b/packages/workflow/package.json index 236baa9c5..c58ebf39d 100644 --- a/packages/workflow/package.json +++ b/packages/workflow/package.json @@ -23,7 +23,8 @@ "scripts": {}, "dependencies": { "@temporalio/common": "file:../common", - "@temporalio/proto": "file:../proto" + "@temporalio/proto": "file:../proto", + "nexus-rpc": "github:nexus-rpc/sdk-typescript#pull/6/head" }, "devDependencies": { "source-map": "^0.7.4" diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index d64e35615..c922dfe00 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -99,6 +99,7 @@ export { RootWorkflowInfo, StackTraceSDKInfo, StackTrace, + StartNexusOperationOptions, UnsafeWorkflowInfo, WorkflowInfo, } from './interfaces'; @@ -120,3 +121,5 @@ export { // eslint-disable-next-line deprecation/deprecation LoggerSinksDeprecated as LoggerSinks, } from './logs'; + +export { createNexusClient, NexusClient, NexusOperationHandle } from './nexus'; diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index 978fd5a08..9b620e0f0 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -16,7 +16,7 @@ import { WorkflowExecution, } from '@temporalio/common'; import type { coresdk } from '@temporalio/proto'; -import { ChildWorkflowOptionsWithDefaults, ContinueAsNewOptions } from './interfaces'; +import { ChildWorkflowOptionsWithDefaults, ContinueAsNewOptions, StartNexusOperationOptions } from './interfaces'; export { Next, Headers }; @@ -150,6 +150,23 @@ export type GetLogAttributesInput = Record; /** Input for WorkflowOutboundCallsInterceptor.getMetricTags */ export type GetMetricTagsInput = MetricTags; +/** + * Input for WorkflowOutboundCallsInterceptor.startNexusOperation. + */ +export interface StartNexusOperationInput { + input: unknown; + endpoint: string; + service: string; + options: StartNexusOperationOptions; + operation: string; + seq: number; + nexusHeader: Record; +} + +export interface StartNexusOperationOutput { + token?: string; +} + /** * Implement any of these methods to intercept Workflow code calls to the Temporal APIs, like scheduling an activity and starting a timer */ diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 54f0aa5de..6dc819aee 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -640,3 +640,15 @@ export interface ActivationCompletion { usedInternalFlags: number[]; versioningBehavior?: VersioningBehavior; } + +/** + * Options for starting a Nexus Operation. + */ +export interface StartNexusOperationOptions { + /** + * The end to end timeout for the Nexus Operation. + * + * Optional: defaults to the maximum allowed by the Temporal server. + */ + scheduleToCloseTimeout?: Duration; +} diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index b8e86b2a2..384319340 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -154,6 +154,8 @@ export class Activator implements ActivationHandler { childWorkflowComplete: new Map(), signalWorkflow: new Map(), cancelWorkflow: new Map(), + nexusOperationStart: new Map(), + nexusOperationComplete: new Map(), }; /** @@ -372,6 +374,7 @@ export class Activator implements ActivationHandler { signalWorkflow: 1, cancelWorkflow: 1, condition: 1, + nexusOperation: 1, // Used internally to keep track of active stack traces stack: 1, }; @@ -637,12 +640,35 @@ export class Activator implements ActivationHandler { } } - public resolveNexusOperationStart(_: coresdk.workflow_activation.IResolveNexusOperationStart): void { - throw new Error('TODO'); + public resolveNexusOperationStart(activation: coresdk.workflow_activation.IResolveNexusOperationStart): void { + const { resolve, reject } = this.consumeCompletion('nexusOperationStart', getSeq(activation)); + if (!activation.cancelledBeforeStart) { + resolve({ token: activation.operationId }); + } else { + reject(this.failureToError(activation.cancelledBeforeStart)); + } } - public resolveNexusOperation(_: coresdk.workflow_activation.IResolveNexusOperation): void { - throw new Error('TODO'); + public resolveNexusOperation(activation: coresdk.workflow_activation.IResolveNexusOperation): void { + const seq = getSeq(activation); + const start = this.maybeConsumeCompletion('nexusOperationStart', seq); + const complete = this.consumeCompletion('nexusOperationComplete', seq); + if (activation.result?.completed) { + start?.resolve({}); + complete.resolve(this.payloadConverter.fromPayload(activation.result.completed)); + } else if (activation.result?.failed) { + const err = this.failureToError(activation.result.failed); + start?.reject(err); + complete.reject(err); + } else if (activation.result?.cancelled) { + const err = this.failureToError(activation.result.cancelled); + start?.reject(err); + complete.reject(err); + } else if (activation.result?.timedOut) { + const err = this.failureToError(activation.result.timedOut); + start?.reject(err); + complete.reject(err); + } } // Intentionally non-async function so this handler doesn't show up in the stack trace diff --git a/packages/workflow/src/nexus.ts b/packages/workflow/src/nexus.ts new file mode 100644 index 000000000..9f5899eef --- /dev/null +++ b/packages/workflow/src/nexus.ts @@ -0,0 +1,184 @@ +import * as nexus from 'nexus-rpc'; +import { msOptionalToTs } from '@temporalio/common/lib/time'; +import { CancellationScope } from './cancellation-scope'; +import { getActivator } from './global-attributes'; +import { StartNexusOperationOptions } from './interfaces'; +import { untrackPromise } from './stack-helpers'; +import { StartNexusOperationInput, StartNexusOperationOutput } from './interceptors'; + +export interface NexusClient { + /** + * Start a Nexus Operation and wait for its completion taking a {@link nexus.operation}. + */ + executeOperation( + op: O, + input: nexus.OperationInput, + options?: Partial + ): Promise>; + + /** + * Start a Nexus Operation and wait for its completion taking an operation name. + */ + executeOperation>( + op: K, + input: nexus.OperationInput, + options?: Partial + ): Promise>; + + /** + * Start a Nexus Operation taking a {@link nexus.operation}. + * + * Returns a handle that can be used to wait for the operation's result. + */ + startOperation( + op: O, + input: nexus.OperationInput, + options?: Partial + ): Promise>>; + + /** + * Start a Nexus Operation taking an operation name. + * + * Returns a handle that can be used to wait for the operation's result. + */ + startOperation>( + op: K, + input: nexus.OperationInput, + options?: Partial + ): Promise>>; +} + +/** + * A handle to a Nexus Operation. + */ +export interface NexusOperationHandle { + /** + * The operation's service name. + */ + readonly service: string; + /** + * The name of the Operation. + */ + readonly operation: string; + /** + * Operation token as set by the Operation's handler. May be empty if the operation completed synchronously. + */ + readonly token?: string; + + /** + * Wait for Operation completion and get its result. + */ + result(): Promise; +} + +/** + * Options for {@createNexusClient}. + */ +interface NexusClientOptions { + endpoint: string; + service: T; +} + +/** + * Create a Nexus client for invoking Nexus Operations from a Workflow. + */ +export function createNexusClient(options: NexusClientOptions): NexusClient { + const client = { + executeOperation: async (operation: any, input: unknown, operationOptions?: StartNexusOperationOptions) => { + const handle = await client.startOperation(operation, input, operationOptions); + return await handle.result(); + }, + startOperation: async ( + operation: string | nexus.Operation, + input: unknown, + operationOptions?: StartNexusOperationOptions + ) => { + const opName = typeof operation === 'string' ? options.service.operations[operation].name : operation.name; + + const activator = getActivator(); + const seq = activator.nextSeqs.nexusOperation++; + + // TODO: Do we want to make the interceptor async like we do for child workflow? That seems redundant. + const [startPromise, completePromise] = startNexusOperationNextHandler({ + endpoint: options.endpoint, + service: options.service.name, + operation: opName, + options: operationOptions ?? {}, + nexusHeader: {}, + seq, + input, + }); + const { token } = await startPromise; + return { + service: options.service.name, + operation: opName, + token, + async result() { + return await completePromise; + }, + }; + }, + }; + return client as any; +} + +export function startNexusOperationNextHandler({ + input, + endpoint, + service, + options, + operation, + seq, + nexusHeader, +}: StartNexusOperationInput): [Promise, Promise] { + const activator = getActivator(); + const startPromise = new Promise((resolve, reject) => { + const scope = CancellationScope.current(); + if (scope.consideredCancelled) { + untrackPromise(scope.cancelRequested.catch(reject)); + return; + } + if (scope.cancellable) { + untrackPromise( + scope.cancelRequested.catch(() => { + const complete = !activator.completions.nexusOperationComplete.has(seq); + + if (!complete) { + activator.pushCommand({ + requestCancelNexusOperation: { seq }, + }); + } + // Nothing to cancel otherwise + }) + ); + } + + getActivator().pushCommand({ + scheduleNexusOperation: { + seq, + endpoint, + service, + operation, + nexusHeader, + input: activator.payloadConverter.toPayload(input), + scheduleToCloseTimeout: msOptionalToTs(options?.scheduleToCloseTimeout), + }, + }); + activator.completions.nexusOperationStart.set(seq, { + resolve, + reject, + }); + }); + + const completePromise = new Promise((resolve, reject) => { + activator.completions.nexusOperationComplete.set(seq, { + resolve, + reject, + }); + }); + untrackPromise(startPromise); + untrackPromise(completePromise); + // Prevent unhandled rejection because the completion might not be awaited + untrackPromise(completePromise.catch(() => undefined)); + return [startPromise, completePromise]; +}