From 85a2e64b1f1a6642be05d7c6c596a917335f2fe5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 18 Jun 2025 20:18:14 +0200 Subject: [PATCH 1/2] add activity reset logic --- .../client/src/async-completion-client.ts | 13 ++++ .../heartbeat-cancellation-details.ts | 2 +- packages/test/src/helpers-integration.ts | 23 +++++-- .../test/src/test-integration-workflows.ts | 67 ++++++++++++++----- packages/worker/src/activity.ts | 19 ++++-- packages/worker/src/worker.ts | 2 +- 6 files changed, 96 insertions(+), 30 deletions(-) diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index 8a481bcc2..2bb7414d6 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -42,6 +42,13 @@ export class ActivityCancelledError extends Error {} @SymbolBasedInstanceOfError('ActivityPausedError') export class ActivityPausedError extends Error {} +/** + * Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity + * has been reset. + */ +@SymbolBasedInstanceOfError('ActivityResetError') +export class ActivityResetError extends Error {} + /** * Options used to configure {@link AsyncCompletionClient} */ @@ -219,6 +226,7 @@ export class AsyncCompletionClient extends BaseClient { const payloads = await encodeToPayloads(this.dataConverter, details); let cancelRequested = false; let paused = false; + let reset = false; try { if (taskTokenOrFullActivityId instanceof Uint8Array) { const response = await this.workflowService.recordActivityTaskHeartbeat({ @@ -229,6 +237,7 @@ export class AsyncCompletionClient extends BaseClient { }); cancelRequested = !!response.cancelRequested; paused = !!response.activityPaused; + reset = !!response.activityReset; } else { const response = await this.workflowService.recordActivityTaskHeartbeatById({ identity: this.options.identity, @@ -238,6 +247,7 @@ export class AsyncCompletionClient extends BaseClient { }); cancelRequested = !!response.cancelRequested; paused = !!response.activityPaused; + reset = !!response.activityReset; } } catch (err) { this.handleError(err); @@ -245,6 +255,9 @@ export class AsyncCompletionClient extends BaseClient { if (cancelRequested) { throw new ActivityCancelledError('cancelled'); } + if (reset) { + throw new ActivityResetError('reset'); + } if (paused) { throw new ActivityPausedError('paused'); } diff --git a/packages/test/src/activities/heartbeat-cancellation-details.ts b/packages/test/src/activities/heartbeat-cancellation-details.ts index 9a7d7daf4..18a147f09 100644 --- a/packages/test/src/activities/heartbeat-cancellation-details.ts +++ b/packages/test/src/activities/heartbeat-cancellation-details.ts @@ -11,7 +11,7 @@ export async function heartbeatCancellationDetailsActivity( // eslint-disable-next-line no-constant-condition while (true) { try { - activity.heartbeat(); + activity.heartbeat('heartbeated'); await activity.sleep(300); } catch (err) { if (err instanceof activity.CancelledFailure && catchErr) { diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 7ae6e53fc..a5e826cc3 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -303,7 +303,11 @@ export async function assertPendingActivityExistsEventually( return activityInfo as temporal.api.workflow.v1.IPendingActivityInfo; } -export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise { +export async function setActivityState( + handle: WorkflowHandle, + activityId: string, + state: 'pause' | 'unpause' | 'reset' +): Promise { const desc = await handle.describe(); const req = { namespace: handle.client.options.namespace, @@ -313,17 +317,24 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: }, id: activityId, }; - if (pause) { + if (state === 'pause') { await handle.client.workflowService.pauseActivity(req); - } else { + } else if (state === 'unpause') { await handle.client.workflowService.unpauseActivity(req); + } else { + const resetReq = { ...req, resetHeartbeat: true }; + await handle.client.workflowService.resetActivity(resetReq); } await waitUntil(async () => { const info = await assertPendingActivityExistsEventually(handle, activityId, 10000); - if (pause) { - return info.paused ?? false; + if (state === 'pause') { + return info.paused === true; + } else if (state === 'unpause') { + return info.paused === false; + } else { + // Heartbeat details reset + return info.heartbeatDetails === null; } - return !info.paused; }, 10000); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 2ca7dc4a7..2cd2b24c6 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -29,7 +29,7 @@ import { createLocalTestEnvironment, helpers, makeTestFunction, - setActivityPauseState, + setActivityState, } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers'; @@ -1424,7 +1424,7 @@ test('Workflow can return root workflow', async (t) => { }); }); -export async function heartbeatPauseWorkflow( +export async function heartbeatCancellationDetailsWorkflow( activityId: string, catchErr: boolean, maximumAttempts: number @@ -1464,14 +1464,14 @@ test('Activity pause returns expected cancellation details', async (t) => { await worker.runUntil(async () => { const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); + const handle = await startWorkflow(heartbeatCancellationDetailsWorkflow, { args: [testActivityId, true, 1] }); const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000); t.true(activityInfo.paused === false); - await setActivityPauseState(handle, testActivityId, true); + await setActivityState(handle, testActivityId, 'pause'); const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000); t.true(activityInfo2.paused === false); - await setActivityPauseState(handle, `${testActivityId}-2`, true); + await setActivityState(handle, `${testActivityId}-2`, 'pause'); const result = await handle.result(); t.deepEqual(result[0], { cancelRequested: false, @@ -1494,12 +1494,12 @@ test('Activity pause returns expected cancellation details', async (t) => { test('Activity can pause and unpause', async (t) => { const { createWorker, startWorkflow } = helpers(t); - async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string) { + async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string, expectedDetails: string) { const activityInfo = await assertPendingActivityExistsEventually(handle, activityId, 5000); if (activityInfo.heartbeatDetails?.payloads) { for (const payload of activityInfo.heartbeatDetails?.payloads || []) { - if (payload.data && payload.data?.length > 0) { - return true; + if (payload.data != null) { + return workflow.defaultPayloadConverter.fromPayload(payload) === expectedDetails; } } } @@ -1515,24 +1515,61 @@ test('Activity can pause and unpause', async (t) => { await worker.runUntil(async () => { const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] }); + const handle = await startWorkflow(heartbeatCancellationDetailsWorkflow, { args: [testActivityId, false, 2] }); const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000); t.true(activityInfo.paused === false); - await setActivityPauseState(handle, testActivityId, true); + await setActivityState(handle, testActivityId, 'pause'); await waitUntil(async () => { - return await checkHeartbeatDetailsExist(handle, testActivityId); + return await checkHeartbeatDetailsExist(handle, testActivityId, 'finally-complete'); }, 5000); - await setActivityPauseState(handle, testActivityId, false); + await setActivityState(handle, testActivityId, 'unpause'); const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000); t.true(activityInfo2.paused === false); - await setActivityPauseState(handle, `${testActivityId}-2`, true); + await setActivityState(handle, `${testActivityId}-2`, 'pause'); await waitUntil(async () => { - return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`); + return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`, 'finally-complete'); }, 5000); - await setActivityPauseState(handle, `${testActivityId}-2`, false); + await setActivityState(handle, `${testActivityId}-2`, 'unpause'); const result = await handle.result(); // Undefined values are converted to null by data converter. t.true(result[0] === null); t.true(result[1] === null); }); }); + +test('Activity reset returns expected cancellation details', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity, + }, + }); + + await worker.runUntil(async () => { + const testActivityId = randomUUID(); + const handle = await startWorkflow(heartbeatCancellationDetailsWorkflow, { args: [testActivityId, true, 2] }); + + await assertPendingActivityExistsEventually(handle, testActivityId, 5000); + await setActivityState(handle, testActivityId, 'reset'); + await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 10000); + await setActivityState(handle, `${testActivityId}-2`, 'reset'); + const result = await handle.result(); + t.deepEqual(result[0], { + cancelRequested: false, + notFound: false, + paused: false, + timedOut: false, + workerShutdown: false, + reset: true, + }); + t.deepEqual(result[1], { + cancelRequested: false, + notFound: false, + paused: false, + timedOut: false, + workerShutdown: false, + reset: true, + }); + }); +}); diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 93056fece..6154f06f3 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -143,7 +143,9 @@ export class Activity { (error instanceof CancelledFailure || isAbortError(error)) && this.context.cancellationSignal.aborted ) { - if (this.context.cancellationDetails.details?.paused) { + if (this.context.cancellationDetails.details?.reset) { + this.workerLogger.debug('Activity reset', { durationMs }); + } else if (this.context.cancellationDetails.details?.paused) { this.workerLogger.debug('Activity paused', { durationMs }); } else { this.workerLogger.debug('Activity completed as cancelled', { durationMs }); @@ -180,14 +182,17 @@ export class Activity { } else if (this.cancelReason) { // Either a CancelledFailure that we threw or AbortError from AbortController if (err instanceof CancelledFailure) { - // If cancel due to activity pause, emit an application failure for the pause. - if (this.context.cancellationDetails.details?.paused) { + // If cancel due to activity pause or reset, emit an application failure. + if (this.context.cancellationDetails.details?.paused || this.context.cancellationDetails.details?.reset) { + let message = 'Activity reset'; + let errType = 'ActivityReset'; + if (!this.context.cancellationDetails.details?.reset) { + message = 'Activity paused'; + errType = 'ActivityPause'; + } return { failed: { - failure: await encodeErrorToFailure( - this.dataConverter, - new ApplicationFailure('Activity paused', 'ActivityPause') - ), + failure: await encodeErrorToFailure(this.dataConverter, new ApplicationFailure(message, errType)), }, }; } else { diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 04d694321..8d90657f4 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -990,7 +990,7 @@ export class Worker { details, onError() { // activity must be defined - // empty cancellation details, not corresponding detail for heartbeat detail conversion failure + // empty cancellation details, no corresponding detail for heartbeat detail conversion failure activity?.cancel( 'HEARTBEAT_DETAILS_CONVERSION_FAILED', ActivityCancellationDetails.fromProto(undefined) From e0d04038d142a3daafeaa429ff3ff6db9a6c3794 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 18 Jun 2025 20:23:20 +0200 Subject: [PATCH 2/2] remove spurious console.log usage --- packages/test/src/test-integration-split-three.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/test/src/test-integration-split-three.ts b/packages/test/src/test-integration-split-three.ts index 5539a5765..6d9b93a3a 100644 --- a/packages/test/src/test-integration-split-three.ts +++ b/packages/test/src/test-integration-split-three.ts @@ -156,7 +156,6 @@ test( await worker.runUntil(handle.result()); let firstChild = true; const history = await handle.fetchHistory(); - console.log('events'); for (const event of history?.events ?? []) { switch (event.eventType) { case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED: @@ -184,7 +183,6 @@ test('workflow start without priorities sees undefined for the key', configMacro const { env, createWorkerWithDefaults } = config; const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env); const worker = await createWorkerWithDefaults(t, { activities }); - console.log('STARTING WORKFLOW'); const handle1 = await startWorkflow(workflows.priorityWorkflow, { args: [true, undefined],