Skip to content

User metadata #1657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions packages/client/src/schedule-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import { Headers } from '@temporalio/common/lib/interceptors';
import {
decodeArrayFromPayloads,
decodeMapFromPayloads,
decodeOptionalSinglePayload,
encodeMapToPayloads,
encodeOptionalToPayload,
encodeToPayloads,
} from '@temporalio/common/lib/internal-non-workflow';
import { temporal } from '@temporalio/proto';
Expand Down Expand Up @@ -196,8 +198,7 @@ export function decodeOptionalStructuredCalendarSpecs(
}

export function compileScheduleOptions(options: ScheduleOptions): CompiledScheduleOptions {
const workflowTypeOrFunc = options.action.workflowType;
const workflowType = extractWorkflowType(workflowTypeOrFunc);
const workflowType = extractWorkflowType(options.action.workflowType);
return {
...options,
action: {
Expand Down Expand Up @@ -270,6 +271,10 @@ export async function encodeScheduleAction(
}
: undefined,
header: { fields: headers },
userMetadata: {
summary: await encodeOptionalToPayload(dataConverter, action?.staticSummary),
details: await encodeOptionalToPayload(dataConverter, action?.staticDetails),
},
priority: action.priority ? compilePriority(action.priority) : undefined,
},
};
Expand Down Expand Up @@ -320,6 +325,7 @@ export async function decodeScheduleAction(
pb: temporal.api.schedule.v1.IScheduleAction
): Promise<ScheduleDescriptionAction> {
if (pb.startWorkflow) {
const userMetadata = pb.startWorkflow?.userMetadata;
return {
type: 'startWorkflow',
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -336,6 +342,8 @@ export async function decodeScheduleAction(
workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout),
workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout),
workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout),
staticSummary: (await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary)) ?? undefined,
staticDetails: (await decodeOptionalSinglePayload(dataConverter, userMetadata?.details)) ?? undefined,
priority: decodePriority(pb.startWorkflow.priority),
};
}
Expand Down
4 changes: 4 additions & 0 deletions packages/client/src/schedule-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ export type ScheduleOptionsStartWorkflowAction<W extends Workflow> = {
| 'workflowExecutionTimeout'
| 'workflowRunTimeout'
| 'workflowTaskTimeout'
| 'staticDetails'
| 'staticSummary'
> & {
/**
* Workflow id to use when starting. Assign a meaningful business id.
Expand Down Expand Up @@ -815,6 +817,8 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo
| 'workflowExecutionTimeout'
| 'workflowRunTimeout'
| 'workflowTaskTimeout'
| 'staticSummary'
| 'staticDetails'
| 'priority'
>;

Expand Down
18 changes: 17 additions & 1 deletion packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,23 @@ export type WorkflowExecutionDescription = Replace<
{
raw: DescribeWorkflowExecutionResponse;
}
>;
> & {
/**
* General fixed details for this workflow execution that may appear in UI/CLI.
* This can be in Temporal markdown format and can span multiple lines.
*
* @experimental User metadata is a new API and suspectible to change.
*/
staticDetails: () => Promise<string | undefined>;

/**
* A single-line fixed summary for this workflow execution that may appear in the UI/CLI.
* This can be in single-line Temporal markdown format.
*
* @experimental User metadata is a new API and suspectible to change.
*/
staticSummary: () => Promise<string | undefined>;
};

export type WorkflowService = proto.temporal.api.workflowservice.v1.WorkflowService;
export const { WorkflowService } = proto.temporal.api.workflowservice.v1;
Expand Down
18 changes: 16 additions & 2 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import {
decodeArrayFromPayloads,
decodeFromPayloadsAtIndex,
decodeOptionalFailureToOptionalError,
decodeOptionalSinglePayload,
encodeMapToPayloads,
encodeOptionalToPayload,
encodeToPayloads,
} from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
Expand Down Expand Up @@ -511,7 +513,7 @@ export class WorkflowClient extends BaseClient {

protected async _start<T extends Workflow>(
workflowTypeOrFunc: string | T,
options: WithWorkflowArgs<T, WorkflowOptions>,
options: WorkflowStartOptions<T>,
interceptors: WorkflowClientInterceptor[]
): Promise<string> {
const workflowType = extractWorkflowType(workflowTypeOrFunc);
Expand Down Expand Up @@ -1226,6 +1228,10 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: options.cronSchedule,
header: { fields: headers },
userMetadata: {
summary: await encodeOptionalToPayload(this.dataConverter, options?.staticSummary),
details: await encodeOptionalToPayload(this.dataConverter, options?.staticDetails),
},
priority: options.priority ? compilePriority(options.priority) : undefined,
versioningOverride: options.versioningOverride ?? undefined,
};
Expand Down Expand Up @@ -1268,7 +1274,6 @@ export class WorkflowClient extends BaseClient {
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
const { options: opts, workflowType, headers } = input;
const { identity, namespace } = this.options;

return {
namespace,
identity,
Expand Down Expand Up @@ -1296,6 +1301,10 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: opts.cronSchedule,
header: { fields: headers },
userMetadata: {
summary: await encodeOptionalToPayload(this.dataConverter, opts?.staticSummary),
details: await encodeOptionalToPayload(this.dataConverter, opts?.staticDetails),
},
priority: opts.priority ? compilePriority(opts.priority) : undefined,
versioningOverride: opts.versioningOverride ?? undefined,
};
Expand Down Expand Up @@ -1431,8 +1440,13 @@ export class WorkflowClient extends BaseClient {
workflowExecution: { workflowId, runId },
});
const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw);
const userMetadata = raw.executionConfig?.userMetadata;
return {
...info,
staticDetails: async () =>
(await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details)) ?? undefined,
staticSummary: async () =>
(await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary)) ?? undefined,
raw,
};
},
Expand Down
16 changes: 16 additions & 0 deletions packages/common/src/activity-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ export interface ActivityOptions {
*/
versioningIntent?: VersioningIntent;

/**
* A single-line fixed summary for this workflow execution that may appear in the UI/CLI.
* This can be in single-line Temporal markdown format.
*
* @experimental User metadata is a new API and suspectible to change.
*/
staticSummary?: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that should be only summary, no static. What's the terminology in other SDKs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit of a mix, but generally static prefix for workflow start methods, otherwise no static, i'ved opted for the same


/**
* Priority of this activity
*/
Expand Down Expand Up @@ -192,4 +200,12 @@ export interface LocalActivityOptions {
* - `ABANDON` - Do not request cancellation of the activity and immediately report cancellation to the workflow.
*/
cancellationType?: ActivityCancellationType;

/**
* A single-line fixed summary for this workflow execution that may appear in the UI/CLI.
* This can be in single-line Temporal markdown format.
*
* @experimental User metadata is a new API and suspectible to change.
*/
staticSummary?: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

Copy link
Contributor Author

@THardy98 THardy98 Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no static prefix for activity options, changed

}
25 changes: 25 additions & 0 deletions packages/common/src/internal-non-workflow/codec-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ export async function decodeOptionalSingle(
return await decodeSingle(codecs, payload);
}

/** Run {@link PayloadCodec.decode} and convert from a single Payload */
export async function decodeOptionalSinglePayload<T>(
dataConverter: LoadedDataConverter,
payload?: Payload | null | undefined
): Promise<T | null | undefined> {
const { payloadCodecs, payloadConverter } = dataConverter;
const decoded = await decodeOptionalSingle(payloadCodecs, payload);
if (decoded == null) return decoded;
return payloadConverter.fromPayload(decoded);
}

/**
* Run {@link PayloadConverter.toPayload} on value, and then encode it.
*/
Expand All @@ -80,6 +91,20 @@ export async function encodeToPayload(converter: LoadedDataConverter, value: unk
return await encodeSingle(payloadCodecs, payloadConverter.toPayload(value));
}

/**
* Run {@link PayloadConverter.toPayload} on an optional value, and then encode it.
*/
export async function encodeOptionalToPayload(
converter: LoadedDataConverter,
value: unknown
): Promise<Payload | null | undefined> {
if (value == null) return value;

const { payloadConverter, payloadCodecs } = converter;
const payload = payloadConverter.toPayload(value);
return await encodeSingle(payloadCodecs, payload);
}

/**
* Decode `payloads` and then return {@link arrayFromPayloads}`.
*/
Expand Down
15 changes: 15 additions & 0 deletions packages/common/src/workflow-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,21 @@ export interface BaseWorkflowOptions {
*/
typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes;

/**
* General fixed details for this workflow execution that may appear in UI/CLI.
* This can be in Temporal markdown format and can span multiple lines.
*
* @experimental User metadata is a new API and suspectible to change.
*/
staticDetails?: string;
/**
* A single-line fixed summary for this workflow execution that may appear in the UI/CLI.
* This can be in single-line Temporal markdown format.
*
* @experimental User metadata is a new API and suspectible to change.
*/
staticSummary?: string;

/**
* Priority of a workflow
*/
Expand Down
103 changes: 102 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { CancelReason } from '@temporalio/worker/lib/activity';
import * as workflow from '@temporalio/workflow';
import { defineQuery, defineSignal } from '@temporalio/workflow';
import { defineQuery, defineSignal, setHandler } from '@temporalio/workflow';
import { SdkFlags } from '@temporalio/workflow/lib/flags';
import {
ActivityCancellationType,
Expand All @@ -19,6 +19,8 @@ import {
TypedSearchAttributes,
WorkflowExecutionAlreadyStartedError,
} from '@temporalio/common';
import { temporal } from '@temporalio/proto';
import { decodeOptionalSinglePayload } from '@temporalio/common/lib/internal-non-workflow';
import { signalSchedulingWorkflow } from './activities/helpers';
import { activityStartedSignal } from './workflows/definitions';
import * as workflows from './workflows';
Expand Down Expand Up @@ -1414,3 +1416,102 @@ test('Workflow can return root workflow', async (t) => {
t.deepEqual(result, 'empty test-root-workflow-length');
});
});

export async function userMetadataWorkflow(): Promise<string> {
let done = false;
const signalDef = defineSignal('done');
setHandler(signalDef, () => {
done = true;
});

// That workflow should call an activity (with summary)
const { activityWithSummary } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' });
await activityWithSummary.runWithOptions(
{
staticSummary: 'activity summary',
},
[]
);
// Should have a timer (with summary)
await workflow.sleep(5, { summary: 'timer summary' });
// Set current details
workflow.setCurrentDetails('current wf details');
// Unblock on var -> query current details (or return)
await workflow.condition(() => done);
return workflow.getCurrentDetails();
}

test('User metadata on workflow, timer, activity', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
async activityWithSummary() {},
},
});

await worker.runUntil(async () => {
// Start a workflow with static details
const handle = await startWorkflow(userMetadataWorkflow, {
staticSummary: 'wf static summary',
staticDetails: 'wf static details',
});
// Describe workflow -> static summary, static details
const desc = await handle.describe();
t.true((await desc.staticSummary()) === 'wf static summary');
t.true((await desc.staticDetails()) === 'wf static details');

await handle.signal('done');
const res = await handle.result();
t.true(res === 'current wf details');

// Get history events for timer and activity summaries.
const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({
namespace: t.context.env.client.options.namespace,
execution: {
workflowId: handle.workflowId,
runId: handle.firstExecutionRunId,
},
});
for (const event of resp.history?.events ?? []) {
if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) {
t.deepEqual(
await decodeOptionalSinglePayload(
t.context.env.client.options.loadedDataConverter,
event.userMetadata?.summary ?? {}
),
'wf static summary'
);
t.deepEqual(
await decodeOptionalSinglePayload(
t.context.env.client.options.loadedDataConverter,
event.userMetadata?.details ?? {}
),
'wf static details'
);
} else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) {
t.deepEqual(
await decodeOptionalSinglePayload(
t.context.env.client.options.loadedDataConverter,
event.userMetadata?.summary ?? {}
),
'activity summary'
);
} else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) {
t.deepEqual(
await decodeOptionalSinglePayload(
t.context.env.client.options.loadedDataConverter,
event.userMetadata?.summary ?? {}
),
'timer summary'
);
}
}

// Run metadata query -> get current details
const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata;
t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1);
t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done');
t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries
t.deepEqual(wfMetadata.currentDetails, 'current wf details');
});
});
27 changes: 27 additions & 0 deletions packages/test/src/test-schedules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -881,4 +881,31 @@ if (RUN_INTEGRATION_TESTS) {
await handle.delete();
}
});

test.serial('User metadata on schedule', async (t) => {
const { client } = t.context;
const scheduleId = `schedule-with-user-metadata-${randomUUID()}`;
const handle = await client.schedule.create({
scheduleId,
spec: {},
action: {
type: 'startWorkflow',
workflowType: dummyWorkflow,
taskQueue,
staticSummary: 'schedule static summary',
staticDetails: 'schedule static details',
},
});

try {
const describedSchedule = await handle.describe();
t.deepEqual(describedSchedule.spec.calendars, []);
t.deepEqual(describedSchedule.spec.intervals, []);
t.deepEqual(describedSchedule.spec.skip, []);
t.deepEqual(describedSchedule.action.staticSummary, 'schedule static summary');
t.deepEqual(describedSchedule.action.staticDetails, 'schedule static details');
} finally {
await handle.delete();
}
});
}
Loading
Loading