From 7b3aa1c2af98514b7d4d2b3730016ead6cc1d49c Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 20 Mar 2025 19:33:58 -0700 Subject: [PATCH 01/12] init impl of user metadata --- packages/client/src/schedule-helpers.ts | 10 ++- packages/client/src/schedule-types.ts | 4 + packages/client/src/types.ts | 2 + packages/client/src/workflow-client.ts | 18 +++- packages/common/src/workflow-options.ts | 15 ++++ packages/workflow/src/interceptors.ts | 27 +++++- packages/workflow/src/internals.ts | 3 + packages/workflow/src/workflow.ts | 105 +++++++++++++++--------- 8 files changed, 141 insertions(+), 43 deletions(-) diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index e403b7cce..1208aa35a 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -1,5 +1,5 @@ import Long from 'long'; // eslint-disable-line import/no-named-as-default -import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, LoadedDataConverter } from '@temporalio/common'; +import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, JsonPayloadConverter, LoadedDataConverter } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, decodeSearchAttributes, @@ -189,8 +189,7 @@ export function decodeOptionalStructuredCalendarSpecs( } export function compileScheduleOptions(options: ScheduleOptions): CompiledScheduleOptions { - const workflowTypeOrFunc = options.action.workflowType; - const workflowType = extractWorkflowType(workflowTypeOrFunc); + const workflowType = extractWorkflowType(options.action.workflowType); return { ...options, action: { @@ -240,6 +239,7 @@ export async function encodeScheduleAction( action: CompiledScheduleAction, headers: Headers ): Promise { + const jsonConverter = new JsonPayloadConverter() return { startWorkflow: { workflowId: action.workflowId, @@ -263,6 +263,10 @@ export async function encodeScheduleAction( } : undefined, header: { fields: headers }, + userMetadata: { + summary: jsonConverter.toPayload(action.staticSummary), + details: jsonConverter.toPayload(action.staticDetails) + } }, }; } diff --git a/packages/client/src/schedule-types.ts b/packages/client/src/schedule-types.ts index 9630e2f50..30204a88e 100644 --- a/packages/client/src/schedule-types.ts +++ b/packages/client/src/schedule-types.ts @@ -783,6 +783,8 @@ export type ScheduleOptionsStartWorkflowAction = { | 'workflowExecutionTimeout' | 'workflowRunTimeout' | 'workflowTaskTimeout' + | 'staticDetails' + | 'staticSummary' > & { /** * Workflow id to use when starting. Assign a meaningful business id. @@ -815,6 +817,8 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo | 'workflowExecutionTimeout' | 'workflowRunTimeout' | 'workflowTaskTimeout' + | 'staticSummary' + | 'staticDetails' >; // Invariant: an existing ScheduleDescriptionAction can be used as is to create or update a schedule diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 7289f0e05..f4c0c8c19 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -65,6 +65,8 @@ export interface CountWorkflowExecution { export type WorkflowExecutionDescription = Replace< WorkflowExecutionInfo, { + staticSummary?: string; + staticDetails?: string; raw: DescribeWorkflowExecutionResponse; } >; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 15b75ce3e..53c82cbdc 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -22,6 +22,7 @@ import { decodeRetryState, encodeWorkflowIdConflictPolicy, WorkflowIdConflictPolicy, + JsonPayloadConverter, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; @@ -510,7 +511,7 @@ export class WorkflowClient extends BaseClient { protected async _start( workflowTypeOrFunc: string | T, - options: WithWorkflowArgs, + options: WorkflowStartOptions, interceptors: WorkflowClientInterceptor[] ): Promise { const workflowType = extractWorkflowType(workflowTypeOrFunc); @@ -1196,6 +1197,7 @@ export class WorkflowClient extends BaseClient { protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise { const { identity } = this.options; const { options, workflowType, signalName, signalArgs, headers } = input; + const jsonConverter = new JsonPayloadConverter(); const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = { namespace: this.options.namespace, identity, @@ -1225,6 +1227,10 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: options.cronSchedule, header: { fields: headers }, + userMetadata: { + summary: jsonConverter.toPayload(options?.staticSummary), + details: jsonConverter.toPayload(options?.staticDetails) + } }; try { return (await this.workflowService.signalWithStartWorkflowExecution(req)).runId; @@ -1265,7 +1271,7 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; - + const jsonConverter = new JsonPayloadConverter() return { namespace, identity, @@ -1293,6 +1299,10 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: opts.cronSchedule, header: { fields: headers }, + userMetadata: { + summary: jsonConverter.toPayload(opts?.staticSummary), + details: jsonConverter.toPayload(opts?.staticDetails) + } }; } @@ -1426,8 +1436,12 @@ export class WorkflowClient extends BaseClient { workflowExecution: { workflowId, runId }, }); const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw); + const jsonConverter = new JsonPayloadConverter() + const userMetadata = raw.executionConfig?.userMetadata return { ...info, + staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined, + staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined, raw, }; }, diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index 6206a9861..56eddc008 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -190,6 +190,21 @@ export interface BaseWorkflowOptions { * by {@link typedSearchAttributes}. */ typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; + + /** + * General fixed details for this workflow execution that may appear in UI/CLI. + * This can be in Temporal markdown format and can span multiple lines. + * + * @experimental + */ + staticDetails?: string; + /** + * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental + */ + staticSummary?: string; } export type WithWorkflowArgs = T & diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index 1ae70c4b5..aaf2a2d81 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -7,7 +7,7 @@ */ import { ActivityOptions, Headers, LocalActivityOptions, Next, Timestamp, WorkflowExecution } from '@temporalio/common'; -import type { coresdk } from '@temporalio/proto'; +import type { coresdk, temporal } from '@temporalio/proto'; import { ChildWorkflowOptionsWithDefaults, ContinueAsNewOptions } from './interfaces'; export { Next, Headers }; @@ -80,6 +80,7 @@ export interface ActivityInput { readonly options: ActivityOptions; readonly headers: Headers; readonly seq: number; + readonly cmdOpts?: WorkflowCommandOptions; } /** Input for WorkflowOutboundCallsInterceptor.scheduleLocalActivity */ @@ -91,6 +92,7 @@ export interface LocalActivityInput { readonly seq: number; readonly originalScheduleTime?: Timestamp; readonly attempt: number; + readonly cmdOpts?: WorkflowCommandOptions; } /** Input for WorkflowOutboundCallsInterceptor.startChildWorkflowExecution */ @@ -101,10 +103,33 @@ export interface StartChildWorkflowExecutionInput { readonly seq: number; } +/** + * User metadata that can be attached to workflow commands. + * + * Current used for: + * - startTimer, scheduleActivity/scheduleLocalActivity commands + * - internal metadata query + */ +export interface UserMetadata { + /** @experimental A single line summary of the command's purpose */ + summary?: string; + /** @experimental Additional details about the command for longer-text description, can span multiple lines */ + details?: string; +} + +/** + * Options that can be attached to workflow commands. + */ +export interface WorkflowCommandOptions { + /** User metadata for the command that may be persisted to history */ + readonly userMetadata?: UserMetadata +} + /** Input for WorkflowOutboundCallsInterceptor.startTimer */ export interface TimerInput { readonly durationMs: number; readonly seq: number; + readonly cmdOpts?: WorkflowCommandOptions; } /** diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index acc49985b..29a43f168 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -313,6 +313,7 @@ export class Activator implements ActivationHandler { signalDefinitions, updateDefinitions, }, + currentDetails: this.currentDetails, }; }, description: 'Returns metadata associated with this workflow.', @@ -416,6 +417,8 @@ export class Activator implements ActivationHandler { public readonly registeredActivityNames: Set; + public currentDetails: string = ""; + constructor({ info, now, diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 711b71b04..5a09ead5c 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -22,6 +22,7 @@ import { WorkflowReturnType, WorkflowUpdateValidatorType, SearchAttributeUpdatePair, + JsonPayloadConverter, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -112,6 +113,9 @@ function timerNextHandler(input: TimerInput) { seq: input.seq, startToFireTimeout: msToTs(input.durationMs), }, + userMetadata: { + summary: new JsonPayloadConverter().toPayload(input.cmdOpts?.userMetadata?.summary) + } }); activator.completions.timer.set(input.seq, { resolve, @@ -128,7 +132,7 @@ function timerNextHandler(input: TimerInput) { * @param ms sleep duration - number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}. * If given a negative number or 0, value will be set to 1. */ -export function sleep(ms: Duration): Promise { +export function sleep(ms: Duration, summary?: string): Promise { const activator = assertInWorkflowContext('Workflow.sleep(...) may only be used from a Workflow Execution'); const seq = activator.nextSeqs.timer++; @@ -139,6 +143,7 @@ export function sleep(ms: Duration): Promise { return execute({ durationMs, seq, + cmdOpts: { userMetadata: { summary }} }); } @@ -154,7 +159,7 @@ const validateLocalActivityOptions = validateActivityOptions; /** * Push a scheduleActivity command into activator accumulator and register completion */ -function scheduleActivityNextHandler({ options, args, headers, seq, activityType }: ActivityInput): Promise { +function scheduleActivityNextHandler({ options, args, headers, seq, activityType, cmdOpts }: ActivityInput): Promise { const activator = getActivator(); validateActivityOptions(options); return new Promise((resolve, reject) => { @@ -194,6 +199,9 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType doNotEagerlyExecute: !(options.allowEagerDispatch ?? true), versioningIntent: versioningIntentToProto(options.versioningIntent), }, + userMetadata: { + summary: new JsonPayloadConverter().toPayload(cmdOpts?.userMetadata?.summary) + } }); activator.completions.activity.set(seq, { resolve, @@ -213,6 +221,7 @@ async function scheduleLocalActivityNextHandler({ activityType, attempt, originalScheduleTime, + cmdOpts }: LocalActivityInput): Promise { const activator = getActivator(); // Eagerly fail the local activity (which will in turn fail the workflow task. @@ -259,6 +268,9 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, + userMetadata: { + summary: new JsonPayloadConverter().toPayload(cmdOpts?.userMetadata?.summary) + } }); activator.completions.activity.set(seq, { resolve, @@ -271,7 +283,7 @@ async function scheduleLocalActivityNextHandler({ * Schedule an activity and run outbound interceptors * @hidden */ -export function scheduleActivity(activityType: string, args: any[], options: ActivityOptions): Promise { +export function scheduleActivity(activityType: string, args: any[], options: ActivityOptions, summary?: string): Promise { const activator = assertInWorkflowContext( 'Workflow.scheduleActivity(...) may only be used from a Workflow Execution' ); @@ -287,6 +299,7 @@ export function scheduleActivity(activityType: string, args: any[], options: options, args, seq, + cmdOpts: { userMetadata: { summary }} }) as Promise; } @@ -297,7 +310,8 @@ export function scheduleActivity(activityType: string, args: any[], options: export async function scheduleLocalActivity( activityType: string, args: any[], - options: LocalActivityOptions + options: LocalActivityOptions, + summary?: string, ): Promise { const activator = assertInWorkflowContext( 'Workflow.scheduleLocalActivity(...) may only be used from a Workflow Execution' @@ -326,6 +340,7 @@ export async function scheduleLocalActivity( seq, attempt, originalScheduleTime, + cmdOpts: { userMetadata: { summary }} })) as Promise; } catch (err) { if (err instanceof LocalActivityDoBackoff) { @@ -506,38 +521,26 @@ export type ActivityInterfaceFor = { * * @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} for * which each attribute is a callable Activity function + */ +export function proxyActivities(options: ActivityOptions): ActivityInterfaceFor; + +/** + * Configure Activity functions with given {@link ActivityOptions} and a summary. * - * @example - * ```ts - * import { proxyActivities } from '@temporalio/workflow'; - * import * as activities from '../activities'; - * - * // Setup Activities from module exports - * const { httpGet, otherActivity } = proxyActivities({ - * startToCloseTimeout: '30 minutes', - * }); - * - * // Setup Activities from an explicit interface (e.g. when defined by another SDK) - * interface JavaActivities { - * httpGetFromJava(url: string): Promise - * someOtherJavaActivity(arg1: number, arg2: string): Promise; - * } - * - * const { - * httpGetFromJava, - * someOtherJavaActivity - * } = proxyActivities({ - * taskQueue: 'java-worker-taskQueue', - * startToCloseTimeout: '5m', - * }); - * - * export function execute(): Promise { - * const response = await httpGet("http://example.com"); - * // ... - * } - * ``` + * @param options Activity options + * @param summary A description of the activity's purpose, useful for debugging and monitoring + * @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} for + * which each attribute is a callable Activity function with the provided summary */ -export function proxyActivities(options: ActivityOptions): ActivityInterfaceFor { +export function proxyActivities( + options: ActivityOptions, + summary: string +): ActivityInterfaceFor; + +export function proxyActivities( + options: ActivityOptions, + summary?: string +): ActivityInterfaceFor { if (options === undefined) { throw new TypeError('options must be defined'); } @@ -551,7 +554,7 @@ export function proxyActivities(options: ActivityOptions) throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); } return function activityProxyFunction(...args: unknown[]): Promise { - return scheduleActivity(activityType, args, options); + return scheduleActivity(activityType, args, options, summary); }; }, } @@ -568,7 +571,25 @@ export function proxyActivities(options: ActivityOptions) * * @see {@link proxyActivities} for examples */ -export function proxyLocalActivities(options: LocalActivityOptions): ActivityInterfaceFor { +export function proxyLocalActivities(options: LocalActivityOptions): ActivityInterfaceFor; + +/** + * Configure Local Activity functions with given {@link LocalActivityOptions} and a summary. + * + * @param options Local activity options + * @param summary A description of the activity's purpose, useful for debugging and monitoring + * @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} + * for which each attribute is a callable Activity function with the provided summary + */ +export function proxyLocalActivities( + options: LocalActivityOptions, + summary: string +): ActivityInterfaceFor; + +export function proxyLocalActivities( + options: LocalActivityOptions, + summary?: string +): ActivityInterfaceFor { if (options === undefined) { throw new TypeError('options must be defined'); } @@ -582,7 +603,7 @@ export function proxyLocalActivities(options: LocalActivi throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); } return function localActivityProxyFunction(...args: unknown[]) { - return scheduleLocalActivity(activityType, args, options); + return scheduleLocalActivity(activityType, args, options, summary); }; }, } @@ -1590,3 +1611,13 @@ export function allHandlersFinished(): boolean { export const stackTraceQuery = defineQuery('__stack_trace'); export const enhancedStackTraceQuery = defineQuery('__enhanced_stack_trace'); export const workflowMetadataQuery = defineQuery('__temporal_workflow_metadata'); + +export function getCurrentDetails(): string { + const activator = assertInWorkflowContext('getCurrentDetails() may only be used from a Workflow Execution.'); + return activator.currentDetails; +} + +export function setCurrentDetails(details: string): void { + const activator = assertInWorkflowContext('getCurrentDetails() may only be used from a Workflow Execution.'); + activator.currentDetails = details; +} \ No newline at end of file From 2dab40dd5b364517a8809008fccf6fc573d731b4 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 21 Mar 2025 12:06:51 -0700 Subject: [PATCH 02/12] add test, removed proxyActivities overloads instead added withSummaries field to add a summary to an activity --- .../test/src/test-integration-workflows.ts | 80 ++++++- packages/workflow/src/workflow.ts | 196 ++++++++++++------ 2 files changed, 206 insertions(+), 70 deletions(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 3a12aa384..9fd92b081 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -8,15 +8,16 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; import { CancelReason } from '@temporalio/worker/lib/activity'; import * as workflow from '@temporalio/workflow'; -import { defineQuery, defineSignal } from '@temporalio/workflow'; +import { defineQuery, defineSignal, setHandler } from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; -import { ActivityCancellationType, ApplicationFailure, WorkflowExecutionAlreadyStartedError } from '@temporalio/common'; +import { ActivityCancellationType, ApplicationFailure, JsonPayloadConverter, WorkflowExecutionAlreadyStartedError } from '@temporalio/common'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; import { Context, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS } from './helpers'; +import { temporal } from '@temporalio/proto'; const test = makeTestFunction({ workflowsPath: __filename, @@ -1303,3 +1304,78 @@ test('Count workflow executions', async (t) => { ], }); }); + +export async function userMetadataWorkflow(): Promise { + let done = false; + const signalDef = defineSignal('done') + setHandler(signalDef, () => { done = true }) + + // That workflow should call an activity (with summary) + const { activityWithSummary } = workflow + .proxyActivities({ scheduleToCloseTimeout: '10s' }) + .withSummaries({ + activityWithSummary: 'activity summary' + }) + await activityWithSummary() + // Should have a timer (with summary) + await workflow.sleep(5, "timer summary") + // Set current details + workflow.setCurrentDetails('current wf details'); + // Unblock on var -> query current details (or return) + await workflow.condition(() => done); + return workflow.getCurrentDetails(); +} + +test('User metadata', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + async activityWithSummary() {} + } + }); + + await worker.runUntil(async () => { + // Start a workflow with static details + const handle = await startWorkflow(userMetadataWorkflow, { + staticSummary: "wf static summary", + staticDetails: "wf static details" + }); + // Describe workflow -> static summary, static details + const desc = await handle.describe(); + t.true(desc.staticSummary === 'wf static summary'); + t.true(desc.staticDetails === 'wf static details'); + + await handle.signal('done'); + const res = await handle.result(); + t.true(res === 'current wf details'); + + // Get history events for timer and activity summaries. + const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory( + { + namespace: t.context.env.client.options.namespace, + execution: { + workflowId: handle.workflowId, + runId: handle.firstExecutionRunId + }, + } + ); + const jsonConverter = new JsonPayloadConverter(); + for (const event of resp.history?.events ?? []) { + if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'wf static summary'); + t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.details ?? {}), 'wf static details'); + } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) { + t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'activity summary'); + } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) { + t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'timer summary'); + } + } + + // Run metadata query -> get current details + const wfMetadata = await handle.query('__temporal_workflow_metadata') as temporal.api.sdk.v1.IWorkflowMetadata; + t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1); + t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done'); + t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries + t.deepEqual(wfMetadata.currentDetails, 'current wf details'); + }); +}); \ No newline at end of file diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 5a09ead5c..d51d713d5 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -40,6 +40,8 @@ import { SignalWorkflowInput, StartChildWorkflowExecutionInput, TimerInput, + UserMetadata, + WorkflowCommandOptions, } from './interceptors'; import { ChildWorkflowCancellationType, @@ -82,6 +84,27 @@ export function addDefaultWorkflowOptions( }; } +function addUserMetadata(userMetadata?: UserMetadata): temporal.api.sdk.v1.IUserMetadata | undefined { + if (userMetadata == null) { + return undefined + } + + const jsonConverter = new JsonPayloadConverter(); + return { + summary: jsonConverter.toPayload(userMetadata.summary), + details: jsonConverter.toPayload(userMetadata.details), + } +} + +function addWorkflowCommandOptions(cmdOpts?: WorkflowCommandOptions): object { + if (cmdOpts == null) { + return {} + } + return { + userMetadata: addUserMetadata(cmdOpts.userMetadata) + } +} + /** * Push a startTimer command into state accumulator and register completion */ @@ -113,9 +136,7 @@ function timerNextHandler(input: TimerInput) { seq: input.seq, startToFireTimeout: msToTs(input.durationMs), }, - userMetadata: { - summary: new JsonPayloadConverter().toPayload(input.cmdOpts?.userMetadata?.summary) - } + ...addWorkflowCommandOptions(input.cmdOpts) }); activator.completions.timer.set(input.seq, { resolve, @@ -199,9 +220,7 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType doNotEagerlyExecute: !(options.allowEagerDispatch ?? true), versioningIntent: versioningIntentToProto(options.versioningIntent), }, - userMetadata: { - summary: new JsonPayloadConverter().toPayload(cmdOpts?.userMetadata?.summary) - } + ...addWorkflowCommandOptions(cmdOpts) }); activator.completions.activity.set(seq, { resolve, @@ -268,9 +287,7 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, - userMetadata: { - summary: new JsonPayloadConverter().toPayload(cmdOpts?.userMetadata?.summary) - } + ...addWorkflowCommandOptions(cmdOpts) }); activator.completions.activity.set(seq, { resolve, @@ -514,6 +531,18 @@ export type ActivityInterfaceFor = { [K in keyof T]: T[K] extends ActivityFunction ? T[K] : typeof NotAnActivityMethod; }; +/** + * Extends ActivityInterfaceFor to include the withSummaries method + */ +export type ActivityInterfaceWithSummaries = ActivityInterfaceFor & { + /** + * Provide descriptive summaries for activities + * @param summaries Record mapping activity names to their summary descriptions + * @returns A new proxy with the provided summaries + */ + withSummaries(summaries: Record): ActivityInterfaceFor; +}; + /** * Configure Activity functions with given {@link ActivityOptions}. * @@ -521,44 +550,81 @@ export type ActivityInterfaceFor = { * * @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} for * which each attribute is a callable Activity function - */ -export function proxyActivities(options: ActivityOptions): ActivityInterfaceFor; - -/** - * Configure Activity functions with given {@link ActivityOptions} and a summary. * - * @param options Activity options - * @param summary A description of the activity's purpose, useful for debugging and monitoring - * @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} for - * which each attribute is a callable Activity function with the provided summary + * @example + * ```ts + * import { proxyActivities } from '@temporalio/workflow'; + * import * as activities from '../activities'; + * + * // Setup Activities from module exports + * const { httpGet, otherActivity } = proxyActivities({ + * startToCloseTimeout: '30 minutes', + * }); + * + * // Setup Activities with summaries for better observability + * const { + * httpGet, + * processData, + * saveResults + * } = proxyActivities({ + * startToCloseTimeout: '10m', + * }).withSummaries({ + * httpGet: 'Fetches data from external API', + * processData: 'Processes the fetched data', + * saveResults: 'Saves processed results to database' + * }); + * + * // Setup Activities from an explicit interface (e.g. when defined by another SDK) + * interface JavaActivities { + * httpGetFromJava(url: string): Promise + * someOtherJavaActivity(arg1: number, arg2: string): Promise; + * } + * + * const { + * httpGetFromJava, + * someOtherJavaActivity + * } = proxyActivities({ + * taskQueue: 'java-worker-taskQueue', + * startToCloseTimeout: '5m', + * }); + * + * export function execute(): Promise { + * const response = await httpGet("http://example.com"); + * // ... + * } + * ``` */ -export function proxyActivities( - options: ActivityOptions, - summary: string -): ActivityInterfaceFor; - export function proxyActivities( options: ActivityOptions, - summary?: string -): ActivityInterfaceFor { +): ActivityInterfaceWithSummaries { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateActivityOptions(options); - return new Proxy( - {}, - { - get(_, activityType) { - if (typeof activityType !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); + + function createActivityProxy(summaries: Record = {}): ActivityInterfaceWithSummaries { + return new Proxy({} as ActivityInterfaceWithSummaries, { + get(_, prop) { + if (prop === 'withSummaries') { + return function withSummaries(newSummaries: Record): ActivityInterfaceFor { + return createActivityProxy(newSummaries); + }; } + + if (typeof prop !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`); + } + return function activityProxyFunction(...args: unknown[]): Promise { - return scheduleActivity(activityType, args, options, summary); + const summary = summaries[prop]; + return scheduleActivity(prop, args, options, summary); }; }, - } - ) as any; + }); + } + + return createActivityProxy(); } /** @@ -571,43 +637,37 @@ export function proxyActivities( * * @see {@link proxyActivities} for examples */ -export function proxyLocalActivities(options: LocalActivityOptions): ActivityInterfaceFor; - -/** - * Configure Local Activity functions with given {@link LocalActivityOptions} and a summary. - * - * @param options Local activity options - * @param summary A description of the activity's purpose, useful for debugging and monitoring - * @return a {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | Proxy} - * for which each attribute is a callable Activity function with the provided summary - */ -export function proxyLocalActivities( - options: LocalActivityOptions, - summary: string -): ActivityInterfaceFor; - export function proxyLocalActivities( - options: LocalActivityOptions, - summary?: string -): ActivityInterfaceFor { + options: LocalActivityOptions +): ActivityInterfaceWithSummaries { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateLocalActivityOptions(options); - return new Proxy( - {}, - { - get(_, activityType) { - if (typeof activityType !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); + + function createLocalActivityProxy(summaries: Record = {}): ActivityInterfaceWithSummaries { + return new Proxy({} as ActivityInterfaceWithSummaries, { + get(_, prop) { + if (prop === 'withSummaries') { + return function withSummaries(newSummaries: Record): ActivityInterfaceFor { + return createLocalActivityProxy(newSummaries); + }; + } + + if (typeof prop !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`); } - return function localActivityProxyFunction(...args: unknown[]) { - return scheduleLocalActivity(activityType, args, options, summary); + + return function localActivityProxyFunction(...args: unknown[]): Promise { + const summary = summaries[prop]; + return scheduleLocalActivity(prop, args, options, summary); }; }, - } - ) as any; + }); + } + + return createLocalActivityProxy(); } // TODO: deprecate this patch after "enough" time has passed @@ -977,13 +1037,13 @@ export function makeContinueAsNewFunc( * @example * * ```ts - *import { continueAsNew } from '@temporalio/workflow'; -import { SearchAttributeType } from '@temporalio/common'; + * import { continueAsNew } from '@temporalio/workflow'; + * import { SearchAttributeType } from '@temporalio/common'; * - *export async function myWorkflow(n: number): Promise { - * // ... Workflow logic - * await continueAsNew(n + 1); - *} + * export async function myWorkflow(n: number): Promise { + * // ... Workflow logic + * await continueAsNew(n + 1); + * } * ``` */ export function continueAsNew(...args: Parameters): Promise { From 6af2e2c62e80ba137c83e59aff749ba44b976a3e Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 21 Mar 2025 12:50:34 -0700 Subject: [PATCH 03/12] schedule metadata test, fix schedule action decode --- packages/client/src/schedule-helpers.ts | 22 ++++-- packages/client/src/workflow-client.ts | 14 ++-- packages/common/src/workflow-options.ts | 8 +- .../test/src/test-integration-workflows.ts | 59 +++++++------- packages/test/src/test-schedules.ts | 27 +++++++ packages/workflow/src/interceptors.ts | 6 +- packages/workflow/src/internals.ts | 2 +- packages/workflow/src/workflow.ts | 77 +++++++++++-------- 8 files changed, 133 insertions(+), 82 deletions(-) diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index 1208aa35a..34d3179c3 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -1,5 +1,11 @@ import Long from 'long'; // eslint-disable-line import/no-named-as-default -import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, JsonPayloadConverter, LoadedDataConverter } from '@temporalio/common'; +import { + compileRetryPolicy, + decompileRetryPolicy, + extractWorkflowType, + JsonPayloadConverter, + LoadedDataConverter, +} from '@temporalio/common'; import { encodeUnifiedSearchAttributes, decodeSearchAttributes, @@ -239,7 +245,7 @@ export async function encodeScheduleAction( action: CompiledScheduleAction, headers: Headers ): Promise { - const jsonConverter = new JsonPayloadConverter() + const jsonConverter = new JsonPayloadConverter(); return { startWorkflow: { workflowId: action.workflowId, @@ -263,10 +269,10 @@ export async function encodeScheduleAction( } : undefined, header: { fields: headers }, - userMetadata: { - summary: jsonConverter.toPayload(action.staticSummary), - details: jsonConverter.toPayload(action.staticDetails) - } + userMetadata: { + summary: jsonConverter.toPayload(action.staticSummary), + details: jsonConverter.toPayload(action.staticDetails), + }, }, }; } @@ -316,6 +322,8 @@ export async function decodeScheduleAction( pb: temporal.api.schedule.v1.IScheduleAction ): Promise { if (pb.startWorkflow) { + const jsonConverter = new JsonPayloadConverter(); + const userMetadata = pb.startWorkflow?.userMetadata; return { type: 'startWorkflow', // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -332,6 +340,8 @@ export async function decodeScheduleAction( workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), + staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined, + staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined, }; } throw new TypeError('Unsupported schedule action'); diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 53c82cbdc..0ce1bd08d 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -1229,8 +1229,8 @@ export class WorkflowClient extends BaseClient { header: { fields: headers }, userMetadata: { summary: jsonConverter.toPayload(options?.staticSummary), - details: jsonConverter.toPayload(options?.staticDetails) - } + details: jsonConverter.toPayload(options?.staticDetails), + }, }; try { return (await this.workflowService.signalWithStartWorkflowExecution(req)).runId; @@ -1271,7 +1271,7 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; - const jsonConverter = new JsonPayloadConverter() + const jsonConverter = new JsonPayloadConverter(); return { namespace, identity, @@ -1301,8 +1301,8 @@ export class WorkflowClient extends BaseClient { header: { fields: headers }, userMetadata: { summary: jsonConverter.toPayload(opts?.staticSummary), - details: jsonConverter.toPayload(opts?.staticDetails) - } + details: jsonConverter.toPayload(opts?.staticDetails), + }, }; } @@ -1436,8 +1436,8 @@ export class WorkflowClient extends BaseClient { workflowExecution: { workflowId, runId }, }); const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw); - const jsonConverter = new JsonPayloadConverter() - const userMetadata = raw.executionConfig?.userMetadata + const jsonConverter = new JsonPayloadConverter(); + const userMetadata = raw.executionConfig?.userMetadata; return { ...info, staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined, diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index 56eddc008..4868e6292 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -192,16 +192,16 @@ export interface BaseWorkflowOptions { typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; /** - * General fixed details for this workflow execution that may appear in UI/CLI. + * General fixed details for this workflow execution that may appear in UI/CLI. * This can be in Temporal markdown format and can span multiple lines. - * - * @experimental + * + * @experimental */ staticDetails?: string; /** * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. * This can be in single-line Temporal markdown format. - * + * * @experimental */ staticSummary?: string; diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 9fd92b081..175eb7f9e 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -10,14 +10,19 @@ import { CancelReason } from '@temporalio/worker/lib/activity'; import * as workflow from '@temporalio/workflow'; import { defineQuery, defineSignal, setHandler } from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; -import { ActivityCancellationType, ApplicationFailure, JsonPayloadConverter, WorkflowExecutionAlreadyStartedError } from '@temporalio/common'; +import { + ActivityCancellationType, + ApplicationFailure, + JsonPayloadConverter, + WorkflowExecutionAlreadyStartedError, +} from '@temporalio/common'; +import { temporal } from '@temporalio/proto'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; import { Context, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS } from './helpers'; -import { temporal } from '@temporalio/proto'; const test = makeTestFunction({ workflowsPath: __filename, @@ -1307,18 +1312,18 @@ test('Count workflow executions', async (t) => { export async function userMetadataWorkflow(): Promise { let done = false; - const signalDef = defineSignal('done') - setHandler(signalDef, () => { done = true }) - + const signalDef = defineSignal('done'); + setHandler(signalDef, () => { + done = true; + }); + // That workflow should call an activity (with summary) - const { activityWithSummary } = workflow - .proxyActivities({ scheduleToCloseTimeout: '10s' }) - .withSummaries({ - activityWithSummary: 'activity summary' - }) - await activityWithSummary() + const { activityWithSummary } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' }).withSummaries({ + activityWithSummary: 'activity summary', + }); + await activityWithSummary(); // Should have a timer (with summary) - await workflow.sleep(5, "timer summary") + await workflow.sleep(5, 'timer summary'); // Set current details workflow.setCurrentDetails('current wf details'); // Unblock on var -> query current details (or return) @@ -1326,19 +1331,19 @@ export async function userMetadataWorkflow(): Promise { return workflow.getCurrentDetails(); } -test('User metadata', async (t) => { +test('User metadata on workflow, timer, activity', async (t) => { const { createWorker, startWorkflow } = helpers(t); const worker = await createWorker({ activities: { - async activityWithSummary() {} - } + async activityWithSummary() {}, + }, }); await worker.runUntil(async () => { // Start a workflow with static details const handle = await startWorkflow(userMetadataWorkflow, { - staticSummary: "wf static summary", - staticDetails: "wf static details" + staticSummary: 'wf static summary', + staticDetails: 'wf static details', }); // Describe workflow -> static summary, static details const desc = await handle.describe(); @@ -1350,15 +1355,13 @@ test('User metadata', async (t) => { t.true(res === 'current wf details'); // Get history events for timer and activity summaries. - const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory( - { - namespace: t.context.env.client.options.namespace, - execution: { - workflowId: handle.workflowId, - runId: handle.firstExecutionRunId - }, - } - ); + const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({ + namespace: t.context.env.client.options.namespace, + execution: { + workflowId: handle.workflowId, + runId: handle.firstExecutionRunId, + }, + }); const jsonConverter = new JsonPayloadConverter(); for (const event of resp.history?.events ?? []) { if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { @@ -1372,10 +1375,10 @@ test('User metadata', async (t) => { } // Run metadata query -> get current details - const wfMetadata = await handle.query('__temporal_workflow_metadata') as temporal.api.sdk.v1.IWorkflowMetadata; + const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata; t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1); t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done'); t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries t.deepEqual(wfMetadata.currentDetails, 'current wf details'); }); -}); \ No newline at end of file +}); diff --git a/packages/test/src/test-schedules.ts b/packages/test/src/test-schedules.ts index 58cebf9c7..d7a5ddfd5 100644 --- a/packages/test/src/test-schedules.ts +++ b/packages/test/src/test-schedules.ts @@ -758,4 +758,31 @@ if (RUN_INTEGRATION_TESTS) { await handle.delete(); } }); + + test.serial('User metadata on schedule', async (t) => { + const { client } = t.context; + const scheduleId = `schedule-with-user-metadata-${randomUUID()}`; + const handle = await client.schedule.create({ + scheduleId, + spec: {}, + action: { + type: 'startWorkflow', + workflowType: dummyWorkflow, + taskQueue, + staticSummary: 'schedule static summary', + staticDetails: 'schedule static details', + }, + }); + + try { + const describedSchedule = await handle.describe(); + t.deepEqual(describedSchedule.spec.calendars, []); + t.deepEqual(describedSchedule.spec.intervals, []); + t.deepEqual(describedSchedule.spec.skip, []); + t.deepEqual(describedSchedule.action.staticSummary, 'schedule static summary'); + t.deepEqual(describedSchedule.action.staticDetails, 'schedule static details'); + } finally { + await handle.delete(); + } + }); } diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index aaf2a2d81..5ee9e046a 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -7,7 +7,7 @@ */ import { ActivityOptions, Headers, LocalActivityOptions, Next, Timestamp, WorkflowExecution } from '@temporalio/common'; -import type { coresdk, temporal } from '@temporalio/proto'; +import type { coresdk } from '@temporalio/proto'; import { ChildWorkflowOptionsWithDefaults, ContinueAsNewOptions } from './interfaces'; export { Next, Headers }; @@ -105,7 +105,7 @@ export interface StartChildWorkflowExecutionInput { /** * User metadata that can be attached to workflow commands. - * + * * Current used for: * - startTimer, scheduleActivity/scheduleLocalActivity commands * - internal metadata query @@ -122,7 +122,7 @@ export interface UserMetadata { */ export interface WorkflowCommandOptions { /** User metadata for the command that may be persisted to history */ - readonly userMetadata?: UserMetadata + readonly userMetadata?: UserMetadata; } /** Input for WorkflowOutboundCallsInterceptor.startTimer */ diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 29a43f168..b044104e9 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -417,7 +417,7 @@ export class Activator implements ActivationHandler { public readonly registeredActivityNames: Set; - public currentDetails: string = ""; + public currentDetails: string = ''; constructor({ info, diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index d51d713d5..aa0ab5766 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -86,23 +86,23 @@ export function addDefaultWorkflowOptions( function addUserMetadata(userMetadata?: UserMetadata): temporal.api.sdk.v1.IUserMetadata | undefined { if (userMetadata == null) { - return undefined + return undefined; } const jsonConverter = new JsonPayloadConverter(); return { summary: jsonConverter.toPayload(userMetadata.summary), details: jsonConverter.toPayload(userMetadata.details), - } + }; } function addWorkflowCommandOptions(cmdOpts?: WorkflowCommandOptions): object { if (cmdOpts == null) { - return {} + return {}; } return { - userMetadata: addUserMetadata(cmdOpts.userMetadata) - } + userMetadata: addUserMetadata(cmdOpts.userMetadata), + }; } /** @@ -136,7 +136,7 @@ function timerNextHandler(input: TimerInput) { seq: input.seq, startToFireTimeout: msToTs(input.durationMs), }, - ...addWorkflowCommandOptions(input.cmdOpts) + ...addWorkflowCommandOptions(input.cmdOpts), }); activator.completions.timer.set(input.seq, { resolve, @@ -152,6 +152,7 @@ function timerNextHandler(input: TimerInput) { * * @param ms sleep duration - number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}. * If given a negative number or 0, value will be set to 1. + * @param summary a short summary/description of the timer. Can serve as a timer ID. */ export function sleep(ms: Duration, summary?: string): Promise { const activator = assertInWorkflowContext('Workflow.sleep(...) may only be used from a Workflow Execution'); @@ -164,7 +165,7 @@ export function sleep(ms: Duration, summary?: string): Promise { return execute({ durationMs, seq, - cmdOpts: { userMetadata: { summary }} + cmdOpts: { userMetadata: { summary } }, }); } @@ -180,7 +181,14 @@ const validateLocalActivityOptions = validateActivityOptions; /** * Push a scheduleActivity command into activator accumulator and register completion */ -function scheduleActivityNextHandler({ options, args, headers, seq, activityType, cmdOpts }: ActivityInput): Promise { +function scheduleActivityNextHandler({ + options, + args, + headers, + seq, + activityType, + cmdOpts, +}: ActivityInput): Promise { const activator = getActivator(); validateActivityOptions(options); return new Promise((resolve, reject) => { @@ -220,7 +228,7 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType doNotEagerlyExecute: !(options.allowEagerDispatch ?? true), versioningIntent: versioningIntentToProto(options.versioningIntent), }, - ...addWorkflowCommandOptions(cmdOpts) + ...addWorkflowCommandOptions(cmdOpts), }); activator.completions.activity.set(seq, { resolve, @@ -240,7 +248,7 @@ async function scheduleLocalActivityNextHandler({ activityType, attempt, originalScheduleTime, - cmdOpts + cmdOpts, }: LocalActivityInput): Promise { const activator = getActivator(); // Eagerly fail the local activity (which will in turn fail the workflow task. @@ -287,7 +295,7 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, - ...addWorkflowCommandOptions(cmdOpts) + ...addWorkflowCommandOptions(cmdOpts), }); activator.completions.activity.set(seq, { resolve, @@ -300,7 +308,12 @@ async function scheduleLocalActivityNextHandler({ * Schedule an activity and run outbound interceptors * @hidden */ -export function scheduleActivity(activityType: string, args: any[], options: ActivityOptions, summary?: string): Promise { +export function scheduleActivity( + activityType: string, + args: any[], + options: ActivityOptions, + summary?: string +): Promise { const activator = assertInWorkflowContext( 'Workflow.scheduleActivity(...) may only be used from a Workflow Execution' ); @@ -316,7 +329,7 @@ export function scheduleActivity(activityType: string, args: any[], options: options, args, seq, - cmdOpts: { userMetadata: { summary }} + cmdOpts: { userMetadata: { summary } }, }) as Promise; } @@ -328,7 +341,7 @@ export async function scheduleLocalActivity( activityType: string, args: any[], options: LocalActivityOptions, - summary?: string, + summary?: string ): Promise { const activator = assertInWorkflowContext( 'Workflow.scheduleLocalActivity(...) may only be used from a Workflow Execution' @@ -357,7 +370,7 @@ export async function scheduleLocalActivity( seq, attempt, originalScheduleTime, - cmdOpts: { userMetadata: { summary }} + cmdOpts: { userMetadata: { summary } }, })) as Promise; } catch (err) { if (err instanceof LocalActivityDoBackoff) { @@ -560,12 +573,12 @@ export type ActivityInterfaceWithSummaries = ActivityInterfaceFor & { * const { httpGet, otherActivity } = proxyActivities({ * startToCloseTimeout: '30 minutes', * }); - * + * * // Setup Activities with summaries for better observability - * const { - * httpGet, + * const { + * httpGet, * processData, - * saveResults + * saveResults * } = proxyActivities({ * startToCloseTimeout: '10m', * }).withSummaries({ @@ -594,28 +607,26 @@ export type ActivityInterfaceWithSummaries = ActivityInterfaceFor & { * } * ``` */ -export function proxyActivities( - options: ActivityOptions, -): ActivityInterfaceWithSummaries { +export function proxyActivities(options: ActivityOptions): ActivityInterfaceWithSummaries { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateActivityOptions(options); - + function createActivityProxy(summaries: Record = {}): ActivityInterfaceWithSummaries { return new Proxy({} as ActivityInterfaceWithSummaries, { get(_, prop) { if (prop === 'withSummaries') { - return function withSummaries(newSummaries: Record): ActivityInterfaceFor { + return function withSummaries(newSummaries: Record): ActivityInterfaceWithSummaries { return createActivityProxy(newSummaries); }; } - + if (typeof prop !== 'string') { throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`); } - + return function activityProxyFunction(...args: unknown[]): Promise { const summary = summaries[prop]; return scheduleActivity(prop, args, options, summary); @@ -623,7 +634,7 @@ export function proxyActivities( }, }); } - + return createActivityProxy(); } @@ -645,20 +656,20 @@ export function proxyLocalActivities( } // Validate as early as possible for immediate user feedback validateLocalActivityOptions(options); - + function createLocalActivityProxy(summaries: Record = {}): ActivityInterfaceWithSummaries { return new Proxy({} as ActivityInterfaceWithSummaries, { get(_, prop) { if (prop === 'withSummaries') { - return function withSummaries(newSummaries: Record): ActivityInterfaceFor { + return function withSummaries(newSummaries: Record): ActivityInterfaceWithSummaries { return createLocalActivityProxy(newSummaries); }; } - + if (typeof prop !== 'string') { throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`); } - + return function localActivityProxyFunction(...args: unknown[]): Promise { const summary = summaries[prop]; return scheduleLocalActivity(prop, args, options, summary); @@ -666,7 +677,7 @@ export function proxyLocalActivities( }, }); } - + return createLocalActivityProxy(); } @@ -1680,4 +1691,4 @@ export function getCurrentDetails(): string { export function setCurrentDetails(details: string): void { const activator = assertInWorkflowContext('getCurrentDetails() may only be used from a Workflow Execution.'); activator.currentDetails = details; -} \ No newline at end of file +} From 031e2952b46f242b7a3bffc35268677a729a10a5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 24 Mar 2025 14:37:45 -0700 Subject: [PATCH 04/12] test fix --- packages/workflow/src/workflow.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index aa0ab5766..70177b662 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -165,7 +165,7 @@ export function sleep(ms: Duration, summary?: string): Promise { return execute({ durationMs, seq, - cmdOpts: { userMetadata: { summary } }, + ...(summary !== undefined && { cmdOpts: { userMetadata: { summary } } }), }); } @@ -329,7 +329,7 @@ export function scheduleActivity( options, args, seq, - cmdOpts: { userMetadata: { summary } }, + ...(summary !== undefined && { cmdOpts: { userMetadata: { summary } } }), }) as Promise; } @@ -370,7 +370,7 @@ export async function scheduleLocalActivity( seq, attempt, originalScheduleTime, - cmdOpts: { userMetadata: { summary } }, + ...(summary !== undefined && { cmdOpts: { userMetadata: { summary } } }), })) as Promise; } catch (err) { if (err instanceof LocalActivityDoBackoff) { From d4969668ec69e082061c1197bae150ec6458b55f Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 2 Jun 2025 21:16:43 +0200 Subject: [PATCH 05/12] use supplied data converter --- packages/client/src/schedule-helpers.ts | 12 ++++----- packages/client/src/workflow-client.ts | 18 ++++++------- .../internal-non-workflow/codec-helpers.ts | 25 +++++++++++++++++++ 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index 34d3179c3..672c17bd3 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -15,7 +15,9 @@ import { Headers } from '@temporalio/common/lib/interceptors'; import { decodeArrayFromPayloads, decodeMapFromPayloads, + decodeOptionalSinglePayload, encodeMapToPayloads, + encodeOptionalToPayload, encodeToPayloads, } from '@temporalio/common/lib/internal-non-workflow'; import { temporal } from '@temporalio/proto'; @@ -245,7 +247,6 @@ export async function encodeScheduleAction( action: CompiledScheduleAction, headers: Headers ): Promise { - const jsonConverter = new JsonPayloadConverter(); return { startWorkflow: { workflowId: action.workflowId, @@ -270,8 +271,8 @@ export async function encodeScheduleAction( : undefined, header: { fields: headers }, userMetadata: { - summary: jsonConverter.toPayload(action.staticSummary), - details: jsonConverter.toPayload(action.staticDetails), + summary: await encodeOptionalToPayload(dataConverter, action?.staticSummary), + details: await encodeOptionalToPayload(dataConverter, action?.staticDetails), }, }, }; @@ -322,7 +323,6 @@ export async function decodeScheduleAction( pb: temporal.api.schedule.v1.IScheduleAction ): Promise { if (pb.startWorkflow) { - const jsonConverter = new JsonPayloadConverter(); const userMetadata = pb.startWorkflow?.userMetadata; return { type: 'startWorkflow', @@ -340,8 +340,8 @@ export async function decodeScheduleAction( workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), - staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined, - staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined, + staticSummary: await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary) ?? undefined, + staticDetails: await decodeOptionalSinglePayload(dataConverter, userMetadata?.details) ?? undefined, }; } throw new TypeError('Unsupported schedule action'); diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 0ce1bd08d..027959353 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -22,7 +22,6 @@ import { decodeRetryState, encodeWorkflowIdConflictPolicy, WorkflowIdConflictPolicy, - JsonPayloadConverter, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; @@ -32,7 +31,9 @@ import { decodeArrayFromPayloads, decodeFromPayloadsAtIndex, decodeOptionalFailureToOptionalError, + decodeOptionalSinglePayload, encodeMapToPayloads, + encodeOptionalToPayload, encodeToPayloads, filterNullAndUndefined, } from '@temporalio/common/lib/internal-non-workflow'; @@ -1197,7 +1198,6 @@ export class WorkflowClient extends BaseClient { protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise { const { identity } = this.options; const { options, workflowType, signalName, signalArgs, headers } = input; - const jsonConverter = new JsonPayloadConverter(); const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = { namespace: this.options.namespace, identity, @@ -1228,8 +1228,8 @@ export class WorkflowClient extends BaseClient { cronSchedule: options.cronSchedule, header: { fields: headers }, userMetadata: { - summary: jsonConverter.toPayload(options?.staticSummary), - details: jsonConverter.toPayload(options?.staticDetails), + summary: await encodeOptionalToPayload(this.dataConverter, options?.staticSummary), + details: await encodeOptionalToPayload(this.dataConverter, options?.staticDetails), }, }; try { @@ -1271,7 +1271,6 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; - const jsonConverter = new JsonPayloadConverter(); return { namespace, identity, @@ -1300,8 +1299,8 @@ export class WorkflowClient extends BaseClient { cronSchedule: opts.cronSchedule, header: { fields: headers }, userMetadata: { - summary: jsonConverter.toPayload(opts?.staticSummary), - details: jsonConverter.toPayload(opts?.staticDetails), + summary: await encodeOptionalToPayload(this.dataConverter, opts?.staticSummary), + details: await encodeOptionalToPayload(this.dataConverter, opts?.staticDetails), }, }; } @@ -1436,12 +1435,11 @@ export class WorkflowClient extends BaseClient { workflowExecution: { workflowId, runId }, }); const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw); - const jsonConverter = new JsonPayloadConverter(); const userMetadata = raw.executionConfig?.userMetadata; return { ...info, - staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined, - staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined, + staticDetails: await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details) ?? undefined, + staticSummary: await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary) ?? undefined, raw, }; }, diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index d77e7ab69..6f5b39874 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -72,6 +72,17 @@ export async function decodeOptionalSingle( return await decodeSingle(codecs, payload); } +/** Run {@link PayloadCodec.decode} and convert from a single Payload */ +export async function decodeOptionalSinglePayload( + dataConverter: LoadedDataConverter, + payload?: Payload | null | undefined +): Promise { + const { payloadCodecs, payloadConverter } = dataConverter; + const decoded = await decodeOptionalSingle(payloadCodecs, payload); + if (decoded == null) return decoded; + return payloadConverter.fromPayload(decoded); +} + /** * Run {@link PayloadConverter.toPayload} on value, and then encode it. */ @@ -80,6 +91,20 @@ export async function encodeToPayload(converter: LoadedDataConverter, value: unk return await encodeSingle(payloadCodecs, payloadConverter.toPayload(value)); } +/** + * Run {@link PayloadConverter.toPayload} on an optional value, and then encode it. + */ +export async function encodeOptionalToPayload( + converter: LoadedDataConverter, + value: unknown +): Promise { + if (value == null) return value; + + const { payloadConverter, payloadCodecs } = converter; + const payload = payloadConverter.toPayload(value); + return await encodeSingle(payloadCodecs, payload); +} + /** * Decode `payloads` and then return {@link arrayFromPayloads}`. */ From 1efb3f0664f05a6e68d74ccc4d9911d43d171ae2 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 3 Jun 2025 01:39:32 +0200 Subject: [PATCH 06/12] inline options, lazily decode summary/details on workflow description --- packages/client/src/schedule-helpers.ts | 12 +- packages/client/src/types.ts | 20 ++- packages/client/src/workflow-client.ts | 6 +- packages/common/src/activity-options.ts | 16 +++ .../internal-non-workflow/codec-helpers.ts | 6 +- packages/common/src/workflow-options.ts | 4 +- .../test/src/test-integration-workflows.ts | 52 +++++-- packages/workflow/src/interceptors.ts | 18 +-- packages/workflow/src/workflow.ts | 131 ++++++++---------- 9 files changed, 145 insertions(+), 120 deletions(-) diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index 672c17bd3..9b5a10750 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -1,11 +1,5 @@ import Long from 'long'; // eslint-disable-line import/no-named-as-default -import { - compileRetryPolicy, - decompileRetryPolicy, - extractWorkflowType, - JsonPayloadConverter, - LoadedDataConverter, -} from '@temporalio/common'; +import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, LoadedDataConverter } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, decodeSearchAttributes, @@ -340,8 +334,8 @@ export async function decodeScheduleAction( workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), - staticSummary: await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary) ?? undefined, - staticDetails: await decodeOptionalSinglePayload(dataConverter, userMetadata?.details) ?? undefined, + staticSummary: (await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary)) ?? undefined, + staticDetails: (await decodeOptionalSinglePayload(dataConverter, userMetadata?.details)) ?? undefined, }; } throw new TypeError('Unsupported schedule action'); diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index f4c0c8c19..bc72a7cce 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -65,11 +65,25 @@ export interface CountWorkflowExecution { export type WorkflowExecutionDescription = Replace< WorkflowExecutionInfo, { - staticSummary?: string; - staticDetails?: string; raw: DescribeWorkflowExecutionResponse; } ->; +> & { + /** + * General fixed details for this workflow execution that may appear in UI/CLI. + * This can be in Temporal markdown format and can span multiple lines. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticDetails: () => Promise; + + /** + * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticSummary: () => Promise; +}; export type WorkflowService = proto.temporal.api.workflowservice.v1.WorkflowService; export const { WorkflowService } = proto.temporal.api.workflowservice.v1; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 027959353..404e4eede 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -1438,8 +1438,10 @@ export class WorkflowClient extends BaseClient { const userMetadata = raw.executionConfig?.userMetadata; return { ...info, - staticDetails: await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details) ?? undefined, - staticSummary: await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary) ?? undefined, + staticDetails: async () => + (await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details)) ?? undefined, + staticSummary: async () => + (await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary)) ?? undefined, raw, }; }, diff --git a/packages/common/src/activity-options.ts b/packages/common/src/activity-options.ts index 8ad6e2b89..d60f7e6ad 100644 --- a/packages/common/src/activity-options.ts +++ b/packages/common/src/activity-options.ts @@ -122,6 +122,14 @@ export interface ActivityOptions { * @experimental The Worker Versioning API is still being designed. Major changes are expected. */ versioningIntent?: VersioningIntent; + + /** + * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticSummary?: string; } /** @@ -186,4 +194,12 @@ export interface LocalActivityOptions { * - `ABANDON` - Do not request cancellation of the activity and immediately report cancellation to the workflow. */ cancellationType?: ActivityCancellationType; + + /** + * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * This can be in single-line Temporal markdown format. + * + * @experimental User metadata is a new API and suspectible to change. + */ + staticSummary?: string; } diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index 6f5b39874..cfe4a94f4 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -95,13 +95,13 @@ export async function encodeToPayload(converter: LoadedDataConverter, value: unk * Run {@link PayloadConverter.toPayload} on an optional value, and then encode it. */ export async function encodeOptionalToPayload( - converter: LoadedDataConverter, + converter: LoadedDataConverter, value: unknown ): Promise { if (value == null) return value; - + const { payloadConverter, payloadCodecs } = converter; - const payload = payloadConverter.toPayload(value); + const payload = payloadConverter.toPayload(value); return await encodeSingle(payloadCodecs, payload); } diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index 4868e6292..b68f45928 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -195,14 +195,14 @@ export interface BaseWorkflowOptions { * General fixed details for this workflow execution that may appear in UI/CLI. * This can be in Temporal markdown format and can span multiple lines. * - * @experimental + * @experimental User metadata is a new API and suspectible to change. */ staticDetails?: string; /** * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. * This can be in single-line Temporal markdown format. * - * @experimental + * @experimental User metadata is a new API and suspectible to change. */ staticSummary?: string; } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 3c96e536e..a2a305b1d 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -14,13 +14,13 @@ import { ActivityCancellationType, ApplicationFailure, defineSearchAttributeKey, - JsonPayloadConverter, SearchAttributePair, SearchAttributeType, TypedSearchAttributes, WorkflowExecutionAlreadyStartedError, } from '@temporalio/common'; import { temporal } from '@temporalio/proto'; +import { decodeOptionalSinglePayload } from '@temporalio/common/lib/internal-non-workflow'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; @@ -1348,12 +1348,15 @@ export async function userMetadataWorkflow(): Promise { }); // That workflow should call an activity (with summary) - const { activityWithSummary } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' }).withSummaries({ - activityWithSummary: 'activity summary', - }); - await activityWithSummary(); + const { activityWithSummary } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' }); + await activityWithSummary.runWithOptions( + { + staticSummary: 'activity summary', + }, + [] + ); // Should have a timer (with summary) - await workflow.sleep(5, 'timer summary'); + await workflow.sleep(5, { summary: 'timer summary' }); // Set current details workflow.setCurrentDetails('current wf details'); // Unblock on var -> query current details (or return) @@ -1377,8 +1380,8 @@ test('User metadata on workflow, timer, activity', async (t) => { }); // Describe workflow -> static summary, static details const desc = await handle.describe(); - t.true(desc.staticSummary === 'wf static summary'); - t.true(desc.staticDetails === 'wf static details'); + t.true((await desc.staticSummary()) === 'wf static summary'); + t.true((await desc.staticDetails()) === 'wf static details'); await handle.signal('done'); const res = await handle.result(); @@ -1392,15 +1395,38 @@ test('User metadata on workflow, timer, activity', async (t) => { runId: handle.firstExecutionRunId, }, }); - const jsonConverter = new JsonPayloadConverter(); for (const event of resp.history?.events ?? []) { if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { - t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'wf static summary'); - t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.details ?? {}), 'wf static details'); + t.deepEqual( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + event.userMetadata?.summary ?? {} + ), + 'wf static summary' + ); + t.deepEqual( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + event.userMetadata?.details ?? {} + ), + 'wf static details' + ); } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) { - t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'activity summary'); + t.deepEqual( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + event.userMetadata?.summary ?? {} + ), + 'activity summary' + ); } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) { - t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'timer summary'); + t.deepEqual( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + event.userMetadata?.summary ?? {} + ), + 'timer summary' + ); } } diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index 5ee9e046a..8efd7b096 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -80,7 +80,6 @@ export interface ActivityInput { readonly options: ActivityOptions; readonly headers: Headers; readonly seq: number; - readonly cmdOpts?: WorkflowCommandOptions; } /** Input for WorkflowOutboundCallsInterceptor.scheduleLocalActivity */ @@ -92,7 +91,6 @@ export interface LocalActivityInput { readonly seq: number; readonly originalScheduleTime?: Timestamp; readonly attempt: number; - readonly cmdOpts?: WorkflowCommandOptions; } /** Input for WorkflowOutboundCallsInterceptor.startChildWorkflowExecution */ @@ -117,19 +115,17 @@ export interface UserMetadata { details?: string; } -/** - * Options that can be attached to workflow commands. - */ -export interface WorkflowCommandOptions { - /** User metadata for the command that may be persisted to history */ - readonly userMetadata?: UserMetadata; -} - /** Input for WorkflowOutboundCallsInterceptor.startTimer */ export interface TimerInput { readonly durationMs: number; readonly seq: number; - readonly cmdOpts?: WorkflowCommandOptions; + readonly options?: TimerOptions; +} + +/** Options for starting a timer (i.e. sleep) */ +export interface TimerOptions { + /** @experimental A single line summary of the command's purpose */ + readonly summary?: string; } /** diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 70177b662..68c77ae52 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -22,7 +22,6 @@ import { WorkflowReturnType, WorkflowUpdateValidatorType, SearchAttributeUpdatePair, - JsonPayloadConverter, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -40,8 +39,7 @@ import { SignalWorkflowInput, StartChildWorkflowExecutionInput, TimerInput, - UserMetadata, - WorkflowCommandOptions, + TimerOptions, } from './interceptors'; import { ChildWorkflowCancellationType, @@ -84,31 +82,10 @@ export function addDefaultWorkflowOptions( }; } -function addUserMetadata(userMetadata?: UserMetadata): temporal.api.sdk.v1.IUserMetadata | undefined { - if (userMetadata == null) { - return undefined; - } - - const jsonConverter = new JsonPayloadConverter(); - return { - summary: jsonConverter.toPayload(userMetadata.summary), - details: jsonConverter.toPayload(userMetadata.details), - }; -} - -function addWorkflowCommandOptions(cmdOpts?: WorkflowCommandOptions): object { - if (cmdOpts == null) { - return {}; - } - return { - userMetadata: addUserMetadata(cmdOpts.userMetadata), - }; -} - /** * Push a startTimer command into state accumulator and register completion */ -function timerNextHandler(input: TimerInput) { +function timerNextHandler({ seq, durationMs, options }: TimerInput) { const activator = getActivator(); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); @@ -119,12 +96,12 @@ function timerNextHandler(input: TimerInput) { if (scope.cancellable) { untrackPromise( scope.cancelRequested.catch((err) => { - if (!activator.completions.timer.delete(input.seq)) { + if (!activator.completions.timer.delete(seq)) { return; // Already resolved or never scheduled } activator.pushCommand({ cancelTimer: { - seq: input.seq, + seq, }, }); reject(err); @@ -133,12 +110,14 @@ function timerNextHandler(input: TimerInput) { } activator.pushCommand({ startTimer: { - seq: input.seq, - startToFireTimeout: msToTs(input.durationMs), + seq, + startToFireTimeout: msToTs(durationMs), + }, + userMetadata: options && { + summary: options.summary ? activator.payloadConverter.toPayload(options.summary) : undefined, }, - ...addWorkflowCommandOptions(input.cmdOpts), }); - activator.completions.timer.set(input.seq, { + activator.completions.timer.set(seq, { resolve, reject, }); @@ -154,7 +133,7 @@ function timerNextHandler(input: TimerInput) { * If given a negative number or 0, value will be set to 1. * @param summary a short summary/description of the timer. Can serve as a timer ID. */ -export function sleep(ms: Duration, summary?: string): Promise { +export function sleep(ms: Duration, options?: TimerOptions): Promise { const activator = assertInWorkflowContext('Workflow.sleep(...) may only be used from a Workflow Execution'); const seq = activator.nextSeqs.timer++; @@ -165,7 +144,7 @@ export function sleep(ms: Duration, summary?: string): Promise { return execute({ durationMs, seq, - ...(summary !== undefined && { cmdOpts: { userMetadata: { summary } } }), + options, }); } @@ -181,14 +160,7 @@ const validateLocalActivityOptions = validateActivityOptions; /** * Push a scheduleActivity command into activator accumulator and register completion */ -function scheduleActivityNextHandler({ - options, - args, - headers, - seq, - activityType, - cmdOpts, -}: ActivityInput): Promise { +function scheduleActivityNextHandler({ options, args, headers, seq, activityType }: ActivityInput): Promise { const activator = getActivator(); validateActivityOptions(options); return new Promise((resolve, reject) => { @@ -228,7 +200,9 @@ function scheduleActivityNextHandler({ doNotEagerlyExecute: !(options.allowEagerDispatch ?? true), versioningIntent: versioningIntentToProto(options.versioningIntent), }, - ...addWorkflowCommandOptions(cmdOpts), + userMetadata: options && { + summary: options.staticSummary ? activator.payloadConverter.toPayload(options.staticSummary) : undefined, + }, }); activator.completions.activity.set(seq, { resolve, @@ -248,7 +222,6 @@ async function scheduleLocalActivityNextHandler({ activityType, attempt, originalScheduleTime, - cmdOpts, }: LocalActivityInput): Promise { const activator = getActivator(); // Eagerly fail the local activity (which will in turn fail the workflow task. @@ -295,7 +268,9 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, - ...addWorkflowCommandOptions(cmdOpts), + userMetadata: options && { + summary: options?.staticSummary ? activator.payloadConverter.toPayload(options?.staticSummary) : undefined, + }, }); activator.completions.activity.set(seq, { resolve, @@ -308,12 +283,7 @@ async function scheduleLocalActivityNextHandler({ * Schedule an activity and run outbound interceptors * @hidden */ -export function scheduleActivity( - activityType: string, - args: any[], - options: ActivityOptions, - summary?: string -): Promise { +export function scheduleActivity(activityType: string, args: any[], options: ActivityOptions): Promise { const activator = assertInWorkflowContext( 'Workflow.scheduleActivity(...) may only be used from a Workflow Execution' ); @@ -329,7 +299,6 @@ export function scheduleActivity( options, args, seq, - ...(summary !== undefined && { cmdOpts: { userMetadata: { summary } } }), }) as Promise; } @@ -541,19 +510,26 @@ export const NotAnActivityMethod = Symbol.for('__TEMPORAL_NOT_AN_ACTIVITY_METHOD * ``` */ export type ActivityInterfaceFor = { - [K in keyof T]: T[K] extends ActivityFunction ? T[K] : typeof NotAnActivityMethod; + [K in keyof T]: T[K] extends ActivityFunction ? ActivityFunctionWithOptions : typeof NotAnActivityMethod; }; /** - * Extends ActivityInterfaceFor to include the withSummaries method + * Extends ActivityInterfaceFor to include the withOptions method */ -export type ActivityInterfaceWithSummaries = ActivityInterfaceFor & { +export type ActivityInterfaceWithOptions = ActivityInterfaceFor & { + runWithOptions(options: ActivityOptions, args: any[]): Promise; +}; + +export type ActivityFunctionWithOptions = T & { /** - * Provide descriptive summaries for activities - * @param summaries Record mapping activity names to their summary descriptions - * @returns A new proxy with the provided summaries + * Run the activity, overriding its existing options with the + * provided options. + * + * @param options ActivityOptions + * @param args: list of arguments + * @returns return value of the activity */ - withSummaries(summaries: Record): ActivityInterfaceFor; + runWithOptions(options: ActivityOptions, args: Parameters): Promise>>; }; /** @@ -607,35 +583,36 @@ export type ActivityInterfaceWithSummaries = ActivityInterfaceFor & { * } * ``` */ -export function proxyActivities(options: ActivityOptions): ActivityInterfaceWithSummaries { +export function proxyActivities(options: ActivityOptions): ActivityInterfaceWithOptions { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateActivityOptions(options); - function createActivityProxy(summaries: Record = {}): ActivityInterfaceWithSummaries { - return new Proxy({} as ActivityInterfaceWithSummaries, { - get(_, prop) { - if (prop === 'withSummaries') { - return function withSummaries(newSummaries: Record): ActivityInterfaceWithSummaries { - return createActivityProxy(newSummaries); - }; + function createActivityProxy(options: ActivityOptions): ActivityInterfaceWithOptions { + return new Proxy({} as ActivityInterfaceWithOptions, { + get(_, activityType) { + if (typeof activityType !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); } - if (typeof prop !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`); + function activityProxyFunction(...args: unknown[]): Promise { + return scheduleActivity(activityType as string, args, options); } - return function activityProxyFunction(...args: unknown[]): Promise { - const summary = summaries[prop]; - return scheduleActivity(prop, args, options, summary); + activityProxyFunction.runWithOptions = function ( + overrideOptions: ActivityOptions, + args: any[] + ): Promise { + return scheduleActivity(activityType, args, { ...options, ...overrideOptions }); }; + + return activityProxyFunction; }, }); } - - return createActivityProxy(); + return createActivityProxy(options); } /** @@ -650,18 +627,18 @@ export function proxyActivities(options: ActivityOptions) */ export function proxyLocalActivities( options: LocalActivityOptions -): ActivityInterfaceWithSummaries { +): ActivityInterfaceWithOptions { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateLocalActivityOptions(options); - function createLocalActivityProxy(summaries: Record = {}): ActivityInterfaceWithSummaries { - return new Proxy({} as ActivityInterfaceWithSummaries, { + function createLocalActivityProxy(summaries: Record = {}): ActivityInterfaceWithOptions { + return new Proxy({} as ActivityInterfaceWithOptions, { get(_, prop) { if (prop === 'withSummaries') { - return function withSummaries(newSummaries: Record): ActivityInterfaceWithSummaries { + return function withSummaries(newSummaries: Record): ActivityInterfaceWithOptions { return createLocalActivityProxy(newSummaries); }; } From ebe745f61ce8718a91ddd4e9cf9ecc1566357f3b Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 3 Jun 2025 01:48:16 +0200 Subject: [PATCH 07/12] update proxyActivities comment to show runWithOptions usage --- packages/workflow/src/workflow.ts | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 68c77ae52..eb170d263 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -550,18 +550,19 @@ export type ActivityFunctionWithOptions = T & { * startToCloseTimeout: '30 minutes', * }); * - * // Setup Activities with summaries for better observability - * const { - * httpGet, - * processData, - * saveResults - * } = proxyActivities({ - * startToCloseTimeout: '10m', - * }).withSummaries({ - * httpGet: 'Fetches data from external API', - * processData: 'Processes the fetched data', - * saveResults: 'Saves processed results to database' - * }); + * // Use activities with default options + * const result1 = await httpGet('http://example.com'); + * + * // Override options for specific activity calls + * const result2 = await httpGet.runWithOptions({ + * staticSummary: 'Fetches data from external API', + * scheduleToCloseTimeout: '5m' + * }, ['http://api.example.com']); + * + * const result3 = await otherActivity.runWithOptions({ + * staticSummary: 'Processes the fetched data', + * taskQueue: 'special-task-queue' + * }, [data]); * * // Setup Activities from an explicit interface (e.g. when defined by another SDK) * interface JavaActivities { @@ -579,6 +580,11 @@ export type ActivityFunctionWithOptions = T & { * * export function execute(): Promise { * const response = await httpGet("http://example.com"); + * // Or with custom options: + * const response2 = await httpGetFromJava.runWithOptions({ + * staticSummary: 'Java HTTP call with timeout override', + * startToCloseTimeout: '2m' + * }, ["http://fast-api.example.com"]); * // ... * } * ``` From 67d8e06e4f6cd7772de14ca547bf12265d0f88e7 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 3 Jun 2025 02:07:37 +0200 Subject: [PATCH 08/12] add runWithOptions for local activities --- packages/workflow/src/workflow.ts | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 44e22e567..669a44a29 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -645,28 +645,29 @@ export function proxyLocalActivities( // Validate as early as possible for immediate user feedback validateLocalActivityOptions(options); - function createLocalActivityProxy(summaries: Record = {}): ActivityInterfaceWithOptions { + function createActivityProxy(options: ActivityOptions): ActivityInterfaceWithOptions { return new Proxy({} as ActivityInterfaceWithOptions, { - get(_, prop) { - if (prop === 'withSummaries') { - return function withSummaries(newSummaries: Record): ActivityInterfaceWithOptions { - return createLocalActivityProxy(newSummaries); - }; + get(_, activityType) { + if (typeof activityType !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); } - if (typeof prop !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(prop)}`); + function activityProxyFunction(...args: unknown[]): Promise { + return scheduleLocalActivity(activityType as string, args, options); } - return function localActivityProxyFunction(...args: unknown[]): Promise { - const summary = summaries[prop]; - return scheduleLocalActivity(prop, args, options, summary); + activityProxyFunction.runWithOptions = function ( + overrideOptions: ActivityOptions, + args: any[] + ): Promise { + return scheduleLocalActivity(activityType, args, { ...options, ...overrideOptions }); }; + + return activityProxyFunction; }, }); } - - return createLocalActivityProxy(); + return createActivityProxy(options); } // TODO: deprecate this patch after "enough" time has passed From c49168362de2604d75ab6bffa4ed3ab93ca0f51c Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 3 Jun 2025 02:44:23 +0200 Subject: [PATCH 09/12] allow for undefined userMetadata when scheduling activity --- packages/test/src/test-integration-workflows.ts | 2 +- packages/workflow/src/workflow.ts | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index aa5c1666e..8f917ca45 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1514,4 +1514,4 @@ test('User metadata on workflow, timer, activity', async (t) => { t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries t.deepEqual(wfMetadata.currentDetails, 'current wf details'); }); -}); \ No newline at end of file +}); diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 669a44a29..1586b3cc2 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -204,9 +204,11 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType versioningIntent: versioningIntentToProto(options.versioningIntent), priority: options.priority ? compilePriority(options.priority) : undefined, }, - userMetadata: options && { - summary: options.staticSummary ? activator.payloadConverter.toPayload(options.staticSummary) : undefined, - }, + userMetadata: options.staticSummary + ? { + summary: activator.payloadConverter.toPayload(options.staticSummary), + } + : undefined, }); activator.completions.activity.set(seq, { resolve, From 60a1e08386c8728d5e69a7b241fd8bb6a5477392 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 16 Jun 2025 12:57:02 +0200 Subject: [PATCH 10/12] move user metadata encoding/decoding to user-metadata.ts, add deepMerge to merge/override nested fields in objects (used to override activity options with activities' executeWithOptions), add metadata to child workflows and condition calls --- packages/client/src/schedule-helpers.ts | 15 +- packages/client/src/workflow-client.ts | 12 +- packages/common/src/activity-options.ts | 8 +- packages/common/src/index.ts | 1 + .../internal-non-workflow/codec-helpers.ts | 14 +- .../src/internal-workflow/objects-helpers.ts | 30 ++++ packages/common/src/user-metadata.ts | 66 ++++++++ .../test/src/test-integration-workflows.ts | 115 +++++++++++-- packages/workflow/src/interceptors.ts | 16 +- packages/workflow/src/workflow.ts | 157 ++++++++++-------- 10 files changed, 302 insertions(+), 132 deletions(-) create mode 100644 packages/common/src/user-metadata.ts diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index 9dcc164fc..463d49514 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -6,6 +6,8 @@ import { decompileRetryPolicy, extractWorkflowType, LoadedDataConverter, + encodeUserMetadata, + decodeUserMetadata, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -16,9 +18,7 @@ import { Headers } from '@temporalio/common/lib/interceptors'; import { decodeArrayFromPayloads, decodeMapFromPayloads, - decodeOptionalSinglePayload, encodeMapToPayloads, - encodeOptionalToPayload, encodeToPayloads, } from '@temporalio/common/lib/internal-non-workflow'; import { temporal } from '@temporalio/proto'; @@ -271,10 +271,7 @@ export async function encodeScheduleAction( } : undefined, header: { fields: headers }, - userMetadata: { - summary: await encodeOptionalToPayload(dataConverter, action?.staticSummary), - details: await encodeOptionalToPayload(dataConverter, action?.staticDetails), - }, + userMetadata: await encodeUserMetadata(dataConverter, action.staticSummary, action.staticDetails), priority: action.priority ? compilePriority(action.priority) : undefined, }, }; @@ -325,7 +322,7 @@ export async function decodeScheduleAction( pb: temporal.api.schedule.v1.IScheduleAction ): Promise { if (pb.startWorkflow) { - const userMetadata = pb.startWorkflow?.userMetadata; + const { staticSummary, staticDetails } = await decodeUserMetadata(dataConverter, pb.startWorkflow?.userMetadata); return { type: 'startWorkflow', // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -342,8 +339,8 @@ export async function decodeScheduleAction( workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), - staticSummary: (await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary)) ?? undefined, - staticDetails: (await decodeOptionalSinglePayload(dataConverter, userMetadata?.details)) ?? undefined, + staticSummary, + staticDetails, priority: decodePriority(pb.startWorkflow.priority), }; } diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index f3a677cd3..0214a17b5 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -23,6 +23,7 @@ import { encodeWorkflowIdConflictPolicy, WorkflowIdConflictPolicy, compilePriority, + encodeUserMetadata, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; @@ -34,7 +35,6 @@ import { decodeOptionalFailureToOptionalError, decodeOptionalSinglePayload, encodeMapToPayloads, - encodeOptionalToPayload, encodeToPayloads, } from '@temporalio/common/lib/internal-non-workflow'; import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; @@ -1228,10 +1228,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: options.cronSchedule, header: { fields: headers }, - userMetadata: { - summary: await encodeOptionalToPayload(this.dataConverter, options?.staticSummary), - details: await encodeOptionalToPayload(this.dataConverter, options?.staticDetails), - }, + userMetadata: await encodeUserMetadata(this.dataConverter, options.staticSummary, options.staticDetails), priority: options.priority ? compilePriority(options.priority) : undefined, versioningOverride: options.versioningOverride ?? undefined, }; @@ -1301,10 +1298,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: opts.cronSchedule, header: { fields: headers }, - userMetadata: { - summary: await encodeOptionalToPayload(this.dataConverter, opts?.staticSummary), - details: await encodeOptionalToPayload(this.dataConverter, opts?.staticDetails), - }, + userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails), priority: opts.priority ? compilePriority(opts.priority) : undefined, versioningOverride: opts.versioningOverride ?? undefined, }; diff --git a/packages/common/src/activity-options.ts b/packages/common/src/activity-options.ts index e228d9769..125b4e351 100644 --- a/packages/common/src/activity-options.ts +++ b/packages/common/src/activity-options.ts @@ -125,12 +125,12 @@ export interface ActivityOptions { versioningIntent?: VersioningIntent; /** - * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * A fixed, single-line fixed summary for this workflow execution that may appear in the UI/CLI. * This can be in single-line Temporal markdown format. * * @experimental User metadata is a new API and suspectible to change. */ - staticSummary?: string; + summary?: string; /** * Priority of this activity @@ -202,10 +202,10 @@ export interface LocalActivityOptions { cancellationType?: ActivityCancellationType; /** - * A single-line fixed summary for this workflow execution that may appear in the UI/CLI. + * A fixed, single-line fixed summary for this workflow execution that may appear in the UI/CLI. * This can be in single-line Temporal markdown format. * * @experimental User metadata is a new API and suspectible to change. */ - staticSummary?: string; + summary?: string; } diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index ba703f3f0..f292d2070 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -37,6 +37,7 @@ export { TypedSearchAttributes, defineSearchAttributeKey, } from './search-attributes'; +export * from './user-metadata'; /** * Encode a UTF-8 string into a Uint8Array diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index 6f06360d6..6ab981fa8 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -1,5 +1,5 @@ import { Payload } from '../interfaces'; -import { arrayFromPayloads, fromPayloadsAtIndex, toPayloads } from '../converter/payload-converter'; +import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from '../converter/payload-converter'; import { PayloadConverterError } from '../errors'; import { PayloadCodec } from '../converter/payload-codec'; import { ProtoFailure } from '../failure'; @@ -77,7 +77,7 @@ export async function decodeOptionalSinglePayload( dataConverter: LoadedDataConverter, payload?: Payload | null | undefined ): Promise { - const { payloadCodecs, payloadConverter } = dataConverter; + const { payloadConverter, payloadCodecs } = dataConverter; const decoded = await decodeOptionalSingle(payloadCodecs, payload); if (decoded == null) return decoded; return payloadConverter.fromPayload(decoded); @@ -94,15 +94,13 @@ export async function encodeToPayload(converter: LoadedDataConverter, value: unk /** * Run {@link PayloadConverter.toPayload} on an optional value, and then encode it. */ -export async function encodeOptionalToPayload( - converter: LoadedDataConverter, +export function encodeOptionalToPayload( + payloadConverter: PayloadConverter, value: unknown -): Promise { +): Payload | null | undefined { if (value == null) return value; - const { payloadConverter, payloadCodecs } = converter; - const payload = payloadConverter.toPayload(value); - return await encodeSingle(payloadCodecs, payload); + return payloadConverter.toPayload(value); } /** diff --git a/packages/common/src/internal-workflow/objects-helpers.ts b/packages/common/src/internal-workflow/objects-helpers.ts index addf156d0..a1659e38d 100644 --- a/packages/common/src/internal-workflow/objects-helpers.ts +++ b/packages/common/src/internal-workflow/objects-helpers.ts @@ -35,3 +35,33 @@ export function mergeObjects>( return changed ? (merged as T) : original; } + +function isObject(item: any): item is Record { + return item && typeof item === 'object' && !Array.isArray(item); +} + +/** + * Recursively merges two objects, returning a new object. + * + * Properties from `source` will overwrite properties on `target`. + * Nested objects are merged recursively. + * + * Object fields in the returned object are references, as in, + * the returned object is not completely fresh. + */ +export function deepMerge>(target: T, source: Partial): T { + const output = { ...target }; + + if (isObject(target) && isObject(source)) { + for (const key of Object.keys(source)) { + const sourceValue = source[key]; + if (isObject(sourceValue) && key in target && isObject(target[key] as any)) { + output[key as keyof T] = deepMerge(target[key], sourceValue); + } else { + (output as any)[key] = sourceValue; + } + } + } + + return output; +} diff --git a/packages/common/src/user-metadata.ts b/packages/common/src/user-metadata.ts new file mode 100644 index 000000000..243e41314 --- /dev/null +++ b/packages/common/src/user-metadata.ts @@ -0,0 +1,66 @@ +import { temporal } from '@temporalio/proto'; +import { PayloadConverter } from './converter/payload-converter'; +import { LoadedDataConverter } from './converter/data-converter'; +import { encodeOptionalToPayload, decodeOptionalSinglePayload, encodeOptionalSingle } from './internal-non-workflow'; + +/** + * User metadata that can be attached to workflow commands. + */ +export interface UserMetadata { + /** @experimental A fixed, single line summary of the command's purpose */ + staticSummary?: string; + /** @experimental Fixed additional details about the command for longer-text description, can span multiple lines */ + staticDetails?: string; +} + +export function userMetadataToPayload( + payloadConverter: PayloadConverter, + staticSummary: string | undefined, + staticDetails: string | undefined +): temporal.api.sdk.v1.IUserMetadata | undefined { + if (staticSummary == null && staticDetails == null) return undefined; + + const summary = encodeOptionalToPayload(payloadConverter, staticSummary); + const details = encodeOptionalToPayload(payloadConverter, staticDetails); + + if (summary == null && details == null) return undefined; + + return { summary, details }; +} + +export async function encodeUserMetadata( + dataConverter: LoadedDataConverter, + staticSummary: string | undefined, + staticDetails: string | undefined +): Promise { + if (staticSummary == null && staticDetails == null) return undefined; + + const { payloadConverter, payloadCodecs } = dataConverter; + const summary = await encodeOptionalSingle( + payloadCodecs, + await encodeOptionalToPayload(payloadConverter, staticSummary) + ); + const details = await encodeOptionalSingle( + payloadCodecs, + await encodeOptionalToPayload(payloadConverter, staticDetails) + ); + + if (summary == null && details == null) return undefined; + + return { summary, details }; +} + +export async function decodeUserMetadata( + dataConverter: LoadedDataConverter, + metadata: temporal.api.sdk.v1.IUserMetadata | undefined | null +): Promise { + const res = { staticSummary: undefined, staticDetails: undefined }; + if (metadata == null) return res; + + const staticSummary = (await decodeOptionalSinglePayload(dataConverter, metadata.summary)) ?? undefined; + const staticDetails = (await decodeOptionalSinglePayload(dataConverter, metadata.details)) ?? undefined; + + if (staticSummary == null && staticDetails == null) return res; + + return { staticSummary, staticDetails }; +} diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 8f917ca45..04e7f44d4 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1417,35 +1417,75 @@ test('Workflow can return root workflow', async (t) => { }); }); -export async function userMetadataWorkflow(): Promise { +export async function userMetadataWorkflow(): Promise<{ + currentDetails: string; + childWorkflowId: string; + childRunId: string; +}> { let done = false; const signalDef = defineSignal('done'); - setHandler(signalDef, () => { - done = true; - }); + setHandler( + signalDef, + () => { + done = true; + }, + { description: 'signal-desc' } + ); // That workflow should call an activity (with summary) - const { activityWithSummary } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' }); - await activityWithSummary.runWithOptions( + const { activityWithSummary } = workflow.proxyActivities({ + scheduleToCloseTimeout: '10s', + scheduleToStartTimeout: '10s', + }); + await activityWithSummary.executeWithOptions( { - staticSummary: 'activity summary', + summary: 'activity summary', + retry: { + initialInterval: '1s', + maximumAttempts: 5, + maximumInterval: '10s', + }, + scheduleToStartTimeout: '5s', }, [] ); - // Should have a timer (with summary) + const { localActivityWithSummary } = workflow.proxyLocalActivities({ scheduleToCloseTimeout: '10s' }); + await localActivityWithSummary.executeWithOptions( + { + summary: 'local activity summary', + retry: { + maximumAttempts: 2, + nonRetryableErrorTypes: ['CustomError'], + }, + scheduleToStartTimeout: '5s', + }, + [] + ); + // Timer (with summary) await workflow.sleep(5, { summary: 'timer summary' }); // Set current details workflow.setCurrentDetails('current wf details'); - // Unblock on var -> query current details (or return) + // Start child workflow + const child_handle = await workflow.startChild(completableWorkflow, { + args: [false], + staticDetails: 'child details', + staticSummary: 'child summary', + }); + await workflow.condition(() => done); - return workflow.getCurrentDetails(); + return { + currentDetails: workflow.getCurrentDetails(), + childWorkflowId: child_handle.workflowId, + childRunId: child_handle.firstExecutionRunId, + }; } -test('User metadata on workflow, timer, activity', async (t) => { +test('User metadata on workflow, timer, activity, child', async (t) => { const { createWorker, startWorkflow } = helpers(t); const worker = await createWorker({ activities: { async activityWithSummary() {}, + async localActivityWithSummary() {}, }, }); @@ -1462,9 +1502,15 @@ test('User metadata on workflow, timer, activity', async (t) => { await handle.signal('done'); const res = await handle.result(); - t.true(res === 'current wf details'); + t.true(res.currentDetails === 'current wf details'); + + // Get child workflow handle and verify metadata + const childHandle = t.context.env.client.workflow.getHandle(res.childWorkflowId, res.childRunId); + const childDesc = await childHandle.describe(); + t.true((await childDesc.staticSummary()) === 'child summary'); + t.true((await childDesc.staticDetails()) === 'child details'); - // Get history events for timer and activity summaries. + // Get history events for main workflow. const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({ namespace: t.context.env.client.options.namespace, execution: { @@ -1477,14 +1523,14 @@ test('User metadata on workflow, timer, activity', async (t) => { t.deepEqual( await decodeOptionalSinglePayload( t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary ?? {} + event.userMetadata?.summary ), 'wf static summary' ); t.deepEqual( await decodeOptionalSinglePayload( t.context.env.client.options.loadedDataConverter, - event.userMetadata?.details ?? {} + event.userMetadata?.details ), 'wf static details' ); @@ -1492,25 +1538,60 @@ test('User metadata on workflow, timer, activity', async (t) => { t.deepEqual( await decodeOptionalSinglePayload( t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary ?? {} + event.userMetadata?.summary ), 'activity summary' ); + // Assert that the overriden activity options are what we expect. + const attrs = event.activityTaskScheduledEventAttributes; + t.is(tsToMs(attrs?.scheduleToCloseTimeout), 10000); + t.is(tsToMs(attrs?.scheduleToStartTimeout), 5000); + const retryPolicy = attrs?.retryPolicy; + t.is(retryPolicy?.maximumAttempts, 5); + t.is(tsToMs(retryPolicy?.initialInterval), 1000); + t.is(tsToMs(retryPolicy?.maximumInterval), 10000); } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) { t.deepEqual( await decodeOptionalSinglePayload( t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary ?? {} + event.userMetadata?.summary ), 'timer summary' ); } } + // Get history events for child workflow. + const childResp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({ + namespace: t.context.env.client.options.namespace, + execution: { + workflowId: res.childWorkflowId, + runId: res.childRunId, + }, + }); + for (const event of childResp.history?.events ?? []) { + if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + event.userMetadata?.summary + ), + 'child summary' + ); + t.deepEqual( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + event.userMetadata?.details + ), + 'child details' + ); + } + } // Run metadata query -> get current details const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata; t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1); t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done'); + t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].description, 'signal-desc'); t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries t.deepEqual(wfMetadata.currentDetails, 'current wf details'); }); diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index f810f6e27..ae2f91fa7 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -109,20 +109,6 @@ export interface StartChildWorkflowExecutionInput { readonly seq: number; } -/** - * User metadata that can be attached to workflow commands. - * - * Current used for: - * - startTimer, scheduleActivity/scheduleLocalActivity commands - * - internal metadata query - */ -export interface UserMetadata { - /** @experimental A single line summary of the command's purpose */ - summary?: string; - /** @experimental Additional details about the command for longer-text description, can span multiple lines */ - details?: string; -} - /** Input for WorkflowOutboundCallsInterceptor.startTimer */ export interface TimerInput { readonly durationMs: number; @@ -132,7 +118,7 @@ export interface TimerInput { /** Options for starting a timer (i.e. sleep) */ export interface TimerOptions { - /** @experimental A single line summary of the command's purpose */ + /** @experimental A fixed, single line summary of the command's purpose */ readonly summary?: string; } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 1586b3cc2..9ea65ff8b 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -2,6 +2,7 @@ import { ActivityFunction, ActivityOptions, compileRetryPolicy, + compilePriority, encodeActivityCancellationType, encodeWorkflowIdReusePolicy, extractWorkflowType, @@ -22,8 +23,8 @@ import { WorkflowReturnType, WorkflowUpdateValidatorType, SearchAttributeUpdatePair, - compilePriority, WorkflowDefinitionOptionsOrGetter, + userMetadataToPayload, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -33,6 +34,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; +import { deepMerge } from '@temporalio/common/lib/internal-workflow/objects-helpers'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { @@ -116,9 +118,7 @@ function timerNextHandler({ seq, durationMs, options }: TimerInput) { seq, startToFireTimeout: msToTs(durationMs), }, - userMetadata: options && { - summary: options.summary ? activator.payloadConverter.toPayload(options.summary) : undefined, - }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options?.summary, undefined), }); activator.completions.timer.set(seq, { resolve, @@ -134,7 +134,7 @@ function timerNextHandler({ seq, durationMs, options }: TimerInput) { * * @param ms sleep duration - number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}. * If given a negative number or 0, value will be set to 1. - * @param summary a short summary/description of the timer. Can serve as a timer ID. + * @param options optional timer options for additional configuration */ export function sleep(ms: Duration, options?: TimerOptions): Promise { const activator = assertInWorkflowContext('Workflow.sleep(...) may only be used from a Workflow Execution'); @@ -204,11 +204,7 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType versioningIntent: versioningIntentToProto(options.versioningIntent), priority: options.priority ? compilePriority(options.priority) : undefined, }, - userMetadata: options.staticSummary - ? { - summary: activator.payloadConverter.toPayload(options.staticSummary), - } - : undefined, + userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), }); activator.completions.activity.set(seq, { resolve, @@ -274,9 +270,7 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, - userMetadata: options && { - summary: options?.staticSummary ? activator.payloadConverter.toPayload(options?.staticSummary) : undefined, - }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), }); activator.completions.activity.set(seq, { resolve, @@ -315,8 +309,7 @@ export function scheduleActivity(activityType: string, args: any[], options: export async function scheduleLocalActivity( activityType: string, args: any[], - options: LocalActivityOptions, - summary?: string + options: LocalActivityOptions ): Promise { const activator = assertInWorkflowContext( 'Workflow.scheduleLocalActivity(...) may only be used from a Workflow Execution' @@ -345,7 +338,6 @@ export async function scheduleLocalActivity( seq, attempt, originalScheduleTime, - ...(summary !== undefined && { cmdOpts: { userMetadata: { summary } } }), })) as Promise; } catch (err) { if (err instanceof LocalActivityDoBackoff) { @@ -415,6 +407,7 @@ function startChildWorkflowExecutionNextHandler({ versioningIntent: versioningIntentToProto(options.versioningIntent), priority: options.priority ? compilePriority(options.priority) : undefined, }, + userMetadata: userMetadataToPayload(activator.payloadConverter, options?.staticSummary, options?.staticDetails), }); activator.completions.childWorkflowStart.set(seq, { resolve, @@ -520,23 +513,41 @@ export type ActivityInterfaceFor = { [K in keyof T]: T[K] extends ActivityFunction ? ActivityFunctionWithOptions : typeof NotAnActivityMethod; }; +export type ActivityFunctionWithOptions = T & { + /** + * Execute the activity, overriding its existing options with the + * provided options. + * + * @param options ActivityOptions + * @param args: list of arguments + * @returns return value of the activity + * + * @experimental executeWithOptions is a new method to provide call-site options + * and is subject to change + */ + executeWithOptions(options: ActivityOptions, args: Parameters): Promise>>; +}; + /** - * Extends ActivityInterfaceFor to include the withOptions method + * The local activity counterpart to {@link ActivityInterfaceFor} */ -export type ActivityInterfaceWithOptions = ActivityInterfaceFor & { - runWithOptions(options: ActivityOptions, args: any[]): Promise; +export type LocalActivityInterfaceFor = { + [K in keyof T]: T[K] extends ActivityFunction ? LocalActivityFunctionWithOptions : typeof NotAnActivityMethod; }; -export type ActivityFunctionWithOptions = T & { +export type LocalActivityFunctionWithOptions = T & { /** - * Run the activity, overriding its existing options with the + * Run the local activity, overriding its existing options with the * provided options. * - * @param options ActivityOptions + * @param options LocalActivityOptions * @param args: list of arguments * @returns return value of the activity + * + * @experimental executeWithOptions is a new method to provide call-site options + * and is subject to change */ - runWithOptions(options: ActivityOptions, args: Parameters): Promise>>; + executeWithOptions(options: LocalActivityOptions, args: Parameters): Promise>>; }; /** @@ -561,12 +572,12 @@ export type ActivityFunctionWithOptions = T & { * const result1 = await httpGet('http://example.com'); * * // Override options for specific activity calls - * const result2 = await httpGet.runWithOptions({ + * const result2 = await httpGet.executeWithOptions({ * staticSummary: 'Fetches data from external API', * scheduleToCloseTimeout: '5m' * }, ['http://api.example.com']); * - * const result3 = await otherActivity.runWithOptions({ + * const result3 = await otherActivity.executeWithOptions({ * staticSummary: 'Processes the fetched data', * taskQueue: 'special-task-queue' * }, [data]); @@ -588,7 +599,7 @@ export type ActivityFunctionWithOptions = T & { * export function execute(): Promise { * const response = await httpGet("http://example.com"); * // Or with custom options: - * const response2 = await httpGetFromJava.runWithOptions({ + * const response2 = await httpGetFromJava.executeWithOptions({ * staticSummary: 'Java HTTP call with timeout override', * startToCloseTimeout: '2m' * }, ["http://fast-api.example.com"]); @@ -596,36 +607,33 @@ export type ActivityFunctionWithOptions = T & { * } * ``` */ -export function proxyActivities(options: ActivityOptions): ActivityInterfaceWithOptions { +export function proxyActivities(options: ActivityOptions): ActivityInterfaceFor { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateActivityOptions(options); - function createActivityProxy(options: ActivityOptions): ActivityInterfaceWithOptions { - return new Proxy({} as ActivityInterfaceWithOptions, { - get(_, activityType) { - if (typeof activityType !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); - } + return new Proxy({} as ActivityInterfaceFor, { + get(_, activityType) { + if (typeof activityType !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); + } - function activityProxyFunction(...args: unknown[]): Promise { - return scheduleActivity(activityType as string, args, options); - } + function activityProxyFunction(...args: unknown[]): Promise { + return scheduleActivity(activityType as string, args, options); + } - activityProxyFunction.runWithOptions = function ( - overrideOptions: ActivityOptions, - args: any[] - ): Promise { - return scheduleActivity(activityType, args, { ...options, ...overrideOptions }); - }; + activityProxyFunction.executeWithOptions = function ( + overrideOptions: ActivityOptions, + args: any[] + ): Promise { + return scheduleActivity(activityType, args, deepMerge(options, overrideOptions)); + }; - return activityProxyFunction; - }, - }); - } - return createActivityProxy(options); + return activityProxyFunction; + }, + }); } /** @@ -640,36 +648,33 @@ export function proxyActivities(options: ActivityOptions) */ export function proxyLocalActivities( options: LocalActivityOptions -): ActivityInterfaceWithOptions { +): LocalActivityInterfaceFor { if (options === undefined) { throw new TypeError('options must be defined'); } // Validate as early as possible for immediate user feedback validateLocalActivityOptions(options); - function createActivityProxy(options: ActivityOptions): ActivityInterfaceWithOptions { - return new Proxy({} as ActivityInterfaceWithOptions, { - get(_, activityType) { - if (typeof activityType !== 'string') { - throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); - } + return new Proxy({} as LocalActivityInterfaceFor, { + get(_, activityType) { + if (typeof activityType !== 'string') { + throw new TypeError(`Only strings are supported for Activity types, got: ${String(activityType)}`); + } - function activityProxyFunction(...args: unknown[]): Promise { - return scheduleLocalActivity(activityType as string, args, options); - } + function localActivityProxyFunction(...args: unknown[]): Promise { + return scheduleLocalActivity(activityType as string, args, options); + } - activityProxyFunction.runWithOptions = function ( - overrideOptions: ActivityOptions, - args: any[] - ): Promise { - return scheduleLocalActivity(activityType, args, { ...options, ...overrideOptions }); - }; + localActivityProxyFunction.executeWithOptions = function ( + overrideOptions: LocalActivityOptions, + args: any[] + ): Promise { + return scheduleLocalActivity(activityType, args, deepMerge(options, overrideOptions)); + }; - return activityProxyFunction; - }, - }); - } - return createActivityProxy(options); + return localActivityProxyFunction; + }, + }); } // TODO: deprecate this patch after "enough" time has passed @@ -1127,6 +1132,18 @@ export function deprecatePatch(patchId: string): void { activator.patchInternal(patchId, true); } +/** + * Returns a Promise that resolves when `fn` evaluates to `true` or `timeout` expires, providing + * options to configure the timer (i.e. provide metadata) + * + * @param timeout number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + * + * @returns a boolean indicating whether the condition was true before the timeout expires + * + * @experimental TimerOptions is a new addition and subject to change + */ +export function condition(fn: () => boolean, timeout: Duration, options: TimerOptions): Promise; + /** * Returns a Promise that resolves when `fn` evaluates to `true` or `timeout` expires. * @@ -1141,7 +1158,7 @@ export function condition(fn: () => boolean, timeout: Duration): Promise boolean): Promise; -export async function condition(fn: () => boolean, timeout?: Duration): Promise { +export async function condition(fn: () => boolean, timeout?: Duration, opts?: TimerOptions): Promise { assertInWorkflowContext('Workflow.condition(...) may only be used from a Workflow Execution.'); // Prior to 1.5.0, `condition(fn, 0)` was treated as equivalent to `condition(fn, undefined)` if (timeout === 0 && !patched(CONDITION_0_PATCH)) { @@ -1150,7 +1167,7 @@ export async function condition(fn: () => boolean, timeout?: Duration): Promise< if (typeof timeout === 'number' || typeof timeout === 'string') { return CancellationScope.cancellable(async () => { try { - return await Promise.race([sleep(timeout).then(() => false), conditionInner(fn).then(() => true)]); + return await Promise.race([sleep(timeout, opts).then(() => false), conditionInner(fn).then(() => true)]); } finally { CancellationScope.current().cancel(); } From 3cfffa7666b4435d370788bd0457acbd910ca017 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 16 Jun 2025 16:00:02 +0200 Subject: [PATCH 11/12] break circular dependency --- packages/client/src/schedule-helpers.ts | 3 +-- packages/client/src/workflow-client.ts | 2 +- packages/common/src/index.ts | 1 - packages/workflow/src/workflow.ts | 4 ++-- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/client/src/schedule-helpers.ts b/packages/client/src/schedule-helpers.ts index 463d49514..2d5904102 100644 --- a/packages/client/src/schedule-helpers.ts +++ b/packages/client/src/schedule-helpers.ts @@ -6,9 +6,8 @@ import { decompileRetryPolicy, extractWorkflowType, LoadedDataConverter, - encodeUserMetadata, - decodeUserMetadata, } from '@temporalio/common'; +import { encodeUserMetadata, decodeUserMetadata } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes, decodeSearchAttributes, diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 0214a17b5..9ec730155 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -23,8 +23,8 @@ import { encodeWorkflowIdConflictPolicy, WorkflowIdConflictPolicy, compilePriority, - encodeUserMetadata, } from '@temporalio/common'; +import { encodeUserMetadata } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { History } from '@temporalio/common/lib/proto-utils'; diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index f292d2070..ba703f3f0 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -37,7 +37,6 @@ export { TypedSearchAttributes, defineSearchAttributeKey, } from './search-attributes'; -export * from './user-metadata'; /** * Encode a UTF-8 string into a Uint8Array diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 9ea65ff8b..f6c4a898b 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -24,8 +24,8 @@ import { WorkflowUpdateValidatorType, SearchAttributeUpdatePair, WorkflowDefinitionOptionsOrGetter, - userMetadataToPayload, } from '@temporalio/common'; +import { userMetadataToPayload } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes, searchAttributePayloadConverter, @@ -34,7 +34,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; -import { deepMerge } from '@temporalio/common/lib/internal-workflow/objects-helpers'; +import { deepMerge } from '@temporalio/common/lib/internal-workflow'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { From e56a117e2e2200979573ecea3326c5e15b77401b Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 17 Jun 2025 02:44:55 +0200 Subject: [PATCH 12/12] add codec testing --- .../test/src/test-integration-split-two.ts | 189 +++++++++++++++++- .../test/src/test-integration-workflows.ts | 184 +---------------- packages/worker/src/workflow-codec-runner.ts | 7 + 3 files changed, 195 insertions(+), 185 deletions(-) diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index d3d934932..4145720a2 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -12,9 +12,26 @@ import { } from '@temporalio/common'; import { searchAttributePayloadConverter } from '@temporalio/common/lib/converter/payload-search-attributes'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; -import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow'; +import { + decode as payloadDecode, + decodeFromPayloadsAtIndex, + decodeOptionalSinglePayload, +} from '@temporalio/common/lib/internal-non-workflow'; -import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow'; +import { + condition, + defineQuery, + defineSignal, + getCurrentDetails, + proxyActivities, + proxyLocalActivities, + setCurrentDetails, + setDefaultQueryHandler, + setHandler, + sleep, + startChild, +} from '@temporalio/workflow'; +import { temporal } from '@temporalio/proto'; import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration'; import * as activities from './activities'; import * as workflows from './workflows'; @@ -763,3 +780,171 @@ test.serial('default query handler is not used if requested query exists', confi t.deepEqual(result, { name: definedQuery.name, args }); }); }); + +export async function completableWorkflow(completes: boolean): Promise { + await condition(() => completes); +} + +export async function userMetadataWorkflow(): Promise<{ + currentDetails: string; + childWorkflowId: string; + childRunId: string; +}> { + let done = false; + const signalDef = defineSignal('done'); + setHandler( + signalDef, + () => { + done = true; + }, + { description: 'signal-desc' } + ); + + // That workflow should call an activity (with summary) + const { activityWithSummary } = proxyActivities({ + scheduleToCloseTimeout: '10s', + scheduleToStartTimeout: '10s', + }); + await activityWithSummary.executeWithOptions( + { + summary: 'activity summary', + retry: { + initialInterval: '1s', + maximumAttempts: 5, + maximumInterval: '10s', + }, + scheduleToStartTimeout: '5s', + }, + [] + ); + const { localActivityWithSummary } = proxyLocalActivities({ scheduleToCloseTimeout: '10s' }); + await localActivityWithSummary.executeWithOptions( + { + summary: 'local activity summary', + retry: { + maximumAttempts: 2, + nonRetryableErrorTypes: ['CustomError'], + }, + scheduleToStartTimeout: '5s', + }, + [] + ); + // Timer (with summary) + await sleep(5, { summary: 'timer summary' }); + // Set current details + setCurrentDetails('current wf details'); + // Start child workflow + const child_handle = await startChild(completableWorkflow, { + args: [false], + staticDetails: 'child details', + staticSummary: 'child summary', + }); + + await condition(() => done); + return { + currentDetails: getCurrentDetails(), + childWorkflowId: child_handle.workflowId, + childRunId: child_handle.firstExecutionRunId, + }; +} + +test.serial('User metadata on workflow, timer, activity, child', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); + + const worker = await createWorkerWithDefaults(t, { + activities: { + async activityWithSummary() {}, + async localActivityWithSummary() {}, + }, + }); + + await worker.runUntil(async () => { + // Start a workflow with static details + const handle = await startWorkflow(userMetadataWorkflow, { + staticSummary: 'wf static summary', + staticDetails: 'wf static details', + }); + // Describe workflow -> static summary, static details + const desc = await handle.describe(); + t.true((await desc.staticSummary()) === 'wf static summary'); + t.true((await desc.staticDetails()) === 'wf static details'); + + await handle.signal('done'); + const res = await handle.result(); + t.true(res.currentDetails === 'current wf details'); + + // Get child workflow handle and verify metadata + const childHandle = env.client.workflow.getHandle(res.childWorkflowId, res.childRunId); + const childDesc = await childHandle.describe(); + t.true((await childDesc.staticSummary()) === 'child summary'); + t.true((await childDesc.staticDetails()) === 'child details'); + + // Get history events for main workflow. + const resp = await env.client.workflowService.getWorkflowExecutionHistory({ + namespace: env.client.options.namespace, + execution: { + workflowId: handle.workflowId, + runId: handle.firstExecutionRunId, + }, + }); + for (const event of resp.history?.events ?? []) { + if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'wf static summary' + ); + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.details), + 'wf static details' + ); + } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'activity summary' + ); + // Assert that the overriden activity options are what we expect. + const attrs = event.activityTaskScheduledEventAttributes; + t.is(tsToMs(attrs?.scheduleToCloseTimeout), 10000); + t.is(tsToMs(attrs?.scheduleToStartTimeout), 5000); + const retryPolicy = attrs?.retryPolicy; + t.is(retryPolicy?.maximumAttempts, 5); + t.is(tsToMs(retryPolicy?.initialInterval), 1000); + t.is(tsToMs(retryPolicy?.maximumInterval), 10000); + } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'timer summary' + ); + } + } + // Get history events for child workflow. + const childResp = await env.client.workflowService.getWorkflowExecutionHistory({ + namespace: env.client.options.namespace, + execution: { + workflowId: res.childWorkflowId, + runId: res.childRunId, + }, + }); + + for (const event of childResp.history?.events ?? []) { + if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.summary), + 'child summary' + ); + t.deepEqual( + await decodeOptionalSinglePayload(env.client.options.loadedDataConverter, event.userMetadata?.details), + 'child details' + ); + } + } + // Run metadata query -> get current details + const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata; + t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1); + t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done'); + t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].description, 'signal-desc'); + t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries + t.deepEqual(wfMetadata.currentDetails, 'current wf details'); + }); +}); diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 04e7f44d4..ea05b71f6 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -8,7 +8,7 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; import { CancelReason } from '@temporalio/worker/lib/activity'; import * as workflow from '@temporalio/workflow'; -import { defineQuery, defineSignal, setHandler } from '@temporalio/workflow'; +import { defineQuery, defineSignal } from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { ActivityCancellationType, @@ -19,8 +19,6 @@ import { TypedSearchAttributes, WorkflowExecutionAlreadyStartedError, } from '@temporalio/common'; -import { temporal } from '@temporalio/proto'; -import { decodeOptionalSinglePayload } from '@temporalio/common/lib/internal-non-workflow'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; @@ -1416,183 +1414,3 @@ test('Workflow can return root workflow', async (t) => { t.deepEqual(result, 'empty test-root-workflow-length'); }); }); - -export async function userMetadataWorkflow(): Promise<{ - currentDetails: string; - childWorkflowId: string; - childRunId: string; -}> { - let done = false; - const signalDef = defineSignal('done'); - setHandler( - signalDef, - () => { - done = true; - }, - { description: 'signal-desc' } - ); - - // That workflow should call an activity (with summary) - const { activityWithSummary } = workflow.proxyActivities({ - scheduleToCloseTimeout: '10s', - scheduleToStartTimeout: '10s', - }); - await activityWithSummary.executeWithOptions( - { - summary: 'activity summary', - retry: { - initialInterval: '1s', - maximumAttempts: 5, - maximumInterval: '10s', - }, - scheduleToStartTimeout: '5s', - }, - [] - ); - const { localActivityWithSummary } = workflow.proxyLocalActivities({ scheduleToCloseTimeout: '10s' }); - await localActivityWithSummary.executeWithOptions( - { - summary: 'local activity summary', - retry: { - maximumAttempts: 2, - nonRetryableErrorTypes: ['CustomError'], - }, - scheduleToStartTimeout: '5s', - }, - [] - ); - // Timer (with summary) - await workflow.sleep(5, { summary: 'timer summary' }); - // Set current details - workflow.setCurrentDetails('current wf details'); - // Start child workflow - const child_handle = await workflow.startChild(completableWorkflow, { - args: [false], - staticDetails: 'child details', - staticSummary: 'child summary', - }); - - await workflow.condition(() => done); - return { - currentDetails: workflow.getCurrentDetails(), - childWorkflowId: child_handle.workflowId, - childRunId: child_handle.firstExecutionRunId, - }; -} - -test('User metadata on workflow, timer, activity, child', async (t) => { - const { createWorker, startWorkflow } = helpers(t); - const worker = await createWorker({ - activities: { - async activityWithSummary() {}, - async localActivityWithSummary() {}, - }, - }); - - await worker.runUntil(async () => { - // Start a workflow with static details - const handle = await startWorkflow(userMetadataWorkflow, { - staticSummary: 'wf static summary', - staticDetails: 'wf static details', - }); - // Describe workflow -> static summary, static details - const desc = await handle.describe(); - t.true((await desc.staticSummary()) === 'wf static summary'); - t.true((await desc.staticDetails()) === 'wf static details'); - - await handle.signal('done'); - const res = await handle.result(); - t.true(res.currentDetails === 'current wf details'); - - // Get child workflow handle and verify metadata - const childHandle = t.context.env.client.workflow.getHandle(res.childWorkflowId, res.childRunId); - const childDesc = await childHandle.describe(); - t.true((await childDesc.staticSummary()) === 'child summary'); - t.true((await childDesc.staticDetails()) === 'child details'); - - // Get history events for main workflow. - const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({ - namespace: t.context.env.client.options.namespace, - execution: { - workflowId: handle.workflowId, - runId: handle.firstExecutionRunId, - }, - }); - for (const event of resp.history?.events ?? []) { - if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { - t.deepEqual( - await decodeOptionalSinglePayload( - t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary - ), - 'wf static summary' - ); - t.deepEqual( - await decodeOptionalSinglePayload( - t.context.env.client.options.loadedDataConverter, - event.userMetadata?.details - ), - 'wf static details' - ); - } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) { - t.deepEqual( - await decodeOptionalSinglePayload( - t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary - ), - 'activity summary' - ); - // Assert that the overriden activity options are what we expect. - const attrs = event.activityTaskScheduledEventAttributes; - t.is(tsToMs(attrs?.scheduleToCloseTimeout), 10000); - t.is(tsToMs(attrs?.scheduleToStartTimeout), 5000); - const retryPolicy = attrs?.retryPolicy; - t.is(retryPolicy?.maximumAttempts, 5); - t.is(tsToMs(retryPolicy?.initialInterval), 1000); - t.is(tsToMs(retryPolicy?.maximumInterval), 10000); - } else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) { - t.deepEqual( - await decodeOptionalSinglePayload( - t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary - ), - 'timer summary' - ); - } - } - // Get history events for child workflow. - const childResp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({ - namespace: t.context.env.client.options.namespace, - execution: { - workflowId: res.childWorkflowId, - runId: res.childRunId, - }, - }); - - for (const event of childResp.history?.events ?? []) { - if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { - t.deepEqual( - await decodeOptionalSinglePayload( - t.context.env.client.options.loadedDataConverter, - event.userMetadata?.summary - ), - 'child summary' - ); - t.deepEqual( - await decodeOptionalSinglePayload( - t.context.env.client.options.loadedDataConverter, - event.userMetadata?.details - ), - 'child details' - ); - } - } - // Run metadata query -> get current details - const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata; - t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1); - t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done'); - t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].description, 'signal-desc'); - t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries - t.deepEqual(wfMetadata.currentDetails, 'current wf details'); - }); -}); diff --git a/packages/worker/src/workflow-codec-runner.ts b/packages/worker/src/workflow-codec-runner.ts index 5b3266688..ffc0ead8e 100644 --- a/packages/worker/src/workflow-codec-runner.ts +++ b/packages/worker/src/workflow-codec-runner.ts @@ -335,6 +335,13 @@ export class WorkflowCodecRunner { }, } : undefined, + userMetadata: + command.userMetadata && (command.userMetadata.summary || command.userMetadata.details) + ? { + summary: await encodeOptionalSingle(this.codecs, command.userMetadata.summary), + details: await encodeOptionalSingle(this.codecs, command.userMetadata.details), + } + : undefined, } ) ?? [] )