/*
 * Copyright 2023-, Stellenbosch University, South Africa
 * Copyright 2024-, Evaluacion y Desarrollo de Negocios, Spain
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package nextflow.nomad.builders

import io.nomadproject.client.model.Affinity
import io.nomadproject.client.model.Constraint
import io.nomadproject.client.model.Job
import io.nomadproject.client.model.Spread
import io.nomadproject.client.model.TaskGroup
import io.nomadproject.client.model.Task
import io.nomadproject.client.model.ReschedulePolicy
import io.nomadproject.client.model.RestartPolicy
import io.nomadproject.client.model.Resources
import io.nomadproject.client.model.Template
import io.nomadproject.client.model.VolumeMount
import io.nomadproject.client.model.VolumeRequest
import nextflow.nomad.config.NomadJobOpts
import nextflow.nomad.executor.TaskDirectives
import nextflow.nomad.models.ConstraintsBuilder
import nextflow.nomad.models.JobConstraints
import nextflow.nomad.models.JobSpreads
import nextflow.nomad.models.JobVolume
import nextflow.nomad.models.SpreadsBuilder
import nextflow.processor.TaskRun
import nextflow.util.MemoryUnit

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j

/**
 * Builder and helper methods for assembling the Nomad Job, TaskGroup and Task
 * definitions used by the nf-nomad executor to run Nextflow tasks.
 *
 * @author Abhinav Sharma <[email protected]>
 */
@Slf4j
@CompileStatic
class JobBuilder {

    private Job job = new Job()

    JobBuilder withId(String id) {
        job.ID = id
        return this
    }

    JobBuilder withName(String name) {
        job.name = name
        return this
    }

    JobBuilder withType(String type) {
        job.type = type
        return this
    }

    JobBuilder withDatacenters(List<String> datacenters) {
        job.datacenters = datacenters
        return this
    }

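    /**
     * Apply the `datacenters` process directive of a task to the given job.
     * The directive may be a list of names, a closure returning a value, or
     * any object whose string representation names a single datacenter.
     */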
    static Job assignDatacenters(TaskRun task, Job job) {
        def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS)
        if( datacenters ) {
            if( datacenters instanceof List<String> ) {
                job.datacenters(datacenters as List<String>)
                return job
            }
            if( datacenters instanceof Closure ) {
                String str = datacenters.call().toString()
                job.datacenters([str])
                return job
            }
            job.datacenters([datacenters.toString()] as List<String>)
            return job
        }
        job
    }

    JobBuilder withNamespace(String namespace) {
        job.namespace = namespace
        return this
    }

    JobBuilder withTaskGroups(List<TaskGroup> taskGroups) {
        job.taskGroups = taskGroups
        return this
    }

    Job build() {
        return job
    }

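    /**
     * Derive the Nomad resources block from the task configuration,
     * falling back to 1 CPU core and 500 MB of memory when the
     * `cpus` or `memory` directives are not set.
     */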
    static protected Resources getResources(TaskRun task) {
        final DEFAULT_CPUS = 1
        final DEFAULT_MEMORY = "500.MB"

        final taskCfg = task.getConfig()
        final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer
        final taskMemory = taskCfg.get("memory") ? new MemoryUnit(taskCfg.get("memory") as String) : new MemoryUnit(DEFAULT_MEMORY)

        final res = new Resources()
                .cores(taskCores)
                .memoryMB(taskMemory.toMega() as Integer)

        return res
    }

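    /**
     * Create the task group wrapping the Nomad task for a Nextflow task run,
     * applying the configured reschedule/restart policies and declaring any
     * CSI or host volumes requested in the job options.
     */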
    static TaskGroup createTaskGroup(TaskRun taskRun, List<String> args, Map<String, String> env, NomadJobOpts jobOpts) {
        final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(jobOpts.rescheduleAttempts)
        final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(jobOpts.restartAttempts)

        def task = createTask(taskRun, args, env, jobOpts)
        def taskGroup = new TaskGroup(
                name: "group",
                tasks: [task],
                reschedulePolicy: taskReschedulePolicy,
                restartPolicy: taskRestartPolicy
        )

        if( jobOpts.volumeSpec ) {
            taskGroup.volumes = [:]
            jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx ->
                if( volumeSpec && volumeSpec.type == JobVolume.VOLUME_CSI_TYPE ) {
                    taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
                            type: volumeSpec.type,
                            source: volumeSpec.name,
                            attachmentMode: volumeSpec.attachmentMode,
                            accessMode: volumeSpec.accessMode,
                            readOnly: volumeSpec.readOnly,
                    )
                }

                if( volumeSpec && volumeSpec.type == JobVolume.VOLUME_HOST_TYPE ) {
                    taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
                            type: volumeSpec.type,
                            source: volumeSpec.name,
                            readOnly: volumeSpec.readOnly,
                    )
                }
            }
        }
        return taskGroup
    }

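    /**
     * Create the Nomad task that runs the container for a Nextflow task,
     * using the Docker driver and wiring in volumes, affinities, constraints
     * and secrets derived from the job options and process directives.
     */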
    static Task createTask(TaskRun task, List<String> args, Map<String, String> env, NomadJobOpts jobOpts) {
        final DRIVER = "docker"
        final DRIVER_PRIVILEGED = true

        final imageName = task.container
        final workingDir = task.workDir.toAbsolutePath().toString()
        final taskResources = getResources(task)

        def taskDef = new Task(
                name: "nf-task",
                driver: DRIVER,
                resources: taskResources,
                config: [
                        image: imageName,
                        privileged: DRIVER_PRIVILEGED,
                        work_dir: workingDir,
                        command: args.first(),
                        args: args.tail(),
                ] as Map<String, Object>,
                env: env,
        )

        volumes(task, taskDef, workingDir, jobOpts)
        affinity(task, taskDef, jobOpts)
        constraint(task, taskDef, jobOpts)
        constraints(task, taskDef, jobOpts)
        secrets(task, taskDef, jobOpts)
        return taskDef
    }

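    /**
     * Attach volume mounts to the task: a Docker volume mounted at the base
     * of the pipeline work directory when `dockerVolume` is set, plus one
     * mount per declared volume spec, targeting either the work directory
     * base or the volume's configured path.
     */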
    static protected Task volumes(TaskRun task, Task taskDef, String workingDir, NomadJobOpts jobOpts) {
        if( jobOpts.dockerVolume ) {
            String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
            taskDef.config.mount = [
                    type: "volume",
                    target: destinationDir,
                    source: jobOpts.dockerVolume,
                    readonly: false
            ]
        }

        if( jobOpts.volumeSpec ) {
            taskDef.volumeMounts = []
            jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx ->
                String destinationDir = volumeSpec.workDir ?
                        workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path
                taskDef.volumeMounts.add new VolumeMount(
                        destination: destinationDir,
                        volume: "vol_${idx}".toString()
                )
            }
        }

        taskDef
    }

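    /**
     * Apply the affinity rule configured in the job options, if any,
     * mapping attribute/operator/value/weight onto a Nomad Affinity.
     */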
    static protected Task affinity(TaskRun task, Task taskDef, NomadJobOpts jobOpts) {
        if( jobOpts.affinitySpec ) {
            def affinity = new Affinity()
            if( jobOpts.affinitySpec.attribute ) {
                affinity.ltarget(jobOpts.affinitySpec.attribute)
            }

            affinity.operand(jobOpts.affinitySpec.operator ?: "=")

            if( jobOpts.affinitySpec.value ) {
                affinity.rtarget(jobOpts.affinitySpec.value)
            }
            if( jobOpts.affinitySpec.weight != null ) {
                affinity.weight(jobOpts.affinitySpec.weight)
            }
            taskDef.affinities([affinity])
        }
        taskDef
    }

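    /**
     * Apply the single constraint configured in the job options, if any,
     * mapping attribute/operator/value onto a Nomad Constraint.
     */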
    protected static Task constraint(TaskRun task, Task taskDef, NomadJobOpts jobOpts) {
        if( jobOpts.constraintSpec ) {
            def constraint = new Constraint()
            if( jobOpts.constraintSpec.attribute ) {
                constraint.ltarget(jobOpts.constraintSpec.attribute)
            }

            constraint.operand(jobOpts.constraintSpec.operator ?: "=")

            if( jobOpts.constraintSpec.value ) {
                constraint.rtarget(jobOpts.constraintSpec.value)
            }
            taskDef.constraints([constraint])
        }

        taskDef
    }

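    /**
     * Collect constraints from both the plugin configuration and the
     * `constraints` process directive (given as a closure), and attach
     * them to the task when any are present.
     */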
    protected static Task constraints(TaskRun task, Task taskDef, NomadJobOpts jobOpts) {
        def constraints = [] as List<Constraint>

        if( jobOpts.constraintsSpec ) {
            def list = ConstraintsBuilder.constraintsSpecToList(jobOpts.constraintsSpec)
            constraints.addAll(list)
        }

        if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) &&
                task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure ) {
            Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure
            JobConstraints constraintsSpec = JobConstraints.parse(closure)
            def list = ConstraintsBuilder.constraintsSpecToList(constraintsSpec)
            constraints.addAll(list)
        }

        if( constraints.size() ) {
            taskDef.constraints(constraints)
        }
        taskDef
    }

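    /**
     * When secrets support is enabled, render a Nomad template that exposes
     * each secret named by the `secrets` process directive as an environment
     * variable read from a Nomad variable under the configured secret path.
     */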
    protected static Task secrets(TaskRun task, Task taskDef, NomadJobOpts jobOpts) {
        if( jobOpts?.secretOpts?.enabled ) {
            def secrets = task.processor?.config?.get(TaskDirectives.SECRETS)
            if( secrets ) {
                Template template = new Template(envvars: true, destPath: "/secrets/nf-nomad")
                String secretPath = jobOpts?.secretOpts?.path
                String tmpl = secrets.collect { String name ->
                    "${name}={{ with nomadVar \"$secretPath/${name}\" }}{{ .${name} }}{{ end }}"
                }.join('\n').stripIndent()
                template.embeddedTmpl(tmpl)
                taskDef.addTemplatesItem(template)
            }
        }
        taskDef
    }

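    /**
     * Collect spread rules from the plugin configuration and the `spread`
     * process directive (given as a map), and add them to the job definition.
     */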
    static Job spreads(TaskRun task, Job jobDef, NomadJobOpts jobOpts) {
        def spreads = [] as List<Spread>
        if( jobOpts.spreadsSpec ) {
            def list = SpreadsBuilder.spreadsSpecToList(jobOpts.spreadsSpec)
            spreads.addAll(list)
        }
        if( task.processor?.config?.get(TaskDirectives.SPREAD) &&
                task.processor?.config?.get(TaskDirectives.SPREAD) instanceof Map ) {
            Map map = task.processor?.config?.get(TaskDirectives.SPREAD) as Map
            JobSpreads spreadSpec = new JobSpreads()
            spreadSpec.spread(map)
            def list = SpreadsBuilder.spreadsSpecToList(spreadSpec)
            spreads.addAll(list)
        }

        spreads.each {
            jobDef.addSpreadsItem(it)
        }
        jobDef
    }

}