Nexus: worker, workflow-backed operations, and workflow caller #813
This reverts commit 86a9a61.
from ._operation_context import (
    _TemporalCancelOperationContext as _TemporalCancelOperationContext,
)
from ._operation_context import (
    _TemporalStartOperationContext as _TemporalStartOperationContext,
)
We do not expose that to users, hence the underscore prefix; it just happens to be in the same file. In this multi-file module, are we intending to expose these two underscore-prefixed types to users? If not, we should remove them.
import nexusrpc
from nexusrpc import InputT, OutputT
from nexusrpc.handler import (
    OperationHandler,
    StartOperationContext,
)

from temporalio.nexus._operation_context import (
    WorkflowRunOperationContext,
)
from temporalio.nexus._operation_handlers import (
    WorkflowRunOperationHandler,
)
from temporalio.nexus._token import (
    WorkflowHandle,
)
from temporalio.nexus._util import (
    get_callable_name,
    get_workflow_run_start_method_input_and_output_type_annotations,
    set_operation_factory,
)
With most other Python code in the SDK, we chose to fully qualify things where they are used if they are from a non-stdlib module. Not that we have to, but I think it can make it easier to reason about when reading, prevents some ambiguity, and makes code consistent across the codebase.
):
    ctx = _try_temporal_context()
    if ctx is None:
        raise RuntimeError("Not in Nexus operation context.")
Yup, LGTM
@dataclass(frozen=True)
class WorkflowRunOperationContext(StartOperationContext):
What I mean by "accessing a Nexus context should be done the same way regardless of whether it's Temporal-backed" is that you access the Nexus context the same way, regardless of whether the handler runs in a Temporal worker or some other worker. That is unrelated to the Temporal context IMO. Regardless, if you must mash the Nexus context with the Temporal context for workflow run, does the Temporal context need to be optional?
    return workflow_handle


@dataclass(frozen=True)
The problem here is that we are mixing public and private attributes, so to users it looks like a normal dataclass they can instantiate, but it's not. At the least, we should document that users should never instantiate this, though we need some kind of Nexus test harness probably.
    self, self._next_seq("nexus_operation"), input, operation_handle_fn()
)
handle._apply_schedule_command()
self._pending_nexus_operations[handle._seq] = handle
When do you delete from this dictionary?
# TODO(nexus-preview): How should this behave?
# Java has a separate class that only exists if the operation token exists:
# https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java#L26
# And Go similar:
# https://github.com/temporalio/sdk-go/blob/master/internal/workflow.go#L2770-L2771
I don't understand the question exactly here
v.service = self._input.service
v.operation = self._input.operation_name
v.input.CopyFrom(
    self._instance._payload_converter.to_payload(self._input.input)
Can we do payload conversion before we add the command? We need fallible things to fail before we call the side-effecting `add_command`, because otherwise the command will be half-initialized.
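A minimal sketch of the ordering being asked for here, using hypothetical stand-ins (`Command`, `CommandBuffer`, `schedule_operation`) rather than the SDK's real command types: the fallible conversion runs first, so a failure leaves no half-initialized command behind.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class Command:
    """Hypothetical stand-in for a workflow command; not the SDK's type."""

    service: str = ""
    operation: str = ""
    payload: bytes = b""


@dataclass
class CommandBuffer:
    """Hypothetical stand-in for the workflow instance's command list."""

    commands: List[Command] = field(default_factory=list)

    def add_command(self) -> Command:
        # Side effect: the new command is recorded immediately.
        command = Command()
        self.commands.append(command)
        return command


def schedule_operation(
    buffer: CommandBuffer,
    service: str,
    operation: str,
    value: Any,
    to_payload: Callable[[Any], bytes],
) -> Command:
    # Fallible step first: if conversion raises, nothing has been recorded.
    payload = to_payload(value)
    # Only now perform the side-effecting add and fill in the fields.
    command = buffer.add_command()
    command.service = service
    command.operation = operation
    command.payload = payload
    return command
```

With this ordering, a converter that raises leaves `buffer.commands` untouched.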
async def result(self) -> OutputT:
    return await self._task
What is the use case for this call? It does not seem to be in the base class
tests/nexus/test_handler.py
Outdated
    await _test_start_operation(test_case, True, env)


async def _test_start_operation(
> Otherwise I'd have to create a caller workflow and wouldn't have access to HTTP responses

This is what we ask our users to do. We should test as our users might; it helps us find things. Is what we're asking our users to do too onerous for us to also do? (Understood it may not make this PR.)
This reverts commit 38a50fa.
Submitting what I have so far since I saw changes were pushed.
@@ -208,3 +216,6 @@ exclude = [
[tool.uv]
# Prevent uv commands from building the package by default
package = false

[tool.uv.sources]
nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python" }
This should be removed before merging, right? We need a release of the Nexus SDK to PyPI.
# The following options should not be considered part of the public API. They
# are deliberately not exposed in overloads, and are not subject to any
# backwards compatibility guarantees.
nexus_completion_callbacks: Sequence[NexusCompletionCallback] = [],
Nexus callbacks are only one type of callback that we were planning on supporting. I wouldn't mention Nexus in the name here.
    temporalio.api.common.v1.Link.WorkflowEvent
] = [],
request_id: Optional[str] = None,
stack_level: int = 2,
You are also missing the conflict options, which are required for `USE_EXISTING` to make sense for workflow callers. Can be done in a follow-up PR since this was functionality that was added as a second step for Go and Java.
I would for now throw if a user tries to set `USE_EXISTING` in the workflow run operation handler.
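A rough sketch of such a guard, assuming a hypothetical `ConflictPolicy` enum and `check_conflict_policy` helper in place of the SDK's real types; the exception type and message are also assumptions.

```python
import enum


class ConflictPolicy(enum.Enum):
    """Hypothetical stand-in for the workflow ID conflict policy enum."""

    UNSPECIFIED = 0
    FAIL = 1
    USE_EXISTING = 2


def check_conflict_policy(policy: ConflictPolicy) -> ConflictPolicy:
    # Reject USE_EXISTING until the conflict options it depends on exist.
    if policy is ConflictPolicy.USE_EXISTING:
        raise ValueError(
            "USE_EXISTING is not yet supported when starting a workflow "
            "from a workflow run operation handler"
        )
    return policy
```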
if input.request_id:
    req.request_id = input.request_id

req.completion_callbacks.extend(
Put the links on the callbacks so they can be associated.
@@ -7231,6 +7260,17 @@ def api_key(self, value: Optional[str]) -> None:
        self.service_client.update_api_key(value)


@dataclass(frozen=True)
class NexusCompletionCallback:
Don't make callbacks Nexus-specific and don't call them `Completion` callbacks, since we will want to use these callbacks for other purposes too.
completion = temporalio.bridge.proto.nexus.NexusTaskCompletion(
    task_token=task.task.task_token
)
completion.error.failure.message = "Worker shutting down"
If we do decide to keep this code, it should be a (retryable) INTERNAL handler error.
    ),
)
else:
    # TODO(nexus-preview): ack_cancel completions?
We discussed this on Slack; there's no reason to ack the cancels apart from testing Core.
    await self._bridge_worker().complete_nexus_task(completion)
except Exception:
    logger.exception("Failed to send Nexus task completion")
+finally:
+    try:
+        del self._running_tasks[task_token]
+    except KeyError:
+        logger.exception("Failed to remove task for completed Nexus operation cancel handler")
try:
    del self._running_tasks[task_token]
except KeyError:
    logger.exception("Failed to remove completed Nexus operation")
-logger.exception("Failed to remove completed Nexus operation")
+logger.exception("Failed to remove task for completed Nexus operation start handler")
    await self._bridge_worker().complete_nexus_task(completion)
except Exception:
    logger.exception("Failed to send Nexus task completion")
finally:
Might be worth putting this try finally block around the entire function to ensure we can avoid leaks.
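A minimal sketch of that shape, assuming a hypothetical `running_tasks` registry and `handle_task` coroutine rather than the worker's actual methods. Unlike the code under review, it silently ignores a missing key instead of logging it.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical stand-in for the worker's token -> task bookkeeping.
running_tasks: Dict[bytes, str] = {}


async def handle_task(
    task_token: bytes,
    handler: Callable[[], Awaitable[Any]],
    send_completion: Callable[[Any], Awaitable[None]],
) -> None:
    try:
        # Everything that can fail lives inside the single try block.
        completion = await handler()
        await send_completion(completion)
    finally:
        # Runs on success, failure, or cancellation, so the entry cannot leak.
        running_tasks.pop(task_token, None)
```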
    logger.warning(
        f"Unknown Nexus HandlerErrorType: {nexus_handler_failure_info.type}"
    )
    _type = nexusrpc.HandlerErrorType.INTERNAL
In other SDKs we preserve the original string type for forwards compatibility.
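One way this could look in Python, as a sketch only: `KnownErrorType`, `ParsedErrorType`, and `parse_error_type` are hypothetical names, and this is not taken from how the other SDKs implement it. The raw string is always kept, and the enum value is filled in only when the type is recognized.

```python
import enum
from dataclasses import dataclass
from typing import Optional


class KnownErrorType(enum.Enum):
    """Hypothetical subset of handler error types known to this version."""

    INTERNAL = "INTERNAL"
    BAD_REQUEST = "BAD_REQUEST"


@dataclass(frozen=True)
class ParsedErrorType:
    raw: str  # Original string, preserved even when unrecognized.
    known: Optional[KnownErrorType]  # None for types added by newer servers.


def parse_error_type(raw: str) -> ParsedErrorType:
    try:
        known: Optional[KnownErrorType] = KnownErrorType(raw)
    except ValueError:
        # Unknown type: keep the raw string rather than coercing to INTERNAL.
        known = None
    return ParsedErrorType(raw=raw, known=known)
```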
if err.retryable
else temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE
This is wrong: `retryable` can be `None`, in which case it would translate to unspecified.
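A sketch of the three-way mapping, using a hypothetical `RetryBehavior` enum as a stand-in for the API enum referenced in the diff:

```python
import enum
from typing import Optional


class RetryBehavior(enum.Enum):
    """Hypothetical stand-in for the API's retry behavior enum."""

    UNSPECIFIED = "unspecified"
    RETRYABLE = "retryable"
    NON_RETRYABLE = "non_retryable"


def to_retry_behavior(retryable: Optional[bool]) -> RetryBehavior:
    # None means the handler expressed no preference, so it must not be
    # collapsed into NON_RETRYABLE by a plain truthiness check.
    if retryable is None:
        return RetryBehavior.UNSPECIFIED
    return RetryBehavior.RETRYABLE if retryable else RetryBehavior.NON_RETRYABLE
```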
elif isinstance(err, ApplicationError):
    handler_err = nexusrpc.HandlerError(
        # TODO(nexus-prerelease): what should message be?
        err.message,
Yeah, we will figure this out. I will try to find time next week to spec it out.
handle._apply_schedule_command()
self._pending_nexus_operations[handle._seq] = handle

while True:
I find this loop confusing, can you document what this is trying to do?
Will do. For reassurance, it's essentially identical to the existing implementation of `ChildWorkflow`: https://github.com/temporalio/sdk-python/blob/main/temporalio/worker/_workflow_instance.py#L1715-L1724
def create_nexus_client(
    endpoint: str, service: Union[Type[ServiceT], str]
I would consider not making endpoint a positional so we can add a client that targets a task queue in the future.
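To illustrate the idea only: `create_client_sketch` and its `task_queue` parameter below are hypothetical and not part of the PR. Making the target keyword-only leaves room for another kind of target later without changing what positional arguments mean.

```python
from typing import Any, Dict, Optional


def create_client_sketch(
    service: str,
    *,
    endpoint: Optional[str] = None,
    task_queue: Optional[str] = None,  # Hypothetical future target.
) -> Dict[str, Any]:
    # Exactly one target must be given; keyword-only arguments keep call
    # sites unambiguous if more target kinds are added later.
    if (endpoint is None) == (task_queue is None):
        raise ValueError("Provide exactly one of endpoint or task_queue")
    if endpoint is not None:
        target = ("endpoint", endpoint)
    else:
        target = ("task_queue", task_queue)
    return {"service": service, "target": target}
```

Calling `create_client_sketch("my-service", "my-endpoint")` raises `TypeError`, so callers must name the target explicitly.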
    return input


class MyOperation(WorkflowRunOperationHandler):
Why do you need this class? You can use the decorator, right?
Approved with comments. I didn't get to look at the tests but the rest mostly looks good to me. We should revisit error rehydration and some of the additions that were made to Go and Java that weren't added here.
Initial Python Temporal Nexus implementation.
Temporal SDK PR to accompany the `samples-python` PR, Nexus samples (samples-python#174).
Contains Nexus worker, components for users to define workflow-backed Nexus operations, and the ability to start and cancel a Nexus operation from a workflow.
Notes for reviewers
`nexusrpc.handler.start_workflow` is a top-level static function, but currently there's still a public contextvar object, `nexusrpc.handler.temporal_operation_context`. Let's settle on an approach there. If we're using module-level getters, I think they need to be named something like `get_client()` etc., despite the fact that we have `activity.metric_meter()`? A little more discussion is needed there.