Skip to content

feat(worker): Add a workflow metadata query #1319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/common/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ export type Payload = temporal.api.common.v1.IPayload;
export type WorkflowReturnType = Promise<any>;
export type WorkflowUpdateType = (...args: any[]) => Promise<any> | any;
export type WorkflowUpdateValidatorType = (...args: any[]) => void;
export type WorkflowUpdateAnnotatedType = {
handler: WorkflowUpdateType;
validator?: WorkflowUpdateValidatorType;
description?: string;
};
export type WorkflowSignalType = (...args: any[]) => Promise<void> | void;
export type WorkflowSignalAnnotatedType = { handler: WorkflowSignalType; description?: string };
export type WorkflowQueryType = (...args: any[]) => any;
export type WorkflowQueryAnnotatedType = { handler: WorkflowQueryType; description?: string };

/**
* Broad Workflow function definition, specific Workflows will typically use a narrower type definition, e.g:
Expand Down
2 changes: 2 additions & 0 deletions packages/proto/scripts/compile-proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const coreProtoPath = resolve(protoBaseDir, 'local/temporal/sdk/core/core_interf
const workflowServiceProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/workflowservice/v1/service.proto');
const operatorServiceProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/operatorservice/v1/service.proto');
const errorDetailsProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/errordetails/v1/message.proto');
const workflowMetadataProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/sdk/v1/workflow_metadata.proto');
const testServiceRRProtoPath = resolve(
protoBaseDir,
'testsrv_upstream/temporal/api/testservice/v1/request_response.proto'
Expand Down Expand Up @@ -51,6 +52,7 @@ async function compileProtos(dtsOutputFile, ...args) {
workflowServiceProtoPath,
operatorServiceProtoPath,
errorDetailsProtoPath,
workflowMetadataProtoPath,
testServiceRRProtoPath,
testServiceProtoPath,
healthServiceProtoPath,
Expand Down
2 changes: 1 addition & 1 deletion packages/test/src/integration-tests-old.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
await t.throwsAsync(workflow.query('not found'), {
instanceOf: QueryNotRegisteredError,
message:
'Workflow did not register a handler for not found. Registered queries: [__stack_trace __enhanced_stack_trace isBlocked]',
'Workflow did not register a handler for not found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata isBlocked]',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's getting messy. I think we should consider not listing "internals" queries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be useful for knowing what built-in queries, like __temporal_workflow_metadata, are supported by the WF.
Also the UI today may expect some handlers present when parsing the error message...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UI doesn't and shouldn't expect specific handlers to be present in the list returned on query not found error message. Using that error message to report existing queries is hack, not normalized across SDKs, and going forward, UI should call __temporal_workflow_metadata directly anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you on the UI, but unfortunately, depending on how the UI parses the query today, we may break it if we return an empty set. It is also unfortunate that isBlocked does not have the __ internal prefix. Once they switch, we can do the cleanup with them...

Copy link
Member

@cretz cretz Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Fixing the query-failed string to not include all queries anymore should be a separate task once we feel comfortable enough modern UIs are no longer using it (i.e. fixed version has been out there a while). There are other approaches too such as having this error return the workflow metadata as extra error details. Hrmm, I wonder if we should do that...can discuss separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was not about query-failed no longer returning any queries, but only about not returning SDK built-in queries. For example, if using an old UI, they should not see __stack_trace, __enhanced_stack_trace and __temporal_workflow_metadata in the query drop down. Returning user queries is still fine, for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If needed that's easy to filter by the UI using the __ prefix. I like that we can quickly know what internal queries are supported for a given workflow...

});
});

Expand Down
40 changes: 40 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,46 @@ test('Start of workflow with signal is delayed', async (t) => {
t.is(tsToMs(startDelay), 4678000);
});

export async function queryWorkflowMetadata(): Promise<void> {
const dummyQuery1 = workflow.defineQuery<void>('dummyQuery1');
const dummyQuery2 = workflow.defineQuery<void>('dummyQuery2');
const dummyQuery3 = workflow.defineQuery<void>('dummyQuery3');
const dummySignal1 = workflow.defineSignal('dummySignal1');
const dummyUpdate1 = workflow.defineUpdate<void>('dummyUpdate1');

workflow.setHandler(dummyQuery1, () => void {}, { description: 'ignore' });
// Override description
workflow.setHandler(dummyQuery1, () => void {}, { description: 'query1' });
workflow.setHandler(dummyQuery2, () => void {}, { description: 'query2' });
workflow.setHandler(dummyQuery3, () => void {}, { description: 'query3' });
// Remove handler
workflow.setHandler(dummyQuery3, undefined);
workflow.setHandler(dummySignal1, () => void {}, { description: 'signal1' });
workflow.setHandler(dummyUpdate1, () => void {}, { description: 'update1' });
await workflow.condition(() => false);
}

test('Query workflow metadata returns handler descriptions', async (t) => {
const { createWorker, startWorkflow } = helpers(t);

const worker = await createWorker();

await worker.runUntil(async () => {
const handle = await startWorkflow(queryWorkflowMetadata);
const meta = await handle.query(workflow.workflowMetadataQuery);
t.is(meta.definition?.type, 'queryWorkflowMetadata');
const queryDefinitions = meta.definition?.queryDefinitions;
// Three built-in ones plus dummyQuery1 and dummyQuery2
t.is(queryDefinitions?.length, 5);
t.deepEqual(queryDefinitions?.[3], { name: 'dummyQuery1', description: 'query1' });
t.deepEqual(queryDefinitions?.[4], { name: 'dummyQuery2', description: 'query2' });
const signalDefinitions = meta.definition?.signalDefinitions;
t.deepEqual(signalDefinitions, [{ name: 'dummySignal1', description: 'signal1' }]);
const updateDefinitions = meta.definition?.updateDefinitions;
t.deepEqual(updateDefinitions, [{ name: 'dummyUpdate1', description: 'update1' }]);
});
});

export async function executeEagerActivity(): Promise<void> {
const scheduleActivity = () =>
workflow
Expand Down
4 changes: 2 additions & 2 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1922,10 +1922,10 @@ test('query not found - successString', async (t) => {
queryId: 'qid',
failed: {
message:
'Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace]',
'Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata]',
source: 'TypeScriptSDK',
stackTrace:
'ReferenceError: Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace]',
'ReferenceError: Workflow did not register a handler for not-found. Registered queries: [__stack_trace __enhanced_stack_trace __temporal_workflow_metadata]',
applicationFailureInfo: {
type: 'ReferenceError',
nonRetryable: false,
Expand Down
15 changes: 15 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,3 +464,18 @@ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => v
* A validation function capable of accepting the arguments for a given UpdateDefinition.
*/
export type UpdateValidator<Args extends any[]> = (...args: Args) => void;

/**
* A description of a query handler.
*/
export type QueryHandlerOptions = { description?: string };

/**
* A description of a signal handler.
*/
export type SignalHandlerOptions = { description?: string };

/**
* A validator and description of an update handler.
*/
export type UpdateHandlerOptions<Args extends any[]> = { validator?: UpdateValidator<Args>; description?: string };
101 changes: 68 additions & 33 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ import {
TemporalFailure,
Workflow,
WorkflowExecutionAlreadyStartedError,
WorkflowQueryType,
WorkflowSignalType,
WorkflowUpdateType,
WorkflowQueryAnnotatedType,
WorkflowSignalAnnotatedType,
WorkflowUpdateAnnotatedType,
ProtoFailure,
WorkflowUpdateValidatorType,
ApplicationFailure,
} from '@temporalio/common';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { checkExtends } from '@temporalio/common/lib/type-helpers';
import type { coresdk } from '@temporalio/proto';
import type { coresdk, temporal } from '@temporalio/proto';
import { alea, RNG } from './alea';
import { RootCancellationScope } from './cancellation-scope';
import { DeterminismViolationError, LocalActivityDoBackoff, isCancellation } from './errors';
Expand Down Expand Up @@ -125,12 +124,12 @@ export class Activator implements ActivationHandler {
/**
* Mapping of update name to handler and validator
*/
readonly updateHandlers = new Map<string, { handler: WorkflowUpdateType; validator?: WorkflowUpdateValidatorType }>();
readonly updateHandlers = new Map<string, WorkflowUpdateAnnotatedType>();

/**
* Mapping of signal name to handler
*/
readonly signalHandlers = new Map<string, WorkflowSignalType>();
readonly signalHandlers = new Map<string, WorkflowSignalAnnotatedType>();

/**
* A signal handler that catches calls for non-registered signal names.
Expand All @@ -157,38 +156,74 @@ export class Activator implements ActivationHandler {
/**
* Mapping of query name to handler
*/
public readonly queryHandlers = new Map<string, WorkflowQueryType>([
public readonly queryHandlers = new Map<string, WorkflowQueryAnnotatedType>([
[
'__stack_trace',
() => {
return this.getStackTraces()
.map((s) => s.formatted)
.join('\n\n');
{
handler: () => {
return this.getStackTraces()
.map((s) => s.formatted)
.join('\n\n');
},
description: 'Returns a sensible stack trace.',
},
],
[
'__enhanced_stack_trace',
(): EnhancedStackTrace => {
const { sourceMap } = this;
const sdk: SDKInfo = { name: 'typescript', version: pkg.version };
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
const sources: Record<string, FileSlice[]> = {};
if (this.showStackTraceSources) {
for (const { locations } of stacks) {
for (const { filePath } of locations) {
if (!filePath) continue;
const content = sourceMap?.sourcesContent?.[sourceMap?.sources.indexOf(filePath)];
if (!content) continue;
sources[filePath] = [
{
content,
lineOffset: 0,
},
];
{
handler: (): EnhancedStackTrace => {
const { sourceMap } = this;
const sdk: SDKInfo = { name: 'typescript', version: pkg.version };
const stacks = this.getStackTraces().map(({ structured: locations }) => ({ locations }));
const sources: Record<string, FileSlice[]> = {};
if (this.showStackTraceSources) {
for (const { locations } of stacks) {
for (const { filePath } of locations) {
if (!filePath) continue;
const content = sourceMap?.sourcesContent?.[sourceMap?.sources.indexOf(filePath)];
if (!content) continue;
sources[filePath] = [
{
content,
lineOffset: 0,
},
];
}
}
}
}
return { sdk, stacks, sources };
return { sdk, stacks, sources };
},
description: 'Returns a stack trace annotated with source information.',
},
],
[
'__temporal_workflow_metadata',
{
handler: (): temporal.api.sdk.v1.IWorkflowMetadata => {
const workflowType = this.info.workflowType;
const queryDefinitions = Array.from(this.queryHandlers.entries()).map(([name, value]) => ({
name,
description: value.description,
}));
const signalDefinitions = Array.from(this.signalHandlers.entries()).map(([name, value]) => ({
name,
description: value.description,
}));
const updateDefinitions = Array.from(this.updateHandlers.entries()).map(([name, value]) => ({
name,
description: value.description,
}));
return {
definition: {
type: workflowType,
description: null, // For now, do not set the workflow description in the TS SDK.
queryDefinitions,
signalDefinitions,
updateDefinitions,
},
};
},
description: 'Returns metadata associated with this workflow.',
},
],
]);
Expand Down Expand Up @@ -491,7 +526,7 @@ export class Activator implements ActivationHandler {

// Intentionally non-async function so this handler doesn't show up in the stack trace
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
const fn = this.queryHandlers.get(queryName);
const fn = this.queryHandlers.get(queryName)?.handler;
if (fn === undefined) {
const knownQueryTypes = [...this.queryHandlers.keys()].join(' ');
// Fail the query
Expand Down Expand Up @@ -662,7 +697,7 @@ export class Activator implements ActivationHandler {
}

public async signalWorkflowNextHandler({ signalName, args }: SignalInput): Promise<void> {
const fn = this.signalHandlers.get(signalName);
const fn = this.signalHandlers.get(signalName)?.handler;
if (fn) {
return await fn(...args);
} else if (this.defaultSignalHandler) {
Expand Down
35 changes: 26 additions & 9 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
import { versioningIntentToProto } from '@temporalio/common/lib/versioning-intent-enum';
import { Duration, msOptionalToTs, msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { temporal } from '@temporalio/proto';
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
import {
ActivityInput,
Expand All @@ -39,7 +40,9 @@ import {
DefaultSignalHandler,
EnhancedStackTrace,
Handler,
UpdateValidator,
QueryHandlerOptions,
SignalHandlerOptions,
UpdateHandlerOptions,
WorkflowInfo,
} from './interfaces';
import { LocalActivityDoBackoff } from './errors';
Expand Down Expand Up @@ -1143,15 +1146,22 @@ export function defineQuery<Ret, Args extends any[] = [], Name extends string =
*
* @param def an {@link UpdateDefinition}, {@link SignalDefinition}, or {@link QueryDefinition} as returned by {@link defineUpdate}, {@link defineSignal}, or {@link defineQuery} respectively.
* @param handler a compatible handler function for the given definition or `undefined` to unset the handler.
* @param options an optional `description` of the handler and an optional update `validator` function.
*/
export function setHandler<Ret, Args extends any[], T extends SignalDefinition<Args> | QueryDefinition<Ret, Args>>(
export function setHandler<Ret, Args extends any[], T extends QueryDefinition<Ret, Args>>(
def: T,
handler: Handler<Ret, Args, T> | undefined
handler: Handler<Ret, Args, T> | undefined,
options?: QueryHandlerOptions
): void;
export function setHandler<Ret, Args extends any[], T extends SignalDefinition<Args>>(
def: T,
handler: Handler<Ret, Args, T> | undefined,
options?: SignalHandlerOptions
): void;
export function setHandler<Ret, Args extends any[], T extends UpdateDefinition<Ret, Args>>(
def: T,
handler: Handler<Ret, Args, T> | undefined,
options?: { validator: UpdateValidator<Args> }
options?: UpdateHandlerOptions<Args>
): void;

// For Updates and Signals we want to make a public guarantee something like the
Expand Down Expand Up @@ -1240,12 +1250,18 @@ export function setHandler<
Ret,
Args extends any[],
T extends UpdateDefinition<Ret, Args> | SignalDefinition<Args> | QueryDefinition<Ret, Args>,
>(def: T, handler: Handler<Ret, Args, T> | undefined, options?: { validator: UpdateValidator<Args> }): void {
>(
def: T,
handler: Handler<Ret, Args, T> | undefined,
options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions<Args>
): void {
const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.');
const description = options?.description;
if (def.type === 'update') {
if (typeof handler === 'function') {
const validator = options?.validator as WorkflowUpdateValidatorType | undefined;
activator.updateHandlers.set(def.name, { handler, validator });
const updateOptions = options as UpdateHandlerOptions<Args> | undefined;
const validator = updateOptions?.validator as WorkflowUpdateValidatorType | undefined;
activator.updateHandlers.set(def.name, { handler, validator, description });
activator.dispatchBufferedUpdates();
} else if (handler == null) {
activator.updateHandlers.delete(def.name);
Expand All @@ -1254,7 +1270,7 @@ export function setHandler<
}
} else if (def.type === 'signal') {
if (typeof handler === 'function') {
activator.signalHandlers.set(def.name, handler as any);
activator.signalHandlers.set(def.name, { handler: handler as any, description });
activator.dispatchBufferedSignals();
} else if (handler == null) {
activator.signalHandlers.delete(def.name);
Expand All @@ -1263,7 +1279,7 @@ export function setHandler<
}
} else if (def.type === 'query') {
if (typeof handler === 'function') {
activator.queryHandlers.set(def.name, handler as any);
activator.queryHandlers.set(def.name, { handler: handler as any, description });
} else if (handler == null) {
activator.queryHandlers.delete(def.name);
} else {
Expand Down Expand Up @@ -1354,3 +1370,4 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void

export const stackTraceQuery = defineQuery<string>('__stack_trace');
export const enhancedStackTraceQuery = defineQuery<EnhancedStackTrace>('__enhanced_stack_trace');
export const workflowMetadataQuery = defineQuery<temporal.api.sdk.v1.IWorkflowMetadata>('__temporal_workflow_metadata');