From 3b3a6d559b21af3af84837290d3fb7b4c4a9e43e Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 10 Mar 2025 19:08:31 -0700 Subject: [PATCH 1/6] init impl for special behaviour for temporal prefixes. Default signal test needs to be fixed, need to add behaviour reserving prefixes from workflows, and waiting for default update to be merged to add behaviour preventing default update handler to be called with reserved names --- packages/common/src/reserved.ts | 24 +++++ .../test/src/test-integration-split-two.ts | 98 ++++++++++++++++++- packages/test/src/test-workflows.ts | 6 ++ packages/test/src/workflows/index.ts | 1 + .../workflow-with-default-handlers.ts | 16 +++ packages/worker/src/worker-options.ts | 6 ++ packages/worker/src/worker.ts | 3 + packages/workflow/src/internals.ts | 35 +++++-- packages/workflow/src/workflow.ts | 3 + 9 files changed, 183 insertions(+), 9 deletions(-) create mode 100644 packages/common/src/reserved.ts create mode 100644 packages/test/src/workflows/workflow-with-default-handlers.ts diff --git a/packages/common/src/reserved.ts b/packages/common/src/reserved.ts new file mode 100644 index 000000000..7797da7fd --- /dev/null +++ b/packages/common/src/reserved.ts @@ -0,0 +1,24 @@ +export const TEMPORAL_RESERVED_PREFIX = '__temporal_'; +export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace'; +export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace'; + +export const reservedPrefixes = [ + TEMPORAL_RESERVED_PREFIX, + STACK_TRACE_RESERVED_PREFIX, + ENHANCED_STACK_TRACE_RESERVED_PREFIX, +]; + +export function throwIfReservedName(type: string, name: string): void { + const prefix = isReservedName(name); + if (prefix) { + throw Error(`Cannot register ${type} name: '${name}', with reserved prefix: '${prefix}'`); + } +} + +export function isReservedName(name: string): string | undefined { + for (const prefix of reservedPrefixes) { + if (name.startsWith(prefix)) { + return prefix; + } + } +} diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index d3d934932..e1a8dd2c3 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -14,7 +14,16 @@ import { searchAttributePayloadConverter } from '@temporalio/common/lib/converte import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; -import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow'; +import { + condition, + defineQuery, + defineSignal, + defineUpdate, + setDefaultQueryHandler, + setHandler, + sleep, +} from '@temporalio/workflow'; +import { reservedPrefixes } from '@temporalio/common/lib/reserved'; import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration'; import * as activities from './activities'; import * as workflows from './workflows'; @@ -763,3 +772,90 @@ test.serial('default query handler is not used if requested query exists', confi t.deepEqual(result, { name: definedQuery.name, args }); }); }); + +test('Cannot register activities using reserved prefixes', configMacro, async (t, config) => { + const { createWorkerWithDefaults } = config; + + for (const prefix of reservedPrefixes) { + const activityName = prefix + '_test'; + await t.throwsAsync( + createWorkerWithDefaults(t, { + activities: { [activityName]: () => {} }, + }), + { + instanceOf: Error, + message: `Cannot register activity name: '${activityName}', with reserved prefix: '${prefix}'`, + } + ); + } +}); + +test('Cannot register task queues using reserved prefixes', configMacro, async (t, config) => { + const { createWorkerWithDefaults } = config; + + for (const prefix of reservedPrefixes) { + const taskQueue = prefix + '_test'; + + await t.throwsAsync( + createWorkerWithDefaults(t, { + taskQueue, + }), + { + instanceOf: Error, + message: `Cannot register task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`, + } + ); + } +}); + +interface HandlerError { + name: string; + message: string; +} + +export async function workflowBadPrefixHandler(prefix: string): Promise { + // Re-package errors, default payload converter has trouble converting native errors (no 'data' field). + const expectedErrors: HandlerError[] = []; + try { + setHandler(defineSignal(prefix + '_signal'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + try { + setHandler(defineUpdate(prefix + '_update'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + try { + setHandler(defineQuery(prefix + '_query'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + return expectedErrors; +} + +test('Workflow failure if define signals/updates/queries with reserved prefixes', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + await worker.runUntil(async () => { + const prefix = reservedPrefixes[0]; + // for (const prefix of reservedPrefixes) { + const result = await executeWorkflow(workflowBadPrefixHandler, { + args: [prefix], + }); + console.log('result', result); + t.deepEqual(result, [ + { name: 'Error', message: `Cannot register signal name: '${prefix}_signal', with reserved prefix: '${prefix}'` }, + { name: 'Error', message: `Cannot register update name: '${prefix}_update', with reserved prefix: '${prefix}'` }, + { name: 'Error', message: `Cannot register query name: '${prefix}_query', with reserved prefix: '${prefix}'` }, + ]); + // } + }); +}); diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index cc8ddab9a..c045ecbf6 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -21,6 +21,12 @@ import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/v import { SdkFlag, SdkFlags } from '@temporalio/workflow/lib/flags'; import { ReusableVMWorkflow, ReusableVMWorkflowCreator } from '@temporalio/worker/lib/workflow/reusable-vm'; import { parseWorkflowCode } from '@temporalio/worker/lib/worker'; +import { + ENHANCED_STACK_TRACE_RESERVED_PREFIX, + reservedPrefixes, + STACK_TRACE_RESERVED_PREFIX, + TEMPORAL_RESERVED_PREFIX, +} from '@temporalio/common/lib/reserved'; import * as activityFunctions from './activities'; import { cleanStackTrace, REUSE_V8_CONTEXT, u8 } from './helpers'; import { ProcessedSignal } from './workflows'; diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index d4959ec2f..c26f1f9a2 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -92,3 +92,4 @@ export * from './workflow-cancellation-scenarios'; export * from './upsert-and-read-memo'; export * from './updates-ordering'; export * from './wait-on-signal-then-activity'; +export * from './workflow-with-default-handlers'; diff --git a/packages/test/src/workflows/workflow-with-default-handlers.ts b/packages/test/src/workflows/workflow-with-default-handlers.ts new file mode 100644 index 000000000..fb0df0003 --- /dev/null +++ b/packages/test/src/workflows/workflow-with-default-handlers.ts @@ -0,0 +1,16 @@ +import { + condition, + defineSignal, + setDefaultQueryHandler, + setDefaultSignalHandler, + setHandler, +} from '@temporalio/workflow'; + +export async function workflowWithDefaultHandlers(): Promise { + const complete = true; + setDefaultQueryHandler(() => {}); + setDefaultSignalHandler(() => {}); + setHandler(defineSignal('completeSignal'), () => {}); + + await condition(() => complete); +} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 6b27f0be0..ad36b2549 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -14,6 +14,9 @@ import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow' import { LoggerSinks } from '@temporalio/workflow'; import { Context } from '@temporalio/activity'; import { native } from '@temporalio/core-bridge'; +import { checkExtends } from '@temporalio/common/lib/type-helpers'; +import { WorkerOptions as NativeWorkerOptions, WorkerTuner as NativeWorkerTuner } from '@temporalio/core-bridge'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { ActivityInboundLogInterceptor } from './activity-log-interceptor'; import { NativeConnection } from './connection'; import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors'; @@ -953,6 +956,9 @@ export function compileWorkerOptions( } const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function')); + for (const activityName of activities.keys()) { + throwIfReservedName('activity', activityName); + } const tuner = asNativeTuner(opts.tuner, logger); return { diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 7a1487d72..b69fa954b 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -57,6 +57,7 @@ import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; @@ -469,6 +470,8 @@ export class Worker { public static async create(options: WorkerOptions): Promise { const runtime = Runtime.instance(); const logger = LoggerWithComposedMetadata.compose(runtime.logger, { + throwIfReservedName('task queue', options.taskQueue); + const logger = withMetadata(Runtime.instance().logger, { sdkComponent: SdkComponent.worker, taskQueue: options.taskQueue ?? 'default', }); diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index b8e86b2a2..8124011a1 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -31,6 +31,12 @@ import { import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import type { coresdk, temporal } from '@temporalio/proto'; +import { + ENHANCED_STACK_TRACE_RESERVED_PREFIX, + STACK_TRACE_RESERVED_PREFIX, + isReservedName, + throwIfReservedName, +} from '@temporalio/common/lib/reserved'; import { alea, RNG } from './alea'; import { RootCancellationScope } from './cancellation-scope'; import { UpdateScope } from './update-scope'; @@ -260,7 +266,7 @@ export class Activator implements ActivationHandler { */ public readonly queryHandlers = new Map([ [ - '__stack_trace', + STACK_TRACE_RESERVED_PREFIX, { handler: () => { return this.getStackTraces() @@ -271,7 +277,7 @@ export class Activator implements ActivationHandler { }, ], [ - '__enhanced_stack_trace', + ENHANCED_STACK_TRACE_RESERVED_PREFIX, { handler: (): EnhancedStackTrace => { const { sourceMap } = this; @@ -679,17 +685,28 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing query activation attributes'); } + const queryInput = { + queryName: queryType, + args: arrayFromPayloads(this.payloadConverter, activation.arguments), + queryId, + headers: headers ?? {}, + }; + + // Skip interceptors if this is an internal query. + if (isReservedName(queryType)) { + this.queryWorkflowNextHandler(queryInput).then( + (result) => this.completeQuery(queryId, result), + (reason) => this.failQuery(queryId, reason) + ); + return; + } + const execute = composeInterceptors( this.interceptors.inbound, 'handleQuery', this.queryWorkflowNextHandler.bind(this) ); - execute({ - queryName: queryType, - args: arrayFromPayloads(this.payloadConverter, activation.arguments), - queryId, - headers: headers ?? {}, - }).then( + execute(queryInput).then( (result) => this.completeQuery(queryId, result), (reason) => this.failQuery(queryId, reason) ); @@ -847,6 +864,8 @@ export class Activator implements ActivationHandler { if (fn) { return await fn(...args); } else if (this.defaultSignalHandler) { + // Do not call default signal handler with reserved signal name. + throwIfReservedName('signal', signalName); return await this.defaultSignalHandler(signalName, ...args); } else { throw new IllegalStateError(`No registered signal handler for signal: ${signalName}`); diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index d915d6ed7..55cae382a 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -33,6 +33,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { @@ -1272,6 +1273,8 @@ export function setHandler< options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions ): void { const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.'); + // Cannot register handler for reserved names + throwIfReservedName(def.type, def.name); const description = options?.description; if (def.type === 'update') { if (typeof handler === 'function') { From 0d059500add57f982de05534855ea21562b4ec66 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 16 Jun 2025 18:13:37 +0200 Subject: [PATCH 2/6] improvements --- packages/common/src/reserved.ts | 13 +++- .../test/src/test-integration-split-two.ts | 71 ++++++++++++++++++- packages/workflow/src/internals.ts | 49 ++++++++----- 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/packages/common/src/reserved.ts b/packages/common/src/reserved.ts index 7797da7fd..79e1bd909 100644 --- a/packages/common/src/reserved.ts +++ b/packages/common/src/reserved.ts @@ -8,14 +8,21 @@ export const reservedPrefixes = [ ENHANCED_STACK_TRACE_RESERVED_PREFIX, ]; +export class ReservedPrefixError extends Error { + constructor(type: string, name: string, prefix: string) { + super(`Cannot use ${type} name: '${name}', with reserved prefix: '${prefix}'`); + this.name = 'ReservedPrefixError'; + } +} + export function throwIfReservedName(type: string, name: string): void { - const prefix = isReservedName(name); + const prefix = maybeGetReservedPrefix(name); if (prefix) { - throw Error(`Cannot register ${type} name: '${name}', with reserved prefix: '${prefix}'`); + throw new ReservedPrefixError(type, name, prefix); } } -export function isReservedName(name: string): string | undefined { +export function maybeGetReservedPrefix(name: string): string | undefined { for (const prefix of reservedPrefixes) { if (name.startsWith(prefix)) { return prefix; diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index e1a8dd2c3..0b643905a 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -2,7 +2,7 @@ import asyncRetry from 'async-retry'; import { v4 as uuid4 } from 'uuid'; import * as iface from '@temporalio/proto'; -import { WorkflowContinuedAsNewError, WorkflowFailedError } from '@temporalio/client'; +import { WorkflowContinuedAsNewError, WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; import { ApplicationFailure, defaultPayloadConverter, @@ -20,6 +20,7 @@ import { defineSignal, defineUpdate, setDefaultQueryHandler, + setDefaultSignalHandler, setHandler, sleep, } from '@temporalio/workflow'; @@ -859,3 +860,71 @@ test('Workflow failure if define signals/updates/queries with reserved prefixes' // } }); }); + +export async function workflowWithDefaultHandlers(): Promise { + let unblocked = false; + setHandler(defineSignal('unblock'), () => { + unblocked = true; + }); + + setDefaultQueryHandler(() => {}); + setDefaultSignalHandler(() => {}); + setDefaultUpdateHandler({ + handler: () => {}, + }); + + await condition(() => unblocked); +} + +test('Default handlers fail WFT given reserved prefix', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t); + + const assertWftFailure = async ( + handle: WorkflowHandle, + name: string, + prefix: string, + handlerType: 'query' | 'signal' | 'update' + ) => { + await asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WFT failed event found'); + } + const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {}; + if (!failure) { + return t.fail('Expected failure in workflowTaskFailedEventAttributes'); + } + t.is(failure.message, `Cannot use ${handlerType} name: '${name}', with reserved prefix: '${prefix}'`); + }, + { minTimeout: 300, factor: 1, retries: 10 } + ); + }; + + await worker.runUntil(async () => { + for (const prefix of reservedPrefixes) { + // Test Query + let handle = await startWorkflow(workflowWithDefaultHandlers); + const queryName = `${prefix}_query`; + await t.throwsAsync(handle.query(queryName), undefined, `Query ${queryName} should fail`); + await assertWftFailure(handle, queryName, prefix, 'query'); + await handle.terminate(); + // Test Signal + handle = await startWorkflow(workflowWithDefaultHandlers); + const signalName = `${prefix}_signal`; + await handle.signal(signalName); + await assertWftFailure(handle, signalName, prefix, 'signal'); + await handle.terminate(); + + // Test Update + handle = await startWorkflow(workflowWithDefaultHandlers); + const updateName = `${prefix}_update`; + await t.throwsAsync(handle.executeUpdate(updateName), undefined, `Update ${updateName} should fail`); + await assertWftFailure(handle, updateName, prefix, 'update'); + await handle.terminate(); + } + }); +}); \ No newline at end of file diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 8124011a1..061af6445 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -33,8 +33,9 @@ import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflo import type { coresdk, temporal } from '@temporalio/proto'; import { ENHANCED_STACK_TRACE_RESERVED_PREFIX, + ReservedPrefixError, STACK_TRACE_RESERVED_PREFIX, - isReservedName, + maybeGetReservedPrefix, throwIfReservedName, } from '@temporalio/common/lib/reserved'; import { alea, RNG } from './alea'; @@ -685,28 +686,27 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing query activation attributes'); } - const queryInput = { - queryName: queryType, - args: arrayFromPayloads(this.payloadConverter, activation.arguments), - queryId, - headers: headers ?? {}, - }; - - // Skip interceptors if this is an internal query. - if (isReservedName(queryType)) { - this.queryWorkflowNextHandler(queryInput).then( - (result) => this.completeQuery(queryId, result), - (reason) => this.failQuery(queryId, reason) - ); - return; + const reservedPrefix = maybeGetReservedPrefix(queryType) + if (reservedPrefix) { + // Must have (internal) query handler for reserved query. + if (!this.queryHandlers.has(queryType)) { + throw new ReservedPrefixError('query', queryType, reservedPrefix); + } } + // Skip interceptors if it is an internal query + let interceptors = reservedPrefix ? [] : this.interceptors.inbound const execute = composeInterceptors( - this.interceptors.inbound, + interceptors, 'handleQuery', this.queryWorkflowNextHandler.bind(this) ); - execute(queryInput).then( + execute({ + queryName: queryType, + args: arrayFromPayloads(this.payloadConverter, activation.arguments), + queryId, + headers: headers ?? {}, + }).then( (result) => this.completeQuery(queryId, result), (reason) => this.failQuery(queryId, reason) ); @@ -737,6 +737,11 @@ export class Activator implements ActivationHandler { // If we don't have an entry from either source, buffer and return if (entry === null) { + const reservedPrefix = maybeGetReservedPrefix(name); + if (reservedPrefix) { + // Must have (internal) update handler for reserved update. + throw new ReservedPrefixError('update', name, reservedPrefix); + } this.bufferedUpdates.push(activation); return; } @@ -864,8 +869,6 @@ export class Activator implements ActivationHandler { if (fn) { return await fn(...args); } else if (this.defaultSignalHandler) { - // Do not call default signal handler with reserved signal name. - throwIfReservedName('signal', signalName); return await this.defaultSignalHandler(signalName, ...args); } else { throw new IllegalStateError(`No registered signal handler for signal: ${signalName}`); @@ -878,6 +881,14 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing activation signalName'); } + const reservedPrefix = maybeGetReservedPrefix(signalName); + if (reservedPrefix) { + if (!this.signalHandlers.has(signalName)) { + // Must have (internal) signal handler for reserved signal. + throw new ReservedPrefixError('signal', signalName, reservedPrefix); + } + } + if (!this.signalHandlers.has(signalName) && !this.defaultSignalHandler) { this.bufferedSignals.push(activation); return; From 823715875b53d417d690ae56bd63eb3c3780a9eb Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 17 Jun 2025 16:15:37 +0200 Subject: [PATCH 3/6] special prefix behaviour, does not cover workflows --- .../test/src/test-integration-split-two.ts | 158 +---------------- .../test/src/test-integration-workflows.ts | 165 +++++++++++++++++- packages/test/src/test-workflows.ts | 6 - packages/test/src/workflows/index.ts | 1 - .../workflow-with-default-handlers.ts | 16 -- packages/worker/src/worker-options.ts | 2 - packages/worker/src/worker.ts | 5 +- packages/workflow/src/internals.ts | 21 +-- 8 files changed, 175 insertions(+), 199 deletions(-) delete mode 100644 packages/test/src/workflows/workflow-with-default-handlers.ts diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index 0b643905a..77e241b84 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -21,10 +21,11 @@ import { defineUpdate, setDefaultQueryHandler, setDefaultSignalHandler, + setDefaultUpdateHandler, setHandler, sleep, } from '@temporalio/workflow'; -import { reservedPrefixes } from '@temporalio/common/lib/reserved'; +import { ReservedPrefixError, reservedPrefixes } from '@temporalio/common/lib/reserved'; import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration'; import * as activities from './activities'; import * as workflows from './workflows'; @@ -772,159 +773,4 @@ test.serial('default query handler is not used if requested query exists', confi const result = await handle.query('query-handler-type', ...args); t.deepEqual(result, { name: definedQuery.name, args }); }); -}); - -test('Cannot register activities using reserved prefixes', configMacro, async (t, config) => { - const { createWorkerWithDefaults } = config; - - for (const prefix of reservedPrefixes) { - const activityName = prefix + '_test'; - await t.throwsAsync( - createWorkerWithDefaults(t, { - activities: { [activityName]: () => {} }, - }), - { - instanceOf: Error, - message: `Cannot register activity name: '${activityName}', with reserved prefix: '${prefix}'`, - } - ); - } -}); - -test('Cannot register task queues using reserved prefixes', configMacro, async (t, config) => { - const { createWorkerWithDefaults } = config; - - for (const prefix of reservedPrefixes) { - const taskQueue = prefix + '_test'; - - await t.throwsAsync( - createWorkerWithDefaults(t, { - taskQueue, - }), - { - instanceOf: Error, - message: `Cannot register task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`, - } - ); - } -}); - -interface HandlerError { - name: string; - message: string; -} - -export async function workflowBadPrefixHandler(prefix: string): Promise { - // Re-package errors, default payload converter has trouble converting native errors (no 'data' field). - const expectedErrors: HandlerError[] = []; - try { - setHandler(defineSignal(prefix + '_signal'), () => {}); - } catch (e) { - if (e instanceof Error) { - expectedErrors.push({ name: e.name, message: e.message }); - } - } - try { - setHandler(defineUpdate(prefix + '_update'), () => {}); - } catch (e) { - if (e instanceof Error) { - expectedErrors.push({ name: e.name, message: e.message }); - } - } - try { - setHandler(defineQuery(prefix + '_query'), () => {}); - } catch (e) { - if (e instanceof Error) { - expectedErrors.push({ name: e.name, message: e.message }); - } - } - return expectedErrors; -} - -test('Workflow failure if define signals/updates/queries with reserved prefixes', configMacro, async (t, config) => { - const { env, createWorkerWithDefaults } = config; - const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); - const worker = await createWorkerWithDefaults(t); - await worker.runUntil(async () => { - const prefix = reservedPrefixes[0]; - // for (const prefix of reservedPrefixes) { - const result = await executeWorkflow(workflowBadPrefixHandler, { - args: [prefix], - }); - console.log('result', result); - t.deepEqual(result, [ - { name: 'Error', message: `Cannot register signal name: '${prefix}_signal', with reserved prefix: '${prefix}'` }, - { name: 'Error', message: `Cannot register update name: '${prefix}_update', with reserved prefix: '${prefix}'` }, - { name: 'Error', message: `Cannot register query name: '${prefix}_query', with reserved prefix: '${prefix}'` }, - ]); - // } - }); -}); - -export async function workflowWithDefaultHandlers(): Promise { - let unblocked = false; - setHandler(defineSignal('unblock'), () => { - unblocked = true; - }); - - setDefaultQueryHandler(() => {}); - setDefaultSignalHandler(() => {}); - setDefaultUpdateHandler({ - handler: () => {}, - }); - - await condition(() => unblocked); -} - -test('Default handlers fail WFT given reserved prefix', configMacro, async (t, config) => { - const { env, createWorkerWithDefaults } = config; - const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); - const worker = await createWorkerWithDefaults(t); - - const assertWftFailure = async ( - handle: WorkflowHandle, - name: string, - prefix: string, - handlerType: 'query' | 'signal' | 'update' - ) => { - await asyncRetry( - async () => { - const history = await handle.fetchHistory(); - const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes); - if (wftFailedEvent === undefined) { - throw new Error('No WFT failed event found'); - } - const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {}; - if (!failure) { - return t.fail('Expected failure in workflowTaskFailedEventAttributes'); - } - t.is(failure.message, `Cannot use ${handlerType} name: '${name}', with reserved prefix: '${prefix}'`); - }, - { minTimeout: 300, factor: 1, retries: 10 } - ); - }; - - await worker.runUntil(async () => { - for (const prefix of reservedPrefixes) { - // Test Query - let handle = await startWorkflow(workflowWithDefaultHandlers); - const queryName = `${prefix}_query`; - await t.throwsAsync(handle.query(queryName), undefined, `Query ${queryName} should fail`); - await assertWftFailure(handle, queryName, prefix, 'query'); - await handle.terminate(); - // Test Signal - handle = await startWorkflow(workflowWithDefaultHandlers); - const signalName = `${prefix}_signal`; - await handle.signal(signalName); - await assertWftFailure(handle, signalName, prefix, 'signal'); - await handle.terminate(); - - // Test Update - handle = await startWorkflow(workflowWithDefaultHandlers); - const updateName = `${prefix}_update`; - await t.throwsAsync(handle.executeUpdate(updateName), undefined, `Update ${updateName} should fail`); - await assertWftFailure(handle, updateName, prefix, 'update'); - await handle.terminate(); - } - }); }); \ No newline at end of file diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index ea05b71f6..2660b0383 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1,14 +1,15 @@ +import asyncRetry from 'async-retry'; import { setTimeout as setTimeoutPromise } from 'timers/promises'; import { randomUUID } from 'crypto'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; -import { WorkflowFailedError } from '@temporalio/client'; +import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; import * as activity from '@temporalio/activity'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; import { CancelReason } from '@temporalio/worker/lib/activity'; import * as workflow from '@temporalio/workflow'; -import { defineQuery, defineSignal } from '@temporalio/workflow'; +import { condition, defineQuery, defineSignal, defineUpdate, setDefaultQueryHandler, setDefaultSignalHandler, setDefaultUpdateHandler, setHandler } from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { ActivityCancellationType, @@ -25,6 +26,7 @@ import * as workflows from './workflows'; import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers'; +import { reservedPrefixes } from '@temporalio/common/src/reserved'; const test = makeTestFunction({ workflowsPath: __filename, @@ -1414,3 +1416,162 @@ test('Workflow can return root workflow', async (t) => { t.deepEqual(result, 'empty test-root-workflow-length'); }); }); + +test('Cannot register activities using reserved prefixes', async (t) => { + const { createWorker } = helpers(t); + + for (const prefix of reservedPrefixes) { + const activityName = prefix + '_test'; + await t.throwsAsync( + createWorker({ + activities: { [activityName]: () => {} }, + }), + { + name: 'ReservedPrefixError', + message: `Cannot use activity name: '${activityName}', with reserved prefix: '${prefix}'`, + } + ); + } +}); + +test('Cannot register task queues using reserved prefixes', async (t) => { + const { createWorker } = helpers(t); + + for (const prefix of reservedPrefixes) { + const taskQueue = prefix + '_test'; + + await t.throwsAsync( + createWorker({ + taskQueue, + }), + { + name: 'ReservedPrefixError', + message: `Cannot use task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`, + } + ); + } +}); + +interface HandlerError { + name: string; + message: string; +} + +export async function workflowBadPrefixHandler(prefix: string): Promise { + // Re-package errors, default payload converter has trouble converting native errors (no 'data' field). + const expectedErrors: HandlerError[] = []; + try { + setHandler(defineSignal(prefix + '_signal'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + try { + setHandler(defineUpdate(prefix + '_update'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + try { + setHandler(defineQuery(prefix + '_query'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + return expectedErrors; +} + +test('Workflow failure if define signals/updates/queries with reserved prefixes', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + for (const prefix of reservedPrefixes) { + const result = await executeWorkflow(workflowBadPrefixHandler, { + args: [prefix], + }); + t.deepEqual(result, [ + { name: 'ReservedPrefixError', message: `Cannot use signal name: '${prefix}_signal', with reserved prefix: '${prefix}'` }, + { name: 'ReservedPrefixError', message: `Cannot use update name: '${prefix}_update', with reserved prefix: '${prefix}'` }, + { name: 'ReservedPrefixError', message: `Cannot use query name: '${prefix}_query', with reserved prefix: '${prefix}'` }, + ]); + } + }); +}); + +export async function workflowWithDefaultHandlers(): Promise { + let unblocked = false; + setHandler(defineSignal('unblock'), () => { + unblocked = true; + }); + + setDefaultQueryHandler(() => {}); + setDefaultSignalHandler(() => {}); + setDefaultUpdateHandler(() => {}); + + await condition(() => unblocked); +} + +test('Default handlers fail given reserved prefix', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + + const assertWftFailure = async ( + handle: WorkflowHandle, + errMsg: string, + ) => { + await asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WFT failed event found'); + } + const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {}; + if (!failure) { + return t.fail('Expected failure in workflowTaskFailedEventAttributes'); + } + t.is(failure.message, errMsg); + }, + { minTimeout: 300, factor: 1, retries: 10 } + ); + }; + + await worker.runUntil(async () => { + for (const prefix of reservedPrefixes) { + + // Reserved query + let handle = await startWorkflow(workflowWithDefaultHandlers); + const queryName = `${prefix}_query`; + await t.throwsAsync(handle.query(queryName, { timeout: 1000 }), { + // ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server + name: 'QueryNotRegisteredError', + message: `Cannot use query name: '${queryName}', with reserved prefix: '${prefix}'`, + }, `Query ${queryName} should fail`); + await handle.terminate(); + + + + // Reserved signal + handle = await startWorkflow(workflowWithDefaultHandlers); + const signalName = `${prefix}_signal`; + await handle.signal(signalName); + await assertWftFailure(handle, `Cannot use signal name: '${signalName}', with reserved prefix: '${prefix}'`); + await handle.terminate(); + + + // Reserved update + handle = await startWorkflow(workflowWithDefaultHandlers); + const updateName = `${prefix}_update`; + handle.executeUpdate(updateName).catch(() => { + // Expect failure. The error caught here is a WorkflowNotFound because + // the workflow will have already failed, so the update cannot go through. + // We assert on the expected failure below. + }); + await assertWftFailure(handle, `Cannot use update name: '${updateName}', with reserved prefix: '${prefix}'`); + await handle.terminate(); + } + }); +}); \ No newline at end of file diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index c045ecbf6..cc8ddab9a 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -21,12 +21,6 @@ import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/v import { SdkFlag, SdkFlags } from '@temporalio/workflow/lib/flags'; import { ReusableVMWorkflow, ReusableVMWorkflowCreator } from '@temporalio/worker/lib/workflow/reusable-vm'; import { parseWorkflowCode } from '@temporalio/worker/lib/worker'; -import { - ENHANCED_STACK_TRACE_RESERVED_PREFIX, - reservedPrefixes, - STACK_TRACE_RESERVED_PREFIX, - TEMPORAL_RESERVED_PREFIX, -} from '@temporalio/common/lib/reserved'; import * as activityFunctions from './activities'; import { cleanStackTrace, REUSE_V8_CONTEXT, u8 } from './helpers'; import { ProcessedSignal } from './workflows'; diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index c26f1f9a2..d4959ec2f 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -92,4 +92,3 @@ export * from './workflow-cancellation-scenarios'; export * from './upsert-and-read-memo'; export * from './updates-ordering'; export * from './wait-on-signal-then-activity'; -export * from './workflow-with-default-handlers'; diff --git a/packages/test/src/workflows/workflow-with-default-handlers.ts b/packages/test/src/workflows/workflow-with-default-handlers.ts deleted file mode 100644 index fb0df0003..000000000 --- a/packages/test/src/workflows/workflow-with-default-handlers.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { - condition, - defineSignal, - setDefaultQueryHandler, - setDefaultSignalHandler, - setHandler, -} from '@temporalio/workflow'; - -export async function workflowWithDefaultHandlers(): Promise { - const complete = true; - setDefaultQueryHandler(() => {}); - setDefaultSignalHandler(() => {}); - setHandler(defineSignal('completeSignal'), () => {}); - - await condition(() => complete); -} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index ad36b2549..8e7382633 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -14,8 +14,6 @@ import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow' import { LoggerSinks } from '@temporalio/workflow'; import { Context } from '@temporalio/activity'; import { native } from '@temporalio/core-bridge'; -import { checkExtends } from '@temporalio/common/lib/type-helpers'; -import { WorkerOptions as NativeWorkerOptions, WorkerTuner as NativeWorkerTuner } from '@temporalio/core-bridge'; import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { ActivityInboundLogInterceptor } from './activity-log-interceptor'; import { NativeConnection } from './connection'; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index b69fa954b..dd472df98 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -57,7 +57,6 @@ import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; -import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; @@ -103,6 +102,7 @@ import { ShutdownError, UnexpectedError, } from './errors'; +import { throwIfReservedName } from '@temporalio/common/src/reserved'; export { DataConverter, defaultPayloadConverter }; @@ -468,10 +468,9 @@ export class Worker { * This method initiates a connection to the server and will throw (asynchronously) on connection failure. */ public static async create(options: WorkerOptions): Promise { + throwIfReservedName('task queue', options.taskQueue); const runtime = Runtime.instance(); const logger = LoggerWithComposedMetadata.compose(runtime.logger, { - throwIfReservedName('task queue', options.taskQueue); - const logger = withMetadata(Runtime.instance().logger, { sdkComponent: SdkComponent.worker, taskQueue: options.taskQueue ?? 'default', }); diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 061af6445..555b9286a 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -36,7 +36,6 @@ import { ReservedPrefixError, STACK_TRACE_RESERVED_PREFIX, maybeGetReservedPrefix, - throwIfReservedName, } from '@temporalio/common/lib/reserved'; import { alea, RNG } from './alea'; import { RootCancellationScope } from './cancellation-scope'; @@ -686,7 +685,7 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing query activation attributes'); } - const reservedPrefix = maybeGetReservedPrefix(queryType) + const reservedPrefix = maybeGetReservedPrefix(queryType); if (reservedPrefix) { // Must have (internal) query handler for reserved query. if (!this.queryHandlers.has(queryType)) { @@ -695,12 +694,8 @@ export class Activator implements ActivationHandler { } // Skip interceptors if it is an internal query - let interceptors = reservedPrefix ? [] : this.interceptors.inbound - const execute = composeInterceptors( - interceptors, - 'handleQuery', - this.queryWorkflowNextHandler.bind(this) - ); + const interceptors = reservedPrefix ? [] : this.interceptors.inbound; + const execute = composeInterceptors(interceptors, 'handleQuery', this.queryWorkflowNextHandler.bind(this)); execute({ queryName: queryType, args: arrayFromPayloads(this.payloadConverter, activation.arguments), @@ -723,6 +718,11 @@ export class Activator implements ActivationHandler { if (!protocolInstanceId) { throw new TypeError('Missing activation update protocolInstanceId'); } + const reservedPrefix = maybeGetReservedPrefix(name); + if (reservedPrefix && !this.updateHandlers.get(name)) { + // Must have (internal) update handler for reserved update. + throw new ReservedPrefixError('update', name, reservedPrefix); + } const entry = this.updateHandlers.get(name) ?? @@ -737,11 +737,6 @@ export class Activator implements ActivationHandler { // If we don't have an entry from either source, buffer and return if (entry === null) { - const reservedPrefix = maybeGetReservedPrefix(name); - if (reservedPrefix) { - // Must have (internal) update handler for reserved update. - throw new ReservedPrefixError('update', name, reservedPrefix); - } this.bufferedUpdates.push(activation); return; } From efe9d6999a9c875674a84ad0b87cc333b9501579 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 17 Jun 2025 16:24:05 +0200 Subject: [PATCH 4/6] formatting & linting --- .../test/src/test-integration-split-two.ts | 17 +----- .../test/src/test-integration-workflows.ts | 57 ++++++++++++------- packages/worker/src/worker.ts | 2 +- 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index 77e241b84..d3d934932 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -2,7 +2,7 @@ import asyncRetry from 'async-retry'; import { v4 as uuid4 } from 'uuid'; import * as iface from '@temporalio/proto'; -import { WorkflowContinuedAsNewError, WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; +import { WorkflowContinuedAsNewError, WorkflowFailedError } from '@temporalio/client'; import { ApplicationFailure, defaultPayloadConverter, @@ -14,18 +14,7 @@ import { searchAttributePayloadConverter } from '@temporalio/common/lib/converte import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; -import { - condition, - defineQuery, - defineSignal, - defineUpdate, - setDefaultQueryHandler, - setDefaultSignalHandler, - setDefaultUpdateHandler, - setHandler, - sleep, -} from '@temporalio/workflow'; -import { ReservedPrefixError, reservedPrefixes } from '@temporalio/common/lib/reserved'; +import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow'; import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration'; import * as activities from './activities'; import * as workflows from './workflows'; @@ -773,4 +762,4 @@ test.serial('default query handler is not used if requested query exists', confi const result = await handle.query('query-handler-type', ...args); t.deepEqual(result, { name: definedQuery.name, args }); }); -}); \ No newline at end of file +}); diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 2660b0383..a77e6a9fd 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1,6 +1,6 @@ -import asyncRetry from 'async-retry'; import { setTimeout as setTimeoutPromise } from 'timers/promises'; import { randomUUID } from 'crypto'; +import asyncRetry from 'async-retry'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; @@ -9,7 +9,16 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; import { CancelReason } from '@temporalio/worker/lib/activity'; import * as workflow from '@temporalio/workflow'; -import { condition, defineQuery, defineSignal, defineUpdate, setDefaultQueryHandler, setDefaultSignalHandler, setDefaultUpdateHandler, setHandler } from '@temporalio/workflow'; +import { + condition, + defineQuery, + defineSignal, + defineUpdate, + setDefaultQueryHandler, + setDefaultSignalHandler, + setDefaultUpdateHandler, + setHandler, +} from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { ActivityCancellationType, @@ -20,13 +29,13 @@ import { TypedSearchAttributes, WorkflowExecutionAlreadyStartedError, } from '@temporalio/common'; +import { reservedPrefixes } from '@temporalio/common/lib/reserved'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers'; -import { reservedPrefixes } from '@temporalio/common/src/reserved'; const test = makeTestFunction({ workflowsPath: __filename, @@ -1493,9 +1502,18 @@ test('Workflow failure if define signals/updates/queries with reserved prefixes' args: [prefix], }); t.deepEqual(result, [ - { name: 'ReservedPrefixError', message: `Cannot use signal name: '${prefix}_signal', with reserved prefix: '${prefix}'` }, - { name: 'ReservedPrefixError', message: `Cannot use update name: '${prefix}_update', with reserved prefix: '${prefix}'` }, - { name: 'ReservedPrefixError', message: `Cannot use query name: '${prefix}_query', with reserved prefix: '${prefix}'` }, + { + name: 'ReservedPrefixError', + message: `Cannot use signal name: '${prefix}_signal', with reserved prefix: '${prefix}'`, + }, + { + name: 'ReservedPrefixError', + message: `Cannot use update name: '${prefix}_update', with reserved prefix: '${prefix}'`, + }, + { + name: 'ReservedPrefixError', + message: `Cannot use query name: '${prefix}_query', with reserved prefix: '${prefix}'`, + }, ]); } }); @@ -1518,10 +1536,7 @@ test('Default handlers fail given reserved prefix', async (t) => { const { createWorker, startWorkflow } = helpers(t); const worker = await createWorker(); - const assertWftFailure = async ( - handle: WorkflowHandle, - errMsg: string, - ) => { + const assertWftFailure = async (handle: WorkflowHandle, errMsg: string) => { await asyncRetry( async () => { const history = await handle.fetchHistory(); @@ -1541,27 +1556,27 @@ test('Default handlers fail given reserved prefix', async (t) => { await worker.runUntil(async () => { for (const prefix of reservedPrefixes) { - // Reserved query let handle = await startWorkflow(workflowWithDefaultHandlers); const queryName = `${prefix}_query`; - await t.throwsAsync(handle.query(queryName, { timeout: 1000 }), { - // ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server - name: 'QueryNotRegisteredError', - message: `Cannot use query name: '${queryName}', with reserved prefix: '${prefix}'`, - }, `Query ${queryName} should fail`); + await t.throwsAsync( + handle.query(queryName, { timeout: 1000 }), + { + // ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server + name: 'QueryNotRegisteredError', + message: `Cannot use query name: '${queryName}', with reserved prefix: '${prefix}'`, + }, + `Query ${queryName} should fail` + ); await handle.terminate(); - - // Reserved signal handle = await startWorkflow(workflowWithDefaultHandlers); const signalName = `${prefix}_signal`; await handle.signal(signalName); await assertWftFailure(handle, `Cannot use signal name: '${signalName}', with reserved prefix: '${prefix}'`); await handle.terminate(); - - + // Reserved update handle = await startWorkflow(workflowWithDefaultHandlers); const updateName = `${prefix}_update`; @@ -1574,4 +1589,4 @@ test('Default handlers fail given reserved prefix', async (t) => { await handle.terminate(); } }); -}); \ No newline at end of file +}); diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index dd472df98..328e43ad1 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -57,6 +57,7 @@ import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; @@ -102,7 +103,6 @@ import { ShutdownError, UnexpectedError, } from './errors'; -import { throwIfReservedName } from '@temporalio/common/src/reserved'; export { DataConverter, defaultPayloadConverter }; From dfb0244d1d94957f8867fa6acad39e937156db21 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 17 Jun 2025 16:50:50 +0200 Subject: [PATCH 5/6] remove timeout --- packages/test/src/test-integration-workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index a77e6a9fd..db0267279 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1560,7 +1560,7 @@ test('Default handlers fail given reserved prefix', async (t) => { let handle = await startWorkflow(workflowWithDefaultHandlers); const queryName = `${prefix}_query`; await t.throwsAsync( - handle.query(queryName, { timeout: 1000 }), + handle.query(queryName), { // ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server name: 'QueryNotRegisteredError', From 55c641f57933b794e22c63353779782aee7078f6 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 17 Jun 2025 17:38:10 +0200 Subject: [PATCH 6/6] fix test timeout failure --- packages/test/src/test-integration-workflows.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index db0267279..b7c9d1d4d 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1519,6 +1519,7 @@ test('Workflow failure if define signals/updates/queries with reserved prefixes' }); }); +export const wfReadyQuery = defineQuery('wf-ready'); export async function workflowWithDefaultHandlers(): Promise { let unblocked = false; setHandler(defineSignal('unblock'), () => { @@ -1528,6 +1529,7 @@ export async function workflowWithDefaultHandlers(): Promise { setDefaultQueryHandler(() => {}); setDefaultSignalHandler(() => {}); setDefaultUpdateHandler(() => {}); + setHandler(wfReadyQuery, () => true); await condition(() => unblocked); } @@ -1558,6 +1560,11 @@ test('Default handlers fail given reserved prefix', async (t) => { for (const prefix of reservedPrefixes) { // Reserved query let handle = await startWorkflow(workflowWithDefaultHandlers); + await asyncRetry(async () => { + if (!(await handle.query(wfReadyQuery))) { + throw new Error('Workflow not ready yet'); + } + }); const queryName = `${prefix}_query`; await t.throwsAsync( handle.query(queryName), @@ -1572,6 +1579,11 @@ test('Default handlers fail given reserved prefix', async (t) => { // Reserved signal handle = await startWorkflow(workflowWithDefaultHandlers); + await asyncRetry(async () => { + if (!(await handle.query(wfReadyQuery))) { + throw new Error('Workflow not ready yet'); + } + }); const signalName = `${prefix}_signal`; await handle.signal(signalName); await assertWftFailure(handle, `Cannot use signal name: '${signalName}', with reserved prefix: '${prefix}'`); @@ -1579,6 +1591,11 @@ test('Default handlers fail given reserved prefix', async (t) => { // Reserved update handle = await startWorkflow(workflowWithDefaultHandlers); + await asyncRetry(async () => { + if (!(await handle.query(wfReadyQuery))) { + throw new Error('Workflow not ready yet'); + } + }); const updateName = `${prefix}_update`; handle.executeUpdate(updateName).catch(() => { // Expect failure. The error caught here is a WorkflowNotFound because