Skip to content

Commit f949ece

Browse files
authored
Merge pull request #1560 from mvdbeek/fix_subworkflow_invocation_tracking
Fix subworkflow invocation tracking
2 parents b46526f + 58fc42f commit f949ece

File tree

7 files changed

+113
-41
lines changed

7 files changed

+113
-41
lines changed

planemo/galaxy/activity.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from planemo.galaxy.invocations.api import (
4646
BioblendInvocationApi,
4747
JOB_ERROR_STATES,
48+
NON_TERMINAL_JOB_STATES,
4849
)
4950
from planemo.galaxy.invocations.polling import PollingTrackerImpl
5051
from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs
@@ -866,9 +867,8 @@ def get_state():
866867
if not response:
867868
# invocation may not have any attached jobs, that's fine
868869
return "ok"
869-
non_terminal_states = {"running", "queued", "new", "ready", "resubmitted", "upload", "waiting"}
870870
current_states = set(item["state"] for item in response)
871-
current_non_terminal_states = non_terminal_states.intersection(current_states)
871+
current_non_terminal_states = NON_TERMINAL_JOB_STATES.intersection(current_states)
872872
# Mix of "error"-ish terminal job, dataset, invocation terminal states, so we can use this for whatever we throw at it
873873
hierarchical_fail_states = [
874874
"error",

planemo/galaxy/invocations/api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,4 @@ def invocation_state_terminal(state: str):
7777

7878

7979
JOB_ERROR_STATES = ["error", "deleted", "failed", "stopped", "stop", "deleting"]
80+
NON_TERMINAL_JOB_STATES = {"running", "queued", "new", "ready", "resubmitted", "upload", "waiting"}

planemo/galaxy/invocations/polling.py

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
InvocationApi,
1111
InvocationJobsSummary,
1212
JOB_ERROR_STATES,
13+
NON_TERMINAL_JOB_STATES,
1314
)
1415
from .progress import WorkflowProgressDisplay
1516

@@ -101,7 +102,7 @@ def _check_for_errors(
101102
)
102103
if error_message:
103104
final_state = "new" if not invocation else invocation["state"]
104-
job_state = summary_job_state(invocation_jobs)
105+
job_state = summary_job_state(invocation_jobs, fail_fast)
105106
return final_state, job_state, error_message
106107
return None
107108

@@ -154,24 +155,25 @@ def wait_for_invocation_and_jobs(
154155
ctx, invocation_id, invocation_api, workflow_progress_display, fail_fast
155156
)
156157

157-
error_result = _check_for_errors(
158-
ctx,
159-
invocation_id,
160-
sub_exception,
161-
sub_invocation,
162-
sub_jobs,
163-
invocation_api,
164-
workflow_progress_display,
165-
fail_fast,
166-
)
167-
if error_result:
168-
return error_result
158+
if sub_invocation:
159+
error_result = _check_for_errors(
160+
ctx,
161+
sub_invocation["id"] if sub_invocation else invocation_id,
162+
sub_exception,
163+
sub_invocation,
164+
sub_jobs,
165+
invocation_api,
166+
workflow_progress_display,
167+
fail_fast,
168+
)
169+
if error_result:
170+
return error_result
169171

170172
if not _is_polling_complete(workflow_progress_display):
171173
polling_tracker.sleep()
172174

173175
ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
174-
job_state = summary_job_state(last_invocation_jobs)
176+
job_state = summary_job_state(last_invocation_jobs, fail_fast)
175177
assert last_invocation
176178

177179
# Final check for job errors when fail_fast is enabled
@@ -203,7 +205,7 @@ def workflow_in_error_message(
203205
"""Return an error message if workflow is in an error state."""
204206

205207
invocation_state = "new" if not last_invocation else last_invocation["state"]
206-
job_state = summary_job_state(last_invocation_jobs)
208+
job_state = summary_job_state(last_invocation_jobs, fail_fast)
207209

208210
error_message = None
209211
if last_exception:
@@ -233,14 +235,19 @@ def workflow_in_error_message(
233235
return error_message
234236

235237

236-
# we're still mocking out the old history state by just picking out a random
237-
# job state of interest. Seems like we should drop this.
238-
def summary_job_state(job_states_summary: Optional[InvocationJobsSummary]):
239-
states = (job_states_summary or {"states": {}}).get("states", {}).copy()
240-
states.pop("ok", None)
241-
states.pop("skipped", None)
238+
def summary_job_state(job_states_summary: Optional[InvocationJobsSummary], fail_fast: bool = False):
239+
states = {state for state in (job_states_summary or {"states": {}})["states"]}
240+
if not fail_fast:
241+
current_non_terminal_states = NON_TERMINAL_JOB_STATES.intersection(states)
242+
if current_non_terminal_states:
243+
# ensure all non-terminal states advance, then return the first failing state, if any.
244+
return next(iter(current_non_terminal_states))
242245
if states:
243-
return next(iter(states.keys()))
246+
# We have ensured that that all jobs are terminal, we want to return failed jobs in the summary if there are any.
247+
for error_state in JOB_ERROR_STATES:
248+
if error_state in states:
249+
return error_state
250+
return next(iter(states))
244251
else:
245252
return "ok"
246253

planemo/galaxy/invocations/progress.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
# Types for various invocation responses
3131
class InvocationStep(TypedDict, total=False):
32+
id: str
3233
state: Optional[str]
3334
subworkflow_invocation_id: Optional[str]
3435

@@ -347,6 +348,7 @@ def __init__(
347348
self.subworkflow_invocation_ids_seen: Set[str] = set()
348349
self.subworkflow_invocation_ids_completed: Set[str] = set()
349350
self.subworkflow_invocation_id: Optional[str] = None
351+
self.new_steps: List[str] = []
350352
self.invocation_id = invocation_id
351353
display = display_configuration or DisplayConfiguration()
352354
self.galaxy_url = galaxy_url
@@ -358,10 +360,14 @@ def __init__(
358360
def _register_subworkflow_invocation_ids_from(self, invocation: Invocation):
359361
subworkflow_invocation_ids: List[str] = []
360362
steps = invocation.get("steps") or []
363+
new_steps: List[str] = []
361364
for step in steps:
365+
if step["state"] == "new":
366+
new_steps.append(step["id"])
362367
subworkflow_invocation_id = step.get("subworkflow_invocation_id")
363368
if subworkflow_invocation_id:
364369
subworkflow_invocation_ids.append(subworkflow_invocation_id)
370+
self.new_steps = new_steps
365371
self._register_subworkflow_invocation_ids(subworkflow_invocation_ids)
366372

367373
def _register_subworkflow_invocation_ids(self, ids: List[str]):
@@ -375,6 +381,9 @@ def an_incomplete_subworkflow_id(self):
375381
return random.choice(tuple(self.subworkflow_invocation_ids_seen - self.subworkflow_invocation_ids_completed))
376382

377383
def all_subworkflows_complete(self):
384+
if self.new_steps:
385+
# These don't have subworkflow invocation ids yet, we can't know if they're all complete
386+
return False
378387
return len(self.subworkflow_invocation_ids_seen) == len(self.subworkflow_invocation_ids_completed)
379388

380389
def get_invocation_ui_link(self):

tests/test_invocation_polling.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
SCENARIO_1,
2626
SCENARIO_MULTIPLE_OK_SUBWORKFLOWS,
2727
SCENARIO_NESTED_SUBWORKFLOWS,
28+
SCENARIO_SUBWORKFLOW_WITH_FAILED_JOBS,
2829
)
2930

3031
SLEEP = 0
@@ -42,10 +43,10 @@ def sleep(self) -> None:
4243

4344
def test_polling_scenario_1():
4445
final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=True)
45-
assert final_invocation_state == "scheduled"
46-
assert job_state == "failed"
46+
assert final_invocation_state == "ready" # early job error and fail fast, invocation doesn't advance to scheduled
47+
assert job_state == "error"
4748
assert error_message
48-
assert "failed" in error_message
49+
assert "error" in error_message
4950

5051

5152
def test_polling_scenario_three_ok_subworkflows():
@@ -80,10 +81,10 @@ def test_polling_without_display():
8081
display,
8182
fail_fast=True,
8283
)
83-
assert final_invocation_state == "scheduled"
84-
assert job_state == "failed"
84+
assert final_invocation_state == "ready"
85+
assert job_state == "error"
8586
assert error_message
86-
assert "failed" in error_message
87+
assert "error" in error_message
8788

8889

8990
def test_polling_with_compact_display():
@@ -117,19 +118,19 @@ def test_fail_fast_enabled_with_job_failure():
117118
"""Test that fail_fast=True returns error when a job fails."""
118119
final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=True)
119120
# Invocation should still be scheduled (workflow scheduling succeeded)
120-
assert final_invocation_state == "scheduled"
121-
assert job_state == "failed"
121+
assert final_invocation_state == "ready"
122+
assert job_state == "error"
122123
# fail_fast should detect the failed job and return error message
123124
assert error_message
124-
assert "Failed to run workflow, at least one job is in [failed] state." in error_message
125+
assert "Failed to run workflow, at least one job is in [error] state." in error_message
125126

126127

127128
def test_fail_fast_disabled_with_job_failure():
128129
"""Test that fail_fast=False does not report job failures as errors."""
129130
final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=False)
130131
# Invocation should be scheduled (workflow scheduling succeeded)
131132
assert final_invocation_state == "scheduled"
132-
assert job_state == "failed"
133+
assert job_state == "error"
133134
# Without fail_fast, job failures shouldn't cause error messages
134135
# (unless invocation itself fails, which it doesn't in this case)
135136
assert error_message is None
@@ -145,6 +146,19 @@ def test_fail_fast_enabled_with_successful_workflow():
145146
assert not error_message
146147

147148

149+
def test_fail_fast_enabled_with_subworkflow_job_failure():
150+
"""Test that fail_fast=True terminates when encountering jobs that are errored inside a subworkflow invocation."""
151+
final_invocation_state, job_state, error_message = run_workflow_simulation(
152+
SCENARIO_SUBWORKFLOW_WITH_FAILED_JOBS, fail_fast=True
153+
)
154+
# Invocation is ready to schedule more steps, yet the polling should terminate
155+
assert final_invocation_state == "ready"
156+
assert job_state == "error"
157+
# fail_fast should detect the failed job in the subworkflow and return error message
158+
assert error_message
159+
assert "Failed to run workflow, at least one job is in [error] state." in error_message
160+
161+
148162
def run_workflow_simulation(
149163
yaml_str: str, display_configuration: Optional[DisplayConfiguration] = None, fail_fast: bool = False
150164
):
@@ -179,7 +193,7 @@ def show_job(self, job_id, full_details=False):
179193
"""Return mock job details with exit code and stderr."""
180194
return {
181195
"id": job_id,
182-
"state": "failed",
196+
"state": "error",
183197
"exit_code": 1,
184198
"stderr": f"Error: Mock job {job_id} failed with exit code 1\nAdditional error details here",
185199
"stdout": f"Mock job {job_id} output",
@@ -204,7 +218,7 @@ class MockInvocationsApi:
204218

205219
def show_invocation_step(self, invocation_id, step_id):
206220
"""Return mock invocation step details."""
207-
return {"id": step_id, "jobs": [{"id": f"job_{step_id}", "state": "failed"}]}
221+
return {"id": step_id, "jobs": [{"id": f"job_{step_id}", "state": "error"}]}
208222

209223

210224
class SimulatedApi(InvocationApi):
@@ -239,7 +253,7 @@ def get_job(self, job_id: str, full_details: bool = False) -> Job:
239253
"""Return mock job details."""
240254
return {
241255
"id": job_id,
242-
"state": "failed",
256+
"state": "error",
243257
"exit_code": 1,
244258
"stderr": f"Error: Mock job {job_id} failed with exit code 1\nAdditional error details here",
245259
"stdout": f"Mock job {job_id} output",

tests/test_workflow_progress.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
)
77
from planemo.galaxy.invocations.progress_display import DisplayConfiguration
88

9-
STEP_NEW = {"state": "new"}
10-
STEP_SCHEDULED = {"state": "scheduled"}
9+
STEP_NEW = {"state": "new", "id": "1"}
10+
STEP_SCHEDULED = {"state": "scheduled", "id": "1"}
1111
SLEEP = 0.8
1212

1313

tests/test_workflow_simulation.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
11
from planemo.galaxy.invocations.simulations import parse_workflow_simulation_from_string
22

3+
# These scenario simulates a workflow execution timeline with the following structure:
4+
#
5+
# Field explanations:
6+
# - states: Array defining the main workflow invocation's state progression over time
7+
# Format: [initial_state, intermediate_state:duration, final_state]
8+
# - "new": Workflow just created
9+
# - "ready:4": Workflow in ready state for 4 time ticks
10+
# - "scheduled": Workflow is running/scheduled
11+
#
12+
# - steps: Array of workflow steps that execute in sequence
13+
# - Each step has a "state" and may contain "jobs" or nested "invocation" (subworkflow)
14+
# - "after: N": This step starts after N time ticks
15+
#
16+
# - jobs: Array of jobs within a step
17+
# - states: Job state progression over time
18+
# - Common states: new -> queued -> running -> ok/failed
19+
# - Format with duration: "queued:2" means job stays queued for 2 ticks
20+
321
SCENARIO_1 = """
422
states: [new, ready:4, scheduled]
523
steps:
@@ -9,7 +27,7 @@
927
- after: 2
1028
state: scheduled
1129
jobs:
12-
- states: [new, queued, failed]
30+
- states: [new, queued, error]
1331
- states: [new, queued, ok]
1432
- after: 3
1533
state: scheduled
@@ -157,6 +175,29 @@
157175
"""
158176

159177

178+
SCENARIO_SUBWORKFLOW_WITH_FAILED_JOBS = """
179+
states: [new, ready:4]
180+
steps:
181+
- state: scheduled
182+
jobs:
183+
- states: [new, queued:2, running:2, ok]
184+
- after: 2
185+
state: scheduled
186+
jobs:
187+
- states: [new, queued, running:2, ok]
188+
- states: [new, queued, running:4, ok]
189+
- after: 3
190+
state: scheduled
191+
invocation:
192+
states: [new, ready]
193+
steps:
194+
- state: scheduled
195+
jobs:
196+
- states: [new, queued, error]
197+
- states: [new:2, paused]
198+
"""
199+
200+
160201
def test_parse_scenario_1_invocation_state_evolution():
161202
invocation = parse_workflow_simulation_from_string(SCENARIO_1)
162203
invocation_dict = invocation.get_api_invocation()
@@ -227,7 +268,7 @@ def test_parse_scenario_1_invocation_job_states():
227268
assert len(states) == 3
228269
assert states["ok"] == 1
229270
assert states["running"] == 1
230-
assert states["failed"] == 1
271+
assert states["error"] == 1
231272

232273

233274
def test_parse_scenario_1_subworkflow_invocation_state():

0 commit comments

Comments
 (0)