Skip to content

Commit f86225d

Browse files
authored
Merge pull request #8283 from chaen/v9.0_fix_utilityAgain
fix (WMS): do not import DB in utilities
2 parents 6cd1339 + 0f1d7ce commit f86225d

File tree

9 files changed

+159
-157
lines changed

9 files changed

+159
-157
lines changed

.github/workflows/basic.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ jobs:
126126
fetch-depth: 0
127127
repository: DIRACGrid/diracx
128128
path: diracx
129-
- uses: prefix-dev/setup-pixi@v0.8.11
129+
- uses: prefix-dev/setup-pixi@v0.9.0
130130
with:
131131
run-install: false
132132
post-cleanup: false
@@ -143,7 +143,7 @@ jobs:
143143
pixi add --pypi --feature diracx-core 'DIRAC @ file://'$PWD'/../DIRAC'
144144
# Show any changes
145145
git diff
146-
- uses: prefix-dev/setup-pixi@v0.8.11
146+
- uses: prefix-dev/setup-pixi@v0.9.0
147147
with:
148148
cache: false
149149
manifest-path: diracx/pixi.toml

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
RIGHT_DELETE,
3838
RIGHT_KILL,
3939
)
40-
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
40+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
4141

4242
# # agent's name
4343
AGENT_NAME = "Transformation/TransformationCleaningAgent"

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
4040
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
4141
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
42-
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
42+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
4343
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4444

4545

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2424
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
2525
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL
26-
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
26+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
2727
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
2828
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
2929

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
from DIRAC import S_ERROR, S_OK, gLogger
2+
from DIRAC.StorageManagementSystem.DB.StorageManagementDB import StorageManagementDB
3+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
4+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
5+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
6+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
7+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
8+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition
9+
10+
11+
def _deleteJob(jobID, force=False):
12+
"""Set the job status to "Deleted"
13+
and remove the pilot that ran and its logging info if the pilot is finished.
14+
15+
:param int jobID: job ID
16+
:return: S_OK()/S_ERROR()
17+
"""
18+
if not (result := JobDB().setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]:
19+
gLogger.warn("Failed to set job Deleted status", result["Message"])
20+
return result
21+
22+
if not (result := TaskQueueDB().deleteJob(jobID))["OK"]:
23+
gLogger.warn("Failed to delete job from the TaskQueue")
24+
25+
# if it was the last job for the pilot
26+
result = PilotAgentsDB().getPilotsForJobID(jobID)
27+
if not result["OK"]:
28+
gLogger.error("Failed to get Pilots for JobID", result["Message"])
29+
return result
30+
for pilot in result["Value"]:
31+
res = PilotAgentsDB().getJobsForPilot(pilot)
32+
if not res["OK"]:
33+
gLogger.error("Failed to get jobs for pilot", res["Message"])
34+
return res
35+
if not res["Value"]: # if list of jobs for pilot is empty, delete pilot
36+
result = PilotAgentsDB().getPilotInfo(pilotID=pilot)
37+
if not result["OK"]:
38+
gLogger.error("Failed to get pilot info", result["Message"])
39+
return result
40+
ret = PilotAgentsDB().deletePilot(result["Value"]["PilotJobReference"])
41+
if not ret["OK"]:
42+
gLogger.error("Failed to delete pilot from PilotAgentsDB", ret["Message"])
43+
return ret
44+
45+
return S_OK()
46+
47+
48+
def _killJob(jobID, sendKillCommand=True, force=False):
49+
"""Kill one job
50+
51+
:param int jobID: job ID
52+
:param bool sendKillCommand: send kill command
53+
54+
:return: S_OK()/S_ERROR()
55+
"""
56+
if sendKillCommand:
57+
if not (result := JobDB().setJobCommand(jobID, "Kill"))["OK"]:
58+
gLogger.warn("Failed to set job Kill command", result["Message"])
59+
return result
60+
61+
gLogger.info("Job marked for termination", jobID)
62+
if not (result := JobDB().setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))["OK"]:
63+
gLogger.warn("Failed to set job Killed status", result["Message"])
64+
if not (result := TaskQueueDB().deleteJob(jobID))["OK"]:
65+
gLogger.warn("Failed to delete job from the TaskQueue", result["Message"])
66+
67+
return S_OK()
68+
69+
70+
def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
71+
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
72+
73+
:param str right: RIGHT_KILL or RIGHT_DELETE
74+
75+
:return: S_OK()/S_ERROR()
76+
"""
77+
badIDs = []
78+
79+
killJobList = []
80+
deleteJobList = []
81+
if validJobList:
82+
result = JobDB().getJobsAttributes(killJobList, ["Status"])
83+
if not result["OK"]:
84+
return result
85+
jobStates = result["Value"]
86+
87+
# Get the jobs allowed to transition to the Killed state
88+
killJobList.extend(_filterJobStateTransition(jobStates, JobStatus.KILLED))
89+
90+
if right == RIGHT_DELETE:
91+
# Get the jobs allowed to transition to the Deleted state
92+
deleteJobList.extend(_filterJobStateTransition(jobStates, JobStatus.DELETED))
93+
94+
for jobID in killJobList:
95+
result = _killJob(jobID, force=force)
96+
if not result["OK"]:
97+
badIDs.append(jobID)
98+
99+
for jobID in deleteJobList:
100+
result = _deleteJob(jobID, force=force)
101+
if not result["OK"]:
102+
badIDs.append(jobID)
103+
104+
# Look for jobs that are in the Staging state to send kill signal to the stager
105+
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]
106+
107+
if stagingJobList:
108+
stagerDB = StorageManagementDB()
109+
gLogger.info("Going to send killing signal to stager as well!")
110+
result = stagerDB.killTasksBySourceTaskID(stagingJobList)
111+
if not result["OK"]:
112+
gLogger.warn("Failed to kill some Stager tasks", result["Message"])
113+
114+
if nonauthJobList or badIDs:
115+
result = S_ERROR("Some jobs failed deletion")
116+
if nonauthJobList:
117+
gLogger.warn("Non-authorized JobIDs won't be deleted", str(nonauthJobList))
118+
result["NonauthorizedJobIDs"] = nonauthJobList
119+
if badIDs:
120+
gLogger.warn("JobIDs failed to be deleted", str(badIDs))
121+
result["FailedJobIDs"] = badIDs
122+
return result
123+
124+
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
125+
return S_OK(jobsList)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
""" unit test (pytest) of JobAdministration module
2+
"""
3+
4+
from unittest.mock import MagicMock
5+
6+
import pytest
7+
8+
# sut
9+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
10+
11+
12+
@pytest.mark.parametrize(
13+
"jobIDs_list, right",
14+
[
15+
([], "Kill"),
16+
([], "Delete"),
17+
(1, "Kill"),
18+
([1, 2], "Kill"),
19+
],
20+
)
21+
def test___kill_delete_jobs(mocker, jobIDs_list, right):
22+
mocker.patch("DIRAC.WorkloadManagementSystem.DB.StatusUtils.JobDB", MagicMock())
23+
mocker.patch("DIRAC.WorkloadManagementSystem.DB.StatusUtils.TaskQueueDB", MagicMock())
24+
mocker.patch("DIRAC.WorkloadManagementSystem.DB.StatusUtils.PilotAgentsDB", MagicMock())
25+
mocker.patch("DIRAC.WorkloadManagementSystem.DB.StatusUtils.StorageManagementDB", MagicMock())
26+
27+
res = kill_delete_jobs(right, jobIDs_list)
28+
assert res["OK"]

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
RIGHT_SUBMIT,
3131
JobPolicy,
3232
)
33-
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
33+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
3434
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3535
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
3636
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs

src/DIRAC/WorkloadManagementSystem/Utilities/jobAdministration.py

Lines changed: 0 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1,127 +1,4 @@
1-
from DIRAC import S_ERROR, S_OK, gLogger
2-
from DIRAC.StorageManagementSystem.DB.StorageManagementDB import StorageManagementDB
31
from DIRAC.WorkloadManagementSystem.Client import JobStatus
4-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
5-
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
6-
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
7-
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
8-
9-
10-
def _deleteJob(jobID, force=False):
11-
"""Set the job status to "Deleted"
12-
and remove the pilot that ran and its logging info if the pilot is finished.
13-
14-
:param int jobID: job ID
15-
:return: S_OK()/S_ERROR()
16-
"""
17-
if not (result := JobDB().setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]:
18-
gLogger.warn("Failed to set job Deleted status", result["Message"])
19-
return result
20-
21-
if not (result := TaskQueueDB().deleteJob(jobID))["OK"]:
22-
gLogger.warn("Failed to delete job from the TaskQueue")
23-
24-
# if it was the last job for the pilot
25-
result = PilotAgentsDB().getPilotsForJobID(jobID)
26-
if not result["OK"]:
27-
gLogger.error("Failed to get Pilots for JobID", result["Message"])
28-
return result
29-
for pilot in result["Value"]:
30-
res = PilotAgentsDB().getJobsForPilot(pilot)
31-
if not res["OK"]:
32-
gLogger.error("Failed to get jobs for pilot", res["Message"])
33-
return res
34-
if not res["Value"]: # if list of jobs for pilot is empty, delete pilot
35-
result = PilotAgentsDB().getPilotInfo(pilotID=pilot)
36-
if not result["OK"]:
37-
gLogger.error("Failed to get pilot info", result["Message"])
38-
return result
39-
ret = PilotAgentsDB().deletePilot(result["Value"]["PilotJobReference"])
40-
if not ret["OK"]:
41-
gLogger.error("Failed to delete pilot from PilotAgentsDB", ret["Message"])
42-
return ret
43-
44-
return S_OK()
45-
46-
47-
def _killJob(jobID, sendKillCommand=True, force=False):
48-
"""Kill one job
49-
50-
:param int jobID: job ID
51-
:param bool sendKillCommand: send kill command
52-
53-
:return: S_OK()/S_ERROR()
54-
"""
55-
if sendKillCommand:
56-
if not (result := JobDB().setJobCommand(jobID, "Kill"))["OK"]:
57-
gLogger.warn("Failed to set job Kill command", result["Message"])
58-
return result
59-
60-
gLogger.info("Job marked for termination", jobID)
61-
if not (result := JobDB().setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))["OK"]:
62-
gLogger.warn("Failed to set job Killed status", result["Message"])
63-
if not (result := TaskQueueDB().deleteJob(jobID))["OK"]:
64-
gLogger.warn("Failed to delete job from the TaskQueue", result["Message"])
65-
66-
return S_OK()
67-
68-
69-
def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
70-
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
71-
72-
:param str right: RIGHT_KILL or RIGHT_DELETE
73-
74-
:return: S_OK()/S_ERROR()
75-
"""
76-
badIDs = []
77-
78-
killJobList = []
79-
deleteJobList = []
80-
if validJobList:
81-
result = JobDB().getJobsAttributes(killJobList, ["Status"])
82-
if not result["OK"]:
83-
return result
84-
jobStates = result["Value"]
85-
86-
# Get the jobs allowed to transition to the Killed state
87-
killJobList.extend(_filterJobStateTransition(jobStates, JobStatus.KILLED))
88-
89-
if right == RIGHT_DELETE:
90-
# Get the jobs allowed to transition to the Deleted state
91-
deleteJobList.extend(_filterJobStateTransition(jobStates, JobStatus.DELETED))
92-
93-
for jobID in killJobList:
94-
result = _killJob(jobID, force=force)
95-
if not result["OK"]:
96-
badIDs.append(jobID)
97-
98-
for jobID in deleteJobList:
99-
result = _deleteJob(jobID, force=force)
100-
if not result["OK"]:
101-
badIDs.append(jobID)
102-
103-
# Look for jobs that are in the Staging state to send kill signal to the stager
104-
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]
105-
106-
if stagingJobList:
107-
stagerDB = StorageManagementDB()
108-
gLogger.info("Going to send killing signal to stager as well!")
109-
result = stagerDB.killTasksBySourceTaskID(stagingJobList)
110-
if not result["OK"]:
111-
gLogger.warn("Failed to kill some Stager tasks", result["Message"])
112-
113-
if nonauthJobList or badIDs:
114-
result = S_ERROR("Some jobs failed deletion")
115-
if nonauthJobList:
116-
gLogger.warn("Non-authorized JobIDs won't be deleted", str(nonauthJobList))
117-
result["NonauthorizedJobIDs"] = nonauthJobList
118-
if badIDs:
119-
gLogger.warn("JobIDs failed to be deleted", str(badIDs))
120-
result["FailedJobIDs"] = badIDs
121-
return result
122-
123-
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
124-
return S_OK(jobsList)
1252

1263

1274
def _filterJobStateTransition(jobStates, candidateState):

src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobAdministration.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)