Skip to content

Commit afadc15

Browse files
authored
Allow proper stack trace on eviction deadlock (#530)
Fixes #527
1 parent aa26503 commit afadc15

File tree

5 files changed

+69
-6
lines changed

5 files changed

+69
-6
lines changed

temporalio/worker/_workflow.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,10 @@ async def _handle_activation(
261261
# TODO(cretz): Should we build a complex mechanism to continually
262262
# try the eviction until it succeeds?
263263
if cache_remove_job:
264-
logger.exception("Failed running eviction job, not evicting")
264+
logger.exception(
265+
"Failed running eviction job, not evicting. "
266+
+ "Since eviction could not be processed, this worker cannot complete and the slot will remain forever used."
267+
)
265268
self._could_not_evict_count += 1
266269
return
267270

temporalio/worker/_workflow_instance.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ def activate(
357357
# have a different workflow/event-loop going.
358358
if self._deleting and self._tasks:
359359
raise RuntimeError(
360-
f"Cache removal processed, but {len(self._tasks)} tasks remain. "
360+
f"Eviction processed, but {len(self._tasks)} tasks remain. "
361361
+ f"Stack traces below:\n\n{self._stack_trace()}"
362362
)
363363

@@ -1776,7 +1776,7 @@ async def _signal_external_workflow(
17761776

17771777
def _stack_trace(self) -> str:
17781778
stacks = []
1779-
for task in self._tasks:
1779+
for task in list(self._tasks):
17801780
# TODO(cretz): These stacks are not very clean currently
17811781
frames = []
17821782
for frame in task.get_stack():

temporalio/worker/workflow_sandbox/_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,4 @@ def _run_code(self, code: str, **extra_globals: Any) -> None:
173173
finally:
174174
temporalio.workflow.unsafe._set_in_sandbox(False)
175175
for k, v in extra_globals.items():
176-
del self.globals_and_locals[k]
176+
self.globals_and_locals.pop(k, None)

tests/helpers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async def assert_eq_eventually(
5959
await asyncio.sleep(interval.total_seconds())
6060
assert (
6161
expected == last_value
62-
), "timed out waiting for equal, asserted against last value"
62+
), f"timed out waiting for equal, asserted against last value of {last_value}"
6363

6464

6565
async def worker_versioning_enabled(client: Client) -> bool:

tests/worker/test_workflow.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2467,6 +2467,8 @@ async def test_workflow_deadlock(client: Client):
24672467
async with new_worker(
24682468
client, DeadlockedWorkflow, disable_safe_workflow_eviction=True
24692469
) as worker:
2470+
if worker._workflow_worker:
2471+
worker._workflow_worker._deadlock_timeout_seconds = 1
24702472
deadlock_thread_event.clear()
24712473
handle = await client.start_workflow(
24722474
DeadlockedWorkflow.run,
@@ -2488,7 +2490,7 @@ async def last_history_task_failure() -> str:
24882490

24892491
try:
24902492
await assert_eq_eventually(
2491-
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 2 second(s)",
2493+
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
24922494
last_history_task_failure,
24932495
timeout=timedelta(seconds=5),
24942496
interval=timedelta(seconds=1),
@@ -2497,6 +2499,64 @@ async def last_history_task_failure() -> str:
24972499
deadlock_thread_event.set()
24982500

24992501

2502+
@workflow.defn
2503+
class EvictionDeadlockWorkflow:
2504+
def __init__(self) -> None:
2505+
self.val = 1
2506+
2507+
async def wait_until_positive(self):
2508+
while True:
2509+
await workflow.wait_condition(lambda: self.val > 0)
2510+
self.val = -self.val
2511+
2512+
async def wait_until_negative(self):
2513+
while True:
2514+
await workflow.wait_condition(lambda: self.val < 0)
2515+
self.val = -self.val
2516+
2517+
@workflow.run
2518+
async def run(self):
2519+
await asyncio.gather(self.wait_until_negative(), self.wait_until_positive())
2520+
2521+
2522+
async def test_workflow_eviction_deadlock(client: Client):
2523+
# We are running the worker, but we can't ever shut it down on eviction
2524+
# error so we send shutdown in the background and leave this worker dangling
2525+
worker = new_worker(client, EvictionDeadlockWorkflow)
2526+
if worker._workflow_worker:
2527+
worker._workflow_worker._deadlock_timeout_seconds = 1
2528+
worker_task = asyncio.create_task(worker.run())
2529+
2530+
# Run workflow that deadlocks
2531+
handle = await client.start_workflow(
2532+
EvictionDeadlockWorkflow.run,
2533+
id=f"workflow-{uuid.uuid4()}",
2534+
task_queue=worker.task_queue,
2535+
)
2536+
2537+
async def last_history_task_failure() -> str:
2538+
resp = await client.workflow_service.get_workflow_execution_history(
2539+
GetWorkflowExecutionHistoryRequest(
2540+
namespace=client.namespace,
2541+
execution=WorkflowExecution(workflow_id=handle.id),
2542+
),
2543+
)
2544+
for event in reversed(resp.history.events):
2545+
if event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED:
2546+
return event.workflow_task_failed_event_attributes.failure.message
2547+
return "<no failure>"
2548+
2549+
await assert_eq_eventually(
2550+
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
2551+
last_history_task_failure,
2552+
timeout=timedelta(seconds=5),
2553+
interval=timedelta(seconds=1),
2554+
)
2555+
2556+
# Send cancel but don't wait
2557+
worker_task.cancel()
2558+
2559+
25002560
class PatchWorkflowBase:
25012561
def __init__(self) -> None:
25022562
self._result = "<unset>"

0 commit comments

Comments
 (0)