Skip to content

Commit cd26f5a

Browse files
committed
PR #736: splitting up db_updates in _process_threadworker_updates
is not necessary; removing the split also eliminates `job_db.read()`, which is not part of JobDatabaseInterface
1 parent d1cb693 commit cd26f5a

File tree

2 files changed

+3
-14
lines changed

2 files changed

+3
-14
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -709,17 +709,9 @@ def _process_threadworker_updates(
709709
# Build DataFrame of updates indexed by df_idx
710710
df_updates = pd.DataFrame(updates).set_index("df_idx", drop=True)
711711

712-
# Determine which rows to upsert
713-
existing_indices = set(df_updates.index).intersection(job_db.read().index)
714-
if existing_indices:
715-
df_upsert = df_updates.loc[sorted(existing_indices)]
716-
job_db.persist(df_upsert)
717-
stats["job_db persist"] = stats.get("job_db persist", 0) + 1
718-
719-
# Any df_idx not in original index are errors
720-
missing = set(df_updates.index) - existing_indices
721-
if missing:
722-
_log.error(f"Skipping non-existing dataframe indices: {sorted(missing)}")
712+
job_db.persist(df_updates)
713+
stats["job_db persist"] = stats.get("job_db persist", 0) + 1
714+
723715

724716
def on_job_done(self, job: BatchJob, row):
725717
"""

tests/extra/job_management/test_job_management.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -786,9 +786,6 @@ def test_process_threadworker_updates(self, tmp_path, caplog):
786786
assert stats.get("queued", 0) == 2
787787
assert stats["job_db persist"] == 1
788788

789-
# Assert error log for invalid index
790-
assert any("Skipping non-existing dataframe indices" in msg for msg in caplog.messages)
791-
792789
def test_no_results_leaves_db_and_stats_untouched(self, tmp_path, caplog):
793790
pool = _JobManagerWorkerThreadPool(max_workers=2)
794791
stats = collections.defaultdict(int)

0 commit comments

Comments
 (0)