Skip to content

Nexus: worker, workflow-backed operations, and workflow caller #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 177 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
177 commits
Select commit Hold shift + click to select a range
94667cf
Nexus
dandavison Apr 19, 2025
e97acf1
Nexus workflow caller
dandavison Jun 10, 2025
bba27cd
Nexus: squashed commit
dandavison Jun 12, 2025
7692510
Option 1 for workflow_run_operation_handler
dandavison Jun 21, 2025
54dc188
Revert "Option 1 for workflow_run_operation_handler"
dandavison Jun 21, 2025
75cb096
Adjust imports
dandavison Jun 21, 2025
31ce1fc
Revert "Adjust imports"
dandavison Jun 21, 2025
8ce108f
TODO
dandavison Jun 21, 2025
ecc0876
Option 2 for workflow_run_operation_handler
dandavison Jun 21, 2025
d5c1184
Failing test: first of two workflows incorrectly delivers result
dandavison Jun 22, 2025
20e82fc
WIP: Option 3 for workflow_run_operation_handler
dandavison Jun 22, 2025
26543ae
TemporalNexusOperationContext should not be an ABC
dandavison Jun 22, 2025
e512834
Remove unused output_type
dandavison Jun 22, 2025
80cf8aa
Cleanup
dandavison Jun 22, 2025
32b1604
Make WorkflowOperationToken generic, parameterized by output type
dandavison Jun 22, 2025
e1b996d
Option 4 for WorkflowRunOperationHandler
dandavison Jun 22, 2025
3f3ab83
Cleanup
dandavison Jun 22, 2025
2e3181e
Refactor test
dandavison Jun 22, 2025
8aa937f
Failing test: request ID is not used for non-backing workflow
dandavison Jun 22, 2025
93d047b
Bug fix: wire request_id through as top-level start_workflow param
dandavison Jun 22, 2025
2b77eb1
Rename: TemporalOperationContext
dandavison Jun 22, 2025
a7fc543
Rename: cancel_operation
dandavison Jun 22, 2025
dd7ecff
Cleanup
dandavison Jun 22, 2025
95f8b4a
Do not allow Nexus operation to set client used for starting workflow
dandavison Jun 22, 2025
6d6887a
Make task queue optional when starting workflows
dandavison Jun 22, 2025
b0d7a60
Add nexus_task_poller_behavior
dandavison Jun 22, 2025
0322767
Handle PollShutdownError
dandavison Jun 22, 2025
8c2de6e
Respond to upstream: default to async
dandavison Jun 23, 2025
7bc3692
Cleanup; changes from review comments
dandavison Jun 22, 2025
a24be8d
Respond to upstream: handler factory instead of sync_operation_handler
dandavison Jun 23, 2025
3549629
Switch workflow_run_operation_handler to standard factory
dandavison Jun 23, 2025
fdb3e37
Do not support passing client to cancel_operation
dandavison Jun 23, 2025
c79bde5
RTU: bridge Rust
dandavison Jun 23, 2025
b91207e
Fix: make all methods `async def` on WorkflowRunOperationHandler
dandavison Jun 24, 2025
60fcee3
Get rid of TypeGuard
dandavison Jun 24, 2025
3715460
Support passing result_type when getting workflow handle from token
dandavison Jun 24, 2025
2b5debc
Implement fetch_result handler
dandavison Jun 24, 2025
b3ddaf9
Cleanup
dandavison Jun 24, 2025
e04218f
Tests: clean up type annotation warnings
dandavison Jun 24, 2025
9e99ca7
Improve type annotation warnings
dandavison Jun 24, 2025
8046b97
Cleanup
dandavison Jun 24, 2025
4ceba6d
Import nexus.handler.logger in worker
dandavison Jun 24, 2025
7bd6bb5
Do not issue warnings when user is not using type annotations
dandavison Jun 24, 2025
f958627
Remove redundant validation
dandavison Jun 24, 2025
6fcf72f
Respond to code review comments
dandavison Jun 24, 2025
3ed5174
Don't swallow exceptions when encoding failures
dandavison Jun 24, 2025
7fcd501
Catch BaseException at top-level in worker
dandavison Jun 24, 2025
04ce78d
Fail worker on broken executor
dandavison Jun 24, 2025
7ec57ff
Revert "Catch BaseException at top-level in worker"
dandavison Jun 24, 2025
7301307
Cleanup
dandavison Jun 24, 2025
4cbb09a
Change context method name: .current() -> .get()
dandavison Jun 24, 2025
1b84f11
Rename: TemporalNexusOperationContext
dandavison Jun 24, 2025
bdfc019
Expose contextvar object directly
dandavison Jun 24, 2025
2b1dece
Mark methods as private
dandavison Jun 24, 2025
086efa5
Add run-time type check
dandavison Jun 24, 2025
b6dfb96
Make start_workflow a static function
dandavison Jun 24, 2025
93e0775
Remove accidental exports
dandavison Jun 24, 2025
29344ad
Docstrings
dandavison Jun 24, 2025
60e4668
Comment, cleanup
dandavison Jun 24, 2025
e79f222
TODO
dandavison Jun 24, 2025
c7b0170
TODOs
dandavison Jun 24, 2025
d731ac2
Get rid of spurious type parameters
dandavison Jun 25, 2025
8755353
Add worker logging
dandavison Jun 25, 2025
a1a3df6
Type-level enforcement of the two ways to use WorkflowRunOperationHan…
dandavison Jun 25, 2025
1db7ff0
Respond to upstream: SyncOperation.from_callable
dandavison Jun 25, 2025
07c6d39
-> WorkflowRunOperation.from_callable()
dandavison Jun 25, 2025
2616755
TODO
dandavison Jun 25, 2025
27d7e41
Parameterize workflow_run_operation tests
dandavison Jun 25, 2025
c0cf503
Failing test case
dandavison Jun 25, 2025
efb9df5
Test: clean up imports
dandavison Jun 25, 2025
400260d
Respond to upstream: sync_operation_handler
dandavison Jun 25, 2025
7355554
New workflow_run_operation_handler
dandavison Jun 25, 2025
ec31690
Delete reference to obsolete __nexus_service_metadata__
dandavison Jun 26, 2025
7f9c144
TODO
dandavison Jun 26, 2025
602d412
Use get_callable_name utility
dandavison Jun 26, 2025
ec1f05a
Fix test: 'not an async def` message changed
dandavison Jun 26, 2025
c6a8e32
Refactor
dandavison Jun 26, 2025
63d19b2
Reorganize: temporalio.nexus.handler -> temporalo.nexus
dandavison Jun 26, 2025
b0c1180
Fix signatures of start_method on workflow caller side
dandavison Jun 26, 2025
9ab2d19
`from temporalio import nexus` everywhere
dandavison Jun 26, 2025
19968d7
Qualify client.WorkflowHandle in temporalio.nexus
dandavison Jun 26, 2025
fb3238b
Fixup: no coverage for these
dandavison Jun 26, 2025
0cc4359
Rename: nexus.WorkflowHandle
dandavison Jun 26, 2025
0c1982b
nexus.WorkflowHandle.{to,from}_token()
dandavison Jun 26, 2025
fa0344b
Respond to upstream: sync_operation_handler -> sync_operation
dandavison Jun 26, 2025
f1bd90d
workflow_run_operation_handler -> workflow_run_operation
dandavison Jun 26, 2025
3526b89
from nexus import workflow_run_operation
dandavison Jun 26, 2025
29f11ca
Respond to upstream: operation_handler is not in the public API
dandavison Jun 26, 2025
97f1d48
New nexus operation context API
dandavison Jun 26, 2025
009faca
Fix broken test
dandavison Jun 26, 2025
844a9c3
Fix another test
dandavison Jun 26, 2025
af00209
Move Failure tests utility
dandavison Jun 26, 2025
914b35e
Fix test
dandavison Jun 26, 2025
16432f7
RTU: relocate OperationError
dandavison Jun 26, 2025
0f3b85e
Copy get_types utility from nexusrpc
dandavison Jun 26, 2025
686f156
Fixup: eliminate references to WorkflowOperationToken
dandavison Jun 26, 2025
33f4f82
Move start_workflow to WorkflowRunOperationContext
dandavison Jun 26, 2025
39f220c
Move WorkflowRunOperationContext to _operation_context module
dandavison Jun 27, 2025
6ce70ba
Wire through additional context type in union
dandavison Jun 26, 2025
f723602
Eliminate unnecessary modeling of callable types
dandavison Jun 26, 2025
ddaee4f
Fix passing Nexus context headers/request ID from worker
dandavison Jun 26, 2025
fc285e0
Always passthrough nexusrpc
dandavison Jun 26, 2025
bd88867
Revert disabling of sandbox for nexus workflow tests
dandavison Jun 26, 2025
d153d94
Passthrough 3rd-party imports in tests helpers module
dandavison Jun 26, 2025
7358f00
uv.lock
dandavison Jun 26, 2025
2f160aa
Strengthen warning note
dandavison Jun 26, 2025
7e53850
Docstrings, comments
dandavison Jun 26, 2025
15beaff
Type-level cleanup/evolution in workflow caller
dandavison Jun 26, 2025
132f693
TODOs
dandavison Jun 27, 2025
d057268
Move logger
dandavison Jun 27, 2025
0ec14d8
Separate Temporal context for each operation verb
dandavison Jun 27, 2025
b1b9ea3
Make Temporal context classes non-private
dandavison Jun 27, 2025
2ba2bc1
Use TemporalStartOperationContext instead of WorkflowRunOperationContext
dandavison Jun 27, 2025
f36c215
Revert "Use TemporalStartOperationContext instead of WorkflowRunOpera…
dandavison Jun 27, 2025
38c1c57
Make WorkflowRunOperationContext subclass StartOperationContext
dandavison Jun 27, 2025
3320f28
Mark TemporalStartOperationContext as private
dandavison Jun 27, 2025
5e563c0
Handle OperationError consistently with HandlerError
dandavison Jun 27, 2025
a6e9777
RTU: operation_id -> operation_token
dandavison Jun 27, 2025
72d14df
Fix cancellation context bug
dandavison Jun 27, 2025
db09733
RTU: Use nexusrpc.get_service_definition
dandavison Jun 28, 2025
e721f55
Docstring
dandavison Jun 29, 2025
ca5f572
RTU get_operation_factory
dandavison Jun 29, 2025
e7091ac
Workflow OperationError / HandlerError test
dandavison Jun 28, 2025
a9bac66
Convert nexus_handler_failure_info as nexusrpc.HandlerError
dandavison Jun 29, 2025
bf2a02d
RTU: Move HandlerError to root module
dandavison Jun 29, 2025
9b6f836
RTU: test is fixed by syncio.sync_operation
dandavison Jun 29, 2025
f53a783
RTU: unskip test
dandavison Jun 29, 2025
e30bba2
RTU: syncio tree
dandavison Jun 29, 2025
b3de2ef
Don't pass cause to HandlerError constructor
dandavison Jun 29, 2025
54a0a86
RTU: registration time enforcement of syncio/asyncio mistakes
dandavison Jun 30, 2025
332658d
WIP
dandavison Jun 30, 2025
0bba35e
RTU: Copy operation factory getter/setter from nexusrpc
dandavison Jun 30, 2025
3ee29e7
Use getters/setters
dandavison Jun 30, 2025
5be55aa
Move no-type-annotations test to invalid usage test
dandavison Jun 30, 2025
109c976
Remove operations without type annotations
dandavison Jul 1, 2025
ad42e67
Split test
dandavison Jul 1, 2025
5a5d9c6
Test operations without type annotations
dandavison Jul 1, 2025
86a9a61
Delete redundant test
dandavison Jul 1, 2025
caef930
Delete failing callable instance test
dandavison Jul 1, 2025
b288eaa
Test error conversion
dandavison Jul 1, 2025
6c08c80
Translating Java assertions
dandavison Jul 1, 2025
d5043d7
Update test
dandavison Jul 1, 2025
e4141da
Corrected Java output
dandavison Jul 2, 2025
b4fcd07
Update test assertions
dandavison Jul 2, 2025
b51063b
Add timeout test
dandavison Jul 2, 2025
7c5f107
Install the Nexus SDK from GitHub
dandavison Jul 2, 2025
638a121
Update error tests
dandavison Jul 2, 2025
c6dbb52
Edit TODOs
dandavison Jul 2, 2025
0fc88c0
Make a pass through prerelease TODOs
dandavison Jul 2, 2025
2ca30ff
Revert change to callable types
dandavison Jul 2, 2025
b3146ea
Test start_workflow overloads
dandavison Jul 2, 2025
ac3c96e
Add additional overloads
dandavison Jul 3, 2025
de2aa48
string name workflow
dandavison Jul 3, 2025
915fde1
Use a dataclass
dandavison Jul 3, 2025
4462e3a
More overloads
dandavison Jul 3, 2025
4de6b48
Fix mypy failures
dandavison Jul 3, 2025
38a50fa
Revert "Convert nexus_handler_failure_info as nexusrpc.HandlerError"
dandavison Jul 3, 2025
0fb7837
Cleanup error test
dandavison Jul 3, 2025
7656a02
Revert "Delete redundant test"
dandavison Jul 3, 2025
075ec7c
Evolve context API
dandavison Jul 3, 2025
6c1e894
Rename as temporalio.nexus.cancel_workflow
dandavison Jul 3, 2025
d5fce2a
Fix test
dandavison Jul 3, 2025
8f3681b
Cleanup
dandavison Jul 3, 2025
8117ea1
Impprovements from code review comments
dandavison Jul 3, 2025
7b62955
Expose nexus.LoggerAdapter
dandavison Jul 3, 2025
4e34462
Add outbound links for sync responses also
dandavison Jul 3, 2025
8889dde
Don't expose separate workflow.start_nexus_operation
dandavison Jul 3, 2025
41b9709
Remove unnecessary type hint
dandavison Jul 3, 2025
c2d4825
New Nexus client constructor
dandavison Jul 3, 2025
a4fd205
Remove unused test helper methods
dandavison Jul 3, 2025
a5f67d2
Clean up token type
dandavison Jul 3, 2025
f7f8a51
Refactor start timeout test
dandavison Jul 3, 2025
01960ed
Cancellation timeout test
dandavison Jul 3, 2025
5103f80
Create running_task for cancellation op handler
dandavison Jul 3, 2025
03513b5
Test creation of worker from ServiceHandler instances
dandavison Jul 3, 2025
b6bcd6c
Test creation of worker from programmatically-created ServiceHandler
dandavison Jul 3, 2025
a840316
Reapply "Convert nexus_handler_failure_info as nexusrpc.HandlerError"
dandavison Jul 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ informal introduction to the features and their implementation.
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
- [Worker Shutdown](#worker-shutdown)
- [Testing](#testing-1)
- [Nexus](#nexus)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see this section

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, we should add this section showing how to make simple Nexus Python operation and how to call Nexus operations from workflows.

- [Workflow Replay](#workflow-replay)
- [Observability](#observability)
- [Metrics](#metrics)
Expand Down Expand Up @@ -1308,6 +1309,7 @@ affect calls activity code might make to functions on the `temporalio.activity`
* `cancel()` can be invoked to simulate a cancellation of the activity
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity


### Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
Expand Down
13 changes: 12 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = [
"workflow",
]
dependencies = [
"nexus-rpc",
"protobuf>=3.20,<6",
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
"types-protobuf>=3.20",
Expand Down Expand Up @@ -44,7 +45,7 @@ dev = [
"psutil>=5.9.3,<6",
"pydocstyle>=6.3.0,<7",
"pydoctor>=24.11.1,<25",
"pyright==1.1.377",
"pyright==1.1.400",
"pytest~=7.4",
"pytest-asyncio>=0.21,<0.22",
"pytest-timeout~=2.2",
Expand All @@ -53,6 +54,8 @@ dev = [
"twine>=4.0.1,<5",
"ruff>=0.5.0,<0.6",
"maturin>=1.8.2",
"pytest-cov>=6.1.1",
"httpx>=0.28.1",
"pytest-pretty>=1.3.0",
]

Expand Down Expand Up @@ -162,6 +165,7 @@ exclude = [
"tests/worker/workflow_sandbox/testmodules/proto",
"temporalio/bridge/worker.py",
"temporalio/contrib/opentelemetry.py",
"temporalio/contrib/pydantic.py",
"temporalio/converter.py",
"temporalio/testing/_workflow.py",
"temporalio/worker/_activity.py",
Expand All @@ -173,6 +177,10 @@ exclude = [
"tests/api/test_grpc_stub.py",
"tests/conftest.py",
"tests/contrib/test_opentelemetry.py",
"tests/contrib/pydantic/models.py",
"tests/contrib/pydantic/models_2.py",
"tests/contrib/pydantic/test_pydantic.py",
"tests/contrib/pydantic/workflows.py",
"tests/test_converter.py",
"tests/test_service.py",
"tests/test_workflow.py",
Expand Down Expand Up @@ -208,3 +216,6 @@ exclude = [
[tool.uv]
# Prevent uv commands from building the package by default
package = false

[tool.uv.sources]
nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be removed before merging, right? We need a release of the Nexus SDK to pypi.

32 changes: 31 additions & 1 deletion temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use temporal_sdk_core_api::worker::{
};
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion};
use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct WorkerConfig {
graceful_shutdown_period_millis: u64,
nondeterminism_as_workflow_fail: bool,
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
nexus_task_poller_behavior: PollerBehavior,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -565,6 +566,18 @@ impl WorkerRef {
})
}

fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let worker = self.worker.as_ref().unwrap().clone();
self.runtime.future_into_py(py, async move {
let bytes = match worker.poll_nexus_task().await {
Ok(task) => task.encode_to_vec(),
Err(PollError::ShutDown) => return Err(PollShutdownError::new_err(())),
Err(err) => return Err(PyRuntimeError::new_err(format!("Poll failure: {}", err))),
};
Ok(bytes)
})
}

fn complete_workflow_activation<'p>(
&self,
py: Python<'p>,
Expand Down Expand Up @@ -599,6 +612,22 @@ impl WorkerRef {
})
}

fn complete_nexus_task<'p>(&self,
py: Python<'p>,
proto: &Bound<'_, PyBytes>,
) -> PyResult<Bound<'p, PyAny>> {
let worker = self.worker.as_ref().unwrap().clone();
let completion = NexusTaskCompletion::decode(proto.as_bytes())
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {}", err)))?;
self.runtime.future_into_py(py, async move {
worker
.complete_nexus_task(completion)
.await
.context("Completion failure")
.map_err(Into::into)
})
}

fn record_activity_heartbeat(&self, proto: &Bound<'_, PyBytes>) -> PyResult<()> {
enter_sync!(self.runtime);
let heartbeat = ActivityHeartbeat::decode(proto.as_bytes())
Expand Down Expand Up @@ -696,6 +725,7 @@ fn convert_worker_config(
})
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.nexus_task_poller_behavior(conf.nexus_task_poller_behavior)
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}")))
}
Expand Down
18 changes: 17 additions & 1 deletion temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import temporalio.bridge.client
import temporalio.bridge.proto
import temporalio.bridge.proto.activity_task
import temporalio.bridge.proto.nexus
import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_completion
import temporalio.bridge.runtime
Expand All @@ -35,7 +36,7 @@
from temporalio.bridge.temporal_sdk_bridge import (
CustomSlotSupplier as BridgeCustomSlotSupplier,
)
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore


@dataclass
Expand All @@ -60,6 +61,7 @@ class WorkerConfig:
graceful_shutdown_period_millis: int
nondeterminism_as_workflow_fail: bool
nondeterminism_as_workflow_fail_for_types: Set[str]
nexus_task_poller_behavior: PollerBehavior


@dataclass
Expand Down Expand Up @@ -216,6 +218,14 @@ async def poll_activity_task(
await self._ref.poll_activity_task()
)

async def poll_nexus_task(
self,
) -> temporalio.bridge.proto.nexus.NexusTask:
"""Poll for a nexus task."""
return temporalio.bridge.proto.nexus.NexusTask.FromString(
await self._ref.poll_nexus_task()
)

async def complete_workflow_activation(
self,
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
Expand All @@ -229,6 +239,12 @@ async def complete_activity_task(
"""Complete an activity task."""
await self._ref.complete_activity_task(comp.SerializeToString())

async def complete_nexus_task(
self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
) -> None:
"""Complete a nexus task."""
await self._ref.complete_nexus_task(comp.SerializeToString())

def record_activity_heartbeat(
self, comp: temporalio.bridge.proto.ActivityHeartbeat
) -> None:
Expand Down
53 changes: 50 additions & 3 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,17 @@ async def start_workflow(
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
stack_level: int = 2,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
# The following options should not be considered part of the public API. They
# are deliberately not exposed in overloads, and are not subject to any
# backwards compatibility guarantees.
nexus_completion_callbacks: Sequence[NexusCompletionCallback] = [],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nexus callbacks are only one type of callback that we were planning on supporting. I wouldn't mention nexus in the name here.

workflow_event_links: Sequence[
temporalio.api.common.v1.Link.WorkflowEvent
] = [],
request_id: Optional[str] = None,
stack_level: int = 2,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are also missing the conflict options which are required for USE_EXISTING to make sense for workflow callers. Can be done in a followup PR since this was functionality that was added as a second step for Go and Java.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would for now throw if a user tries to set USE_EXISTING in the workflow run operation handler.

) -> WorkflowHandle[Any, Any]:
"""Start a workflow and return its handle.

Expand Down Expand Up @@ -529,7 +537,6 @@ async def start_workflow(
name, result_type_from_type_hint = (
temporalio.workflow._Definition.get_name_and_result_type(workflow)
)

return await self._impl.start_workflow(
StartWorkflowInput(
workflow=name,
Expand Down Expand Up @@ -557,6 +564,9 @@ async def start_workflow(
rpc_timeout=rpc_timeout,
request_eager_start=request_eager_start,
priority=priority,
nexus_completion_callbacks=nexus_completion_callbacks,
workflow_event_links=workflow_event_links,
request_id=request_id,
)
)

Expand Down Expand Up @@ -5193,6 +5203,10 @@ class StartWorkflowInput:
rpc_timeout: Optional[timedelta]
request_eager_start: bool
priority: temporalio.common.Priority
# The following options are experimental and unstable.
nexus_completion_callbacks: Sequence[NexusCompletionCallback]
workflow_event_links: Sequence[temporalio.api.common.v1.Link.WorkflowEvent]
request_id: Optional[str]
Comment on lines +5207 to +5209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to mention here these are unstable/experimental

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to mention here these are unstable/experimental

Done

versioning_override: Optional[temporalio.common.VersioningOverride] = None


Expand Down Expand Up @@ -5807,8 +5821,26 @@ async def _build_start_workflow_execution_request(
self, input: StartWorkflowInput
) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest:
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
req.request_eager_execution = input.request_eager_start
await self._populate_start_workflow_execution_request(req, input)
# _populate_start_workflow_execution_request is used for both StartWorkflowInput
# and UpdateWithStartStartWorkflowInput. UpdateWithStartStartWorkflowInput does
# not have the following two fields so they are handled here.
req.request_eager_execution = input.request_eager_start
if input.request_id:
req.request_id = input.request_id

req.completion_callbacks.extend(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put the links on the callbacks so they can be associated.

temporalio.api.common.v1.Callback(
nexus=temporalio.api.common.v1.Callback.Nexus(
url=callback.url, header=callback.header
)
)
for callback in input.nexus_completion_callbacks
)
req.links.extend(
temporalio.api.common.v1.Link(workflow_event=link)
for link in input.workflow_event_links
)
return req

async def _build_signal_with_start_workflow_execution_request(
Expand Down Expand Up @@ -7231,6 +7263,21 @@ def api_key(self, value: Optional[str]) -> None:
self.service_client.update_api_key(value)


@dataclass(frozen=True)
class NexusCompletionCallback:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to mention this is unstable/experimental and also not really for user use (I understand exposing because it's exposed in the interceptor)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@dataclass(frozen=True)
class NexusCompletionCallback:
    """Nexus callback to attach to events such as workflow completion.

    .. warning::
        This option is experimental and unstable.
    """

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't make callbacks Nexus specific and don't call them Completion callbacks since we will want to use these callbacks for other purposes too.

"""Nexus callback to attach to events such as workflow completion.

.. warning::
This option is experimental and unstable.
"""

url: str
"""Callback URL."""

header: Mapping[str, str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in Python we use headers plural, header is just the Go term AFAIR. Please double check.

"""Header to attach to callback request."""


async def _encode_user_metadata(
converter: temporalio.converter.DataConverter,
summary: Optional[Union[str, temporalio.api.common.v1.Payload]],
Expand Down
2 changes: 1 addition & 1 deletion temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum, IntEnum
from enum import IntEnum
from typing import (
Any,
Callable,
Expand Down
37 changes: 37 additions & 0 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datetime import datetime
from enum import IntEnum
from itertools import zip_longest
from logging import getLogger
from typing import (
Any,
Awaitable,
Expand All @@ -40,6 +41,7 @@
import google.protobuf.json_format
import google.protobuf.message
import google.protobuf.symbol_database
import nexusrpc
import typing_extensions

import temporalio.api.common.v1
Expand All @@ -60,6 +62,8 @@
if sys.version_info >= (3, 10):
from types import UnionType

logger = getLogger(__name__)


class PayloadConverter(ABC):
"""Base payload converter to/from multiple payloads/values."""
Expand Down Expand Up @@ -911,6 +915,12 @@ def _error_to_failure(
failure.child_workflow_execution_failure_info.retry_state = (
temporalio.api.enums.v1.RetryState.ValueType(error.retry_state or 0)
)
# TODO(nexus-prerelease): test coverage for this
elif isinstance(error, temporalio.exceptions.NexusOperationError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For symmetry reasons, I suspect we also need to convert NexusHandlerError (in theory a failure conversion from a failure should be able to convert back to a failure)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to add test coverage per the comment, and for what you're saying. Maybe this is something to resolve when we merge the workflow caller.

failure.nexus_operation_execution_failure_info.SetInParent()
failure.nexus_operation_execution_failure_info.operation_token = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the rest of the error fields? They should be copied over too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also convert from nexus.HandlerError to avoid those from transforming into ApplicationErrors.

error.operation_token
)

def from_failure(
self,
Expand Down Expand Up @@ -1006,6 +1016,33 @@ def from_failure(
if child_info.retry_state
else None,
)
elif failure.HasField("nexus_handler_failure_info"):
nexus_handler_failure_info = failure.nexus_handler_failure_info
try:
_type = nexusrpc.HandlerErrorType[nexus_handler_failure_info.type]
except KeyError:
logger.warning(
f"Unknown Nexus HandlerErrorType: {nexus_handler_failure_info.type}"
)
_type = nexusrpc.HandlerErrorType.INTERNAL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other SDKs we preserve the original string type for forwards compatibility.

return nexusrpc.HandlerError(
failure.message or "Nexus handler error",
type=_type,
retryable={
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: True,
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: False,
}.get(nexus_handler_failure_info.retry_behavior),
Comment on lines +1031 to +1034
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can do this without incurring the cost of constructing a dict on every conversion.

)
elif failure.HasField("nexus_operation_execution_failure_info"):
nexus_op_failure_info = failure.nexus_operation_execution_failure_info
err = temporalio.exceptions.NexusOperationError(
failure.message or "Nexus operation error",
scheduled_event_id=nexus_op_failure_info.scheduled_event_id,
endpoint=nexus_op_failure_info.endpoint,
service=nexus_op_failure_info.service,
operation=nexus_op_failure_info.operation,
operation_token=nexus_op_failure_info.operation_token,
)
else:
err = temporalio.exceptions.FailureError(failure.message or "Failure error")
err._failure = failure
Expand Down
Loading