@@ -18,16 +18,11 @@ def _wait_for_completed_worker(results):
18
18
return None
19
19
20
20
while True :
21
- completed = False
22
21
for worker_id , result in results .items ():
23
22
if result is None or result .done ():
24
- completed = True
25
- break
26
- if completed :
27
- del results [worker_id ]
28
- return worker_id
29
- else :
30
- time .sleep (1 )
23
+ del results [worker_id ]
24
+ return worker_id
25
+ time .sleep (1 )
31
26
32
27
33
28
def _run (tasks , azure_json , num_cores , mem_size_gb , n_running_containers , delete_group_container_on_complete , batch_id ):
@@ -58,7 +53,10 @@ def _run(tasks, azure_json, num_cores, mem_size_gb, n_running_containers, delete
58
53
59
54
# Run until completion.
60
55
container_counter = 0
61
- results = {x : None for x in range (n_running_containers )}
56
+ n_tasks = len (tasks )
57
+ n_containers = min (n_tasks , n_running_containers )
58
+ results = {x : None for x in range (n_containers )}
59
+ container_group_names = set ()
62
60
while len (tasks ) != 0 :
63
61
params = tasks .pop (0 )
64
62
worker_id = _wait_for_completed_worker (results )
@@ -95,10 +93,11 @@ def _run(tasks, azure_json, num_cores, mem_size_gb, n_running_containers, delete
95
93
restart_policy = ContainerGroupRestartPolicy .never ,
96
94
)
97
95
container_group_name = f"powerlift-container-group-{ worker_id } -{ batch_id } "
98
-
99
96
result = aci_client .container_groups .begin_create_or_update (
100
97
resource_group .name , container_group_name , container_group
101
98
)
99
+
100
+ container_group_names .add (container_group_name )
102
101
results [worker_id ] = result
103
102
104
103
# Wait for all container groups to complete
@@ -107,8 +106,7 @@ def _run(tasks, azure_json, num_cores, mem_size_gb, n_running_containers, delete
107
106
108
107
# Delete all container groups
109
108
if delete_group_container_on_complete :
110
- for worker_id in range (n_running_containers ):
111
- container_group_name = f"powerlift-container-group-{ worker_id } -{ batch_id } "
109
+ for container_group_name in container_group_names :
112
110
aci_client .container_groups .begin_delete (
113
111
resource_group_name , container_group_name
114
112
)
@@ -170,7 +168,7 @@ def __init__(
170
168
"resource_group" : resource_group ,
171
169
}
172
170
self ._batch_id = random .getrandbits (64 )
173
- super ().__init__ (store = store , n_cpus = n_running_containers , raise_exception = raise_exception , wheel_filepaths = wheel_filepaths )
171
+ super ().__init__ (store = store , n_cpus = 1 , raise_exception = raise_exception , wheel_filepaths = wheel_filepaths )
174
172
175
173
def delete_credentials (self ):
176
174
"""Deletes credentials in object for accessing Azure Resources."""
0 commit comments