Commit 01bc1cd

kiukchung authored and facebook-github-bot committed
(torchx/scheduler) Fill hostnames for each replica in slurm scheduler's describe API (#1080)
Summary: Additionally fill hostname, resource (cpu, memMB), image, and entrypoint in `describe_squeue` for each role/replica.

Reviewed By: d4l3k

Differential Revision: D76485112
1 parent 50b8c02 commit 01bc1cd
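The net effect is that `describe()` on the slurm scheduler now returns populated `Role` and `ReplicaStatus` fields while the job is still visible to `squeue`. A minimal usage sketch of reading those fields (illustrative only: the `scheduler` handle and the job id "1234" are assumptions, not part of this commit):

# Illustrative usage only: assumes `scheduler` is an instance of
# torchx.schedulers.slurm_scheduler.SlurmScheduler and "1234" is a live slurm job id.
resp = scheduler.describe(app_id="1234")
if resp is not None:
    for role in resp.roles:
        # image, entrypoint, and resource (cpu, memMB) are now filled from squeue output
        print(role.name, role.image, role.entrypoint, role.resource)
    for role_status in resp.roles_statuses:
        for replica in role_status.replicas:
            # hostname is filled per replica instead of being left empty
            print(replica.role, replica.id, replica.state, replica.hostname)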

File tree

4 files changed: +1812 / -107 lines


.github/workflows/slurm-local-integration-tests.yaml

Lines changed: 4 additions & 1 deletion
@@ -6,8 +6,11 @@ on:
       - main
   pull_request:
 
+
 env:
-  SLURM_VERSION: 21.08.6
+  # slurm tag should be one of https://github.com/SchedMD/slurm/tags
+  SLURM_TAG: slurm-23-11-11-1
+  SLURM_VERSION: 23.11.11
 
 jobs:
   slurm:

torchx/schedulers/slurm_scheduler.py

Lines changed: 115 additions & 43 deletions
@@ -20,6 +20,7 @@
 import tempfile
 from dataclasses import dataclass
 from datetime import datetime
+from subprocess import CalledProcessError, PIPE
 from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
 
 import torchx
@@ -39,6 +40,7 @@
     macros,
     NONE,
     ReplicaStatus,
+    Resource,
     Role,
     RoleStatus,
     runopts,
@@ -66,6 +68,11 @@
     "TIMEOUT": AppState.FAILED,
 }
 
+
+def appstate_from_slurm_state(slurm_state: str) -> AppState:
+    return SLURM_STATES.get(slurm_state, AppState.UNKNOWN)
+
+
 SBATCH_JOB_OPTIONS = {
     "comment",
     "mail-user",
@@ -483,15 +490,34 @@ def _cancel_existing(self, app_id: str) -> None:
 
     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
         try:
-            return self._describe_sacct(app_id)
-        except subprocess.CalledProcessError:
             return self._describe_squeue(app_id)
+        except CalledProcessError as e:
+            # NOTE: squeue errors out with 'slurm_load_jobs error: Invalid job id specified'
+            # if the job does not exist or has finished (e.g. not in PENDING or RUNNING states)
+            # in this case, fall back to the less descriptive but more persistent sacct
+            # (slurm cluster must have accounting storage enabled for sacct to work)
+            log.info(
+                "unable to get job info for `{}` with `squeue` ({}), trying `sacct`".format(
+                    app_id, e.stderr
+                )
+            )
+            return self._describe_sacct(app_id)
 
     def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
-        p = subprocess.run(
-            ["sacct", "--parsable2", "-j", app_id], stdout=subprocess.PIPE, check=True
-        )
-        output = p.stdout.decode("utf-8").split("\n")
+        try:
+            output = subprocess.check_output(
+                ["sacct", "--parsable2", "-j", app_id],
+                stderr=PIPE,
+                encoding="utf-8",
+            ).split("\n")
+        except CalledProcessError as e:
+            log.info(
+                "unable to get job info for `{}` with `sacct` ({})".format(
+                    app_id, e.stderr
+                )
+            )
+            return None
+
         if len(output) <= 1:
             return None
 
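One detail worth calling out in the new error handling: `CalledProcessError.stderr` is only populated when stderr is captured, which is why both calls pass `stderr=PIPE`. A self-contained sketch of the same pattern with a stand-in command (the failing command is an example, not part of this change):

import subprocess
from subprocess import CalledProcessError, PIPE

try:
    # stand-in for the squeue/sacct invocation: exits non-zero and writes to stderr
    subprocess.check_output(["ls", "/no/such/path"], stderr=PIPE, encoding="utf-8")
except CalledProcessError as e:
    # e.stderr carries the captured error text because stderr=PIPE was passed;
    # without it, e.stderr would be None and the log message would lose detail
    print(f"command failed with: {e.stderr.strip()}")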
@@ -511,11 +537,7 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
 
             state = row["State"]
             msg = state
-            state_enum = SLURM_STATES.get(state)
-            assert (
-                state_enum
-            ), f"failed to translate slurm state {state} to torchx state"
-            app_state = state_enum
+            app_state = appstate_from_slurm_state(state)
 
             role, _, replica_id = row["JobName"].rpartition("-")
             if not replica_id or not role:
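Both describe paths split the slurm job name into a torchx role name and replica id with `rpartition("-")`, so only the last dash is significant and role names may themselves contain dashes. A brief illustrative check (the job names are made up):

# rpartition splits on the last "-" only
role, _, replica_id = "trainer-0".rpartition("-")
assert (role, replica_id) == ("trainer", "0")

role, _, replica_id = "param-server-12".rpartition("-")
assert (role, replica_id) == ("param-server", "12")

# a name without a "-" yields an empty role, which the sacct parser skips
role, _, replica_id = "interactive".rpartition("-")
assert (role, replica_id) == ("", "interactive")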
@@ -540,46 +562,91 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
                     msg=msg,
                 )
 
-    def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
-        p = subprocess.run(
-            ["squeue", "--json", "-j", app_id], stdout=subprocess.PIPE, check=True
+    def _describe_squeue(self, app_id: str) -> DescribeAppResponse:
+        # squeue errors out with 'slurm_load_jobs error: Invalid job id specified'
+        # if the job does not exist or is finished (e.g. not in PENDING or RUNNING state)
+        output = subprocess.check_output(
+            ["squeue", "--json", "-j", app_id], stderr=PIPE, encoding="utf-8"
         )
-        output_json = json.loads(p.stdout.decode("utf-8"))
-
-        roles = {}
-        roles_statuses = {}
-        msg = ""
-        app_state = AppState.UNKNOWN
-        for job in output_json["jobs"]:
-            state = job["job_state"][0]
-            msg = state
-            state_enum = SLURM_STATES.get(state)
-            assert (
-                state_enum
-            ), f"failed to translate slurm state {state} to torchx state"
-            app_state = state_enum
-
-            role, _, replica_id = job["name"].rpartition("-")
-            if not replica_id or not role:
-                # name should always have at least 3 parts but sometimes sacct
-                # is slow to update
-                continue
-            if role not in roles:
-                roles[role] = Role(name=role, num_replicas=0, image="")
-                roles_statuses[role] = RoleStatus(role, [])
-            roles[role].num_replicas += 1
-            roles_statuses[role].replicas.append(
-                ReplicaStatus(
-                    id=int(replica_id), role=role, state=app_state, hostname=""
+        output_json = json.loads(output)
+        jobs = output_json["jobs"]
+
+        roles: dict[str, Role] = {}
+        roles_statuses: dict[str, RoleStatus] = {}
+        state = AppState.UNKNOWN
+
+        for job in jobs:
+            # job name is of the form "{role_name}-{replica_id}"
+            role_name, _, replica_id = job["name"].rpartition("-")
+
+            entrypoint = job["command"]
+            image = job["current_working_directory"]
+            state = appstate_from_slurm_state(job["job_state"][0])
+
+            job_resources = job["job_resources"]
+
+            role = roles.setdefault(
+                role_name,
+                Role(
+                    name=role_name,
+                    image=image,
+                    entrypoint=entrypoint,
+                    num_replicas=0,
                 ),
             )
+            role_status = roles_statuses.setdefault(
+                role_name,
+                RoleStatus(role_name, replicas=[]),
+            )
+
+            if state == AppState.PENDING:
+                # NOTE: torchx-launched jobs point to exactly one host;
+                # otherwise, scheduled_nodes could be a node list expression (e.g. 'slurm-compute-node[0-20,21,45-47]')
+                hostname = job_resources["scheduled_nodes"]
+                role.num_replicas += 1
+                role_status.replicas.append(
+                    ReplicaStatus(
+                        id=int(replica_id),
+                        role=role_name,
+                        state=state,
+                        hostname=hostname,
+                    )
+                )
+            else:  # state == AppState.RUNNING
+                # NOTE: torchx schedules on slurm with sbatch + heterogeneous job,
+                # where each replica is a "sub-job", so `allocated_nodes` will always be 1;
+                # but we also deal with jobs that have not been launched with torchx,
+                # which can have multiple hosts per sub-job (count them as replicas)
+                node_infos = job_resources.get("allocated_nodes", [])
+                for node_info in node_infos:
+                    # NOTE: we expect resource specs for all the nodes to be the same
+                    # NOTE: use allocated (not used/requested) memory since
+                    # users may only specify --cpu, in which case slurm
+                    # uses the (system) configured {mem-per-cpu} * {cpus}
+                    # to allocate memory.
+                    # NOTE: getting gpus is tricky because it is modeled as a trackable-resource
+                    # or not configured at all (use total-cpu-on-host as proxy for gpus)
+                    cpu = int(node_info["cpus_used"])
+                    memMB = int(node_info["memory_allocated"])
+
+                    hostname = node_info["nodename"]
+
+                    role.resource = Resource(cpu=cpu, memMB=memMB, gpu=-1)
+                    role.num_replicas += 1
+                    role_status.replicas.append(
+                        ReplicaStatus(
+                            id=int(replica_id),
+                            role=role_name,
+                            state=state,
+                            hostname=hostname,
+                        )
+                    )
 
         return DescribeAppResponse(
             app_id=app_id,
             roles=list(roles.values()),
             roles_statuses=list(roles_statuses.values()),
-            state=app_state,
-            msg=msg,
+            state=state,
         )
 
     def log_iter(
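For reference, `_describe_squeue` only reads a handful of fields from the `squeue --json` payload. Below is a trimmed, hypothetical example of the shape it consumes (field values are made up; real slurm output carries many more keys). For PENDING jobs it reads `job_resources["scheduled_nodes"]` instead of `allocated_nodes`:

# Hypothetical, trimmed `squeue --json -j <app_id>` payload showing only the fields
# the parser above touches; real slurm output contains many additional fields.
example_squeue_payload = {
    "jobs": [
        {
            "name": "trainer-0",  # rpartition("-") -> role "trainer", replica 0
            "command": "/usr/bin/python -m train",
            "current_working_directory": "/home/user/project",
            "job_state": ["RUNNING"],
            "job_resources": {
                "allocated_nodes": [
                    {
                        "nodename": "slurm-compute-node-1",
                        "cpus_used": 8,
                        "memory_allocated": 16384,  # MB, used for Role.resource.memMB
                    }
                ],
            },
        },
    ],
}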
