diff --git a/packages/common/src/reserved.ts b/packages/common/src/reserved.ts new file mode 100644 index 000000000..79e1bd909 --- /dev/null +++ b/packages/common/src/reserved.ts @@ -0,0 +1,31 @@ +export const TEMPORAL_RESERVED_PREFIX = '__temporal_'; +export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace'; +export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace'; + +export const reservedPrefixes = [ + TEMPORAL_RESERVED_PREFIX, + STACK_TRACE_RESERVED_PREFIX, + ENHANCED_STACK_TRACE_RESERVED_PREFIX, +]; + +export class ReservedPrefixError extends Error { + constructor(type: string, name: string, prefix: string) { + super(`Cannot use ${type} name: '${name}', with reserved prefix: '${prefix}'`); + this.name = 'ReservedPrefixError'; + } +} + +export function throwIfReservedName(type: string, name: string): void { + const prefix = maybeGetReservedPrefix(name); + if (prefix) { + throw new ReservedPrefixError(type, name, prefix); + } +} + +export function maybeGetReservedPrefix(name: string): string | undefined { + for (const prefix of reservedPrefixes) { + if (name.startsWith(prefix)) { + return prefix; + } + } +} diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index ea05b71f6..b7c9d1d4d 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1,14 +1,24 @@ import { setTimeout as setTimeoutPromise } from 'timers/promises'; import { randomUUID } from 'crypto'; +import asyncRetry from 'async-retry'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; -import { WorkflowFailedError } from '@temporalio/client'; +import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; import * as activity from '@temporalio/activity'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; import { CancelReason } from '@temporalio/worker/lib/activity'; import * as workflow from '@temporalio/workflow'; -import { defineQuery, defineSignal } from '@temporalio/workflow'; +import { + condition, + defineQuery, + defineSignal, + defineUpdate, + setDefaultQueryHandler, + setDefaultSignalHandler, + setDefaultUpdateHandler, + setHandler, +} from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { ActivityCancellationType, @@ -19,6 +29,7 @@ import { TypedSearchAttributes, WorkflowExecutionAlreadyStartedError, } from '@temporalio/common'; +import { reservedPrefixes } from '@temporalio/common/lib/reserved'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; @@ -1414,3 +1425,185 @@ test('Workflow can return root workflow', async (t) => { t.deepEqual(result, 'empty test-root-workflow-length'); }); }); + +test('Cannot register activities using reserved prefixes', async (t) => { + const { createWorker } = helpers(t); + + for (const prefix of reservedPrefixes) { + const activityName = prefix + '_test'; + await t.throwsAsync( + createWorker({ + activities: { [activityName]: () => {} }, + }), + { + name: 'ReservedPrefixError', + message: `Cannot use activity name: '${activityName}', with reserved prefix: '${prefix}'`, + } + ); + } +}); + +test('Cannot register task queues using reserved prefixes', async (t) => { + const { createWorker } = helpers(t); + + for (const prefix of reservedPrefixes) { + const taskQueue = prefix + '_test'; + + await t.throwsAsync( + createWorker({ + taskQueue, + }), + { + name: 'ReservedPrefixError', + message: `Cannot use task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`, + } + ); + } +}); + +interface HandlerError { + name: string; + message: string; +} + +export async function workflowBadPrefixHandler(prefix: string): Promise { + // Re-package errors, default payload converter has trouble converting native errors (no 'data' field). + const expectedErrors: HandlerError[] = []; + try { + setHandler(defineSignal(prefix + '_signal'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + try { + setHandler(defineUpdate(prefix + '_update'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + try { + setHandler(defineQuery(prefix + '_query'), () => {}); + } catch (e) { + if (e instanceof Error) { + expectedErrors.push({ name: e.name, message: e.message }); + } + } + return expectedErrors; +} + +test('Workflow failure if define signals/updates/queries with reserved prefixes', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + for (const prefix of reservedPrefixes) { + const result = await executeWorkflow(workflowBadPrefixHandler, { + args: [prefix], + }); + t.deepEqual(result, [ + { + name: 'ReservedPrefixError', + message: `Cannot use signal name: '${prefix}_signal', with reserved prefix: '${prefix}'`, + }, + { + name: 'ReservedPrefixError', + message: `Cannot use update name: '${prefix}_update', with reserved prefix: '${prefix}'`, + }, + { + name: 'ReservedPrefixError', + message: `Cannot use query name: '${prefix}_query', with reserved prefix: '${prefix}'`, + }, + ]); + } + }); +}); + +export const wfReadyQuery = defineQuery('wf-ready'); +export async function workflowWithDefaultHandlers(): Promise { + let unblocked = false; + setHandler(defineSignal('unblock'), () => { + unblocked = true; + }); + + setDefaultQueryHandler(() => {}); + setDefaultSignalHandler(() => {}); + setDefaultUpdateHandler(() => {}); + setHandler(wfReadyQuery, () => true); + + await condition(() => unblocked); +} + +test('Default handlers fail given reserved prefix', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + + const assertWftFailure = async (handle: WorkflowHandle, errMsg: string) => { + await asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WFT failed event found'); + } + const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {}; + if (!failure) { + return t.fail('Expected failure in workflowTaskFailedEventAttributes'); + } + t.is(failure.message, errMsg); + }, + { minTimeout: 300, factor: 1, retries: 10 } + ); + }; + + await worker.runUntil(async () => { + for (const prefix of reservedPrefixes) { + // Reserved query + let handle = await startWorkflow(workflowWithDefaultHandlers); + await asyncRetry(async () => { + if (!(await handle.query(wfReadyQuery))) { + throw new Error('Workflow not ready yet'); + } + }); + const queryName = `${prefix}_query`; + await t.throwsAsync( + handle.query(queryName), + { + // ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server + name: 'QueryNotRegisteredError', + message: `Cannot use query name: '${queryName}', with reserved prefix: '${prefix}'`, + }, + `Query ${queryName} should fail` + ); + await handle.terminate(); + + // Reserved signal + handle = await startWorkflow(workflowWithDefaultHandlers); + await asyncRetry(async () => { + if (!(await handle.query(wfReadyQuery))) { + throw new Error('Workflow not ready yet'); + } + }); + const signalName = `${prefix}_signal`; + await handle.signal(signalName); + await assertWftFailure(handle, `Cannot use signal name: '${signalName}', with reserved prefix: '${prefix}'`); + await handle.terminate(); + + // Reserved update + handle = await startWorkflow(workflowWithDefaultHandlers); + await asyncRetry(async () => { + if (!(await handle.query(wfReadyQuery))) { + throw new Error('Workflow not ready yet'); + } + }); + const updateName = `${prefix}_update`; + handle.executeUpdate(updateName).catch(() => { + // Expect failure. The error caught here is a WorkflowNotFound because + // the workflow will have already failed, so the update cannot go through. + // We assert on the expected failure below. + }); + await assertWftFailure(handle, `Cannot use update name: '${updateName}', with reserved prefix: '${prefix}'`); + await handle.terminate(); + } + }); +}); diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 6b27f0be0..8e7382633 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -14,6 +14,7 @@ import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow' import { LoggerSinks } from '@temporalio/workflow'; import { Context } from '@temporalio/activity'; import { native } from '@temporalio/core-bridge'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { ActivityInboundLogInterceptor } from './activity-log-interceptor'; import { NativeConnection } from './connection'; import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors'; @@ -953,6 +954,9 @@ export function compileWorkerOptions( } const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function')); + for (const activityName of activities.keys()) { + throwIfReservedName('activity', activityName); + } const tuner = asNativeTuner(opts.tuner, logger); return { diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 7a1487d72..328e43ad1 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -57,6 +57,7 @@ import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; @@ -467,6 +468,7 @@ export class Worker { * This method initiates a connection to the server and will throw (asynchronously) on connection failure. */ public static async create(options: WorkerOptions): Promise { + throwIfReservedName('task queue', options.taskQueue); const runtime = Runtime.instance(); const logger = LoggerWithComposedMetadata.compose(runtime.logger, { sdkComponent: SdkComponent.worker, diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index b8e86b2a2..555b9286a 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -31,6 +31,12 @@ import { import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import type { coresdk, temporal } from '@temporalio/proto'; +import { + ENHANCED_STACK_TRACE_RESERVED_PREFIX, + ReservedPrefixError, + STACK_TRACE_RESERVED_PREFIX, + maybeGetReservedPrefix, +} from '@temporalio/common/lib/reserved'; import { alea, RNG } from './alea'; import { RootCancellationScope } from './cancellation-scope'; import { UpdateScope } from './update-scope'; @@ -260,7 +266,7 @@ export class Activator implements ActivationHandler { */ public readonly queryHandlers = new Map([ [ - '__stack_trace', + STACK_TRACE_RESERVED_PREFIX, { handler: () => { return this.getStackTraces() @@ -271,7 +277,7 @@ export class Activator implements ActivationHandler { }, ], [ - '__enhanced_stack_trace', + ENHANCED_STACK_TRACE_RESERVED_PREFIX, { handler: (): EnhancedStackTrace => { const { sourceMap } = this; @@ -679,11 +685,17 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing query activation attributes'); } - const execute = composeInterceptors( - this.interceptors.inbound, - 'handleQuery', - this.queryWorkflowNextHandler.bind(this) - ); + const reservedPrefix = maybeGetReservedPrefix(queryType); + if (reservedPrefix) { + // Must have (internal) query handler for reserved query. + if (!this.queryHandlers.has(queryType)) { + throw new ReservedPrefixError('query', queryType, reservedPrefix); + } + } + + // Skip interceptors if it is an internal query + const interceptors = reservedPrefix ? [] : this.interceptors.inbound; + const execute = composeInterceptors(interceptors, 'handleQuery', this.queryWorkflowNextHandler.bind(this)); execute({ queryName: queryType, args: arrayFromPayloads(this.payloadConverter, activation.arguments), @@ -706,6 +718,11 @@ export class Activator implements ActivationHandler { if (!protocolInstanceId) { throw new TypeError('Missing activation update protocolInstanceId'); } + const reservedPrefix = maybeGetReservedPrefix(name); + if (reservedPrefix && !this.updateHandlers.get(name)) { + // Must have (internal) update handler for reserved update. + throw new ReservedPrefixError('update', name, reservedPrefix); + } const entry = this.updateHandlers.get(name) ?? @@ -859,6 +876,14 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing activation signalName'); } + const reservedPrefix = maybeGetReservedPrefix(signalName); + if (reservedPrefix) { + if (!this.signalHandlers.has(signalName)) { + // Must have (internal) signal handler for reserved signal. + throw new ReservedPrefixError('signal', signalName, reservedPrefix); + } + } + if (!this.signalHandlers.has(signalName) && !this.defaultSignalHandler) { this.bufferedSignals.push(activation); return; diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 2d5898865..a9024cd04 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -33,6 +33,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; +import { throwIfReservedName } from '@temporalio/common/lib/reserved'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { @@ -1272,6 +1273,8 @@ export function setHandler< options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions ): void { const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.'); + // Cannot register handler for reserved names + throwIfReservedName(def.type, def.name); const description = options?.description; if (def.type === 'update') { if (typeof handler === 'function') {