@@ -13,10 +13,11 @@ import argparse
 from datetime import date
 
 import os
-# import glob
+import sys
 import shlex
 import subprocess
 import socket
+import shutil
 
 # https://stackoverflow.com/questions/3503719/emulating-bash-source-in-python
 def source(fn):
@@ -71,6 +72,8 @@ parser.add_argument('--force', dest='force', action='store_true',
     help='send force flag to rolling_submit')
 parser.add_argument('--validate', dest='validate', action='store_true',
     help='check a previous run to validate true completion (special message)')
+parser.add_argument('--job-hist', dest='job_hist', action='store_true',
+    help='run jobhist on each step (saves jobhist.txt)')
 parser.add_argument('--cancel', dest='cancel', action='store_true',
     help='cancel jobs started by pipeline (bounded by start-at / stop-at)')
 parser.add_argument('--use-partition', nargs=1, type=str, default=[''],
@@ -92,6 +95,7 @@ date_str = args['date_str'][0]
 no_run = args['no_run']
 force = args['force']
 validate = args['validate']
+job_hist = args['job_hist']
 cancel = args['cancel']
 mock_run = args['mock_run']
 use_partition = args['use_partition'][0]
@@ -185,13 +189,6 @@ with open(workflow_file, 'r') as f:
 # ipack == -1, jobs are never packed (slurm packing)
 nopack = (not arg_pack) if ipack == 0 else (ipack < 0)
 
-if nopack:
-    # have to use packing in order to use gpus,
-    # i.e., slurm can not allow jobs to share gpus (without cuda MPS management enabled).
-    assert( all([x == 0 for x in ngpus.values()]) )  # xxx - not allowed
-    # without packing, all the wall times have to match
-    assert( all([x == times[cuse_partition] for x in times.values()]) )
-
 # mrolling mode (chunk_size > 0) always uses all partitions specified
 use_mroll = (chunk_size > 0)
 wait_str = '-W' if use_mroll else ''
@@ -203,6 +200,13 @@ with open(workflow_file, 'r') as f:
 job_ids[name] = -1
 continue
 
+if nopack:
+    # have to use packing in order to use gpus,
+    # i.e., slurm can not allow jobs to share gpus (without cuda MPS management enabled).
+    assert( all([x == 0 for x in ngpus.values()]) )  # xxx - not allowed
+    # without packing, all the wall times have to match
+    assert( all([x == times[cuse_partition] for x in times.values()]) )
+
 # get the dependencies
 deps = [x for x in deps if x]
 deps_ids = [job_ids[x] for x in deps if job_ids[x] > 0] + base_deps
@@ -251,15 +255,17 @@ with open(workflow_file, 'r') as f:
 cmd = "mrolling_submit --wait --swarm {}".format(os.path.join('..', swarm_file))
 cmd += " --sleep_time {} --swarm_chunksize {}".format(sleep_time, chunk_size)
 if nopack or mroll_all:
-    njob_cutoff = max_array - 2*chunk_size - 1
-    # because we default to using all partitions to slurm with nopack,
-    # then we do not also need to specify the partitions to mrolling.
-    njob_cutoff_str = "--njob_cutoffs {}".format(njob_cutoff)
-    mspartition = "--partitions {}".format(partition)
     if not nopack:
         assert( all([x == nproc for x in nprocs.values()]) )  # need equal packing
         gpu_str = ''
         pack_str = '-p {}'.format(nproc)
+        njob_cutoff = max_array - 2*chunk_size - 1
+    else:
+        njob_cutoff = (max_array - 1)//npartitions - 2*chunk_size - 1
+    # because we default to using all partitions to slurm with nopack,
+    # then we do not also need to specify the partitions to mrolling.
+    njob_cutoff_str = "--njob_cutoffs {}".format(njob_cutoff)
+    mspartition = "--partitions {}".format(partition)
     swarm_opts = (" --swarm_opts " + _swarm_opts())
 else:
     njob_cutoff_str = "--njob_cutoffs \""
@@ -287,8 +293,9 @@ with open(workflow_file, 'r') as f:
 
 if use_mroll:
     top_swarm = 'top_' + swarm_file
-    with open(top_swarm, 'w') as fh:
-        fh.write(cmd + '\n')
+    if not (validate or cancel or job_hist):
+        with open(top_swarm, 'w') as fh:
+            fh.write(cmd + '\n')
 
     cmd = "rolling_submit --swarms {}".format(top_swarm)
     time = mroll_time
@@ -305,13 +312,17 @@ with open(workflow_file, 'r') as f:
 print('Processing line:')
 print(' '.join(sline))
 job_id = 0
-if not (validate or cancel):
+if not (validate or cancel or job_hist):
     assert(os.path.isfile(swarm_file))  # swarm file missing
     job_id = echo_and_run(cmd, mock_run, cmd_no if no_run else -1)
     print(); print(); print()
 else:
     subdir = '_' + (top_swarm if top_swarm else swarm_file)
-    with open(os.path.join(subdir, 'job_id.txt'), 'r') as jf:
+    fn = os.path.join(subdir, 'job_id.txt')
+    if not os.path.isfile(fn) and job_hist:
+        job_ids[name] = -1
+        continue
+    with open(fn, 'r') as jf:
         for jline in jf:
             cjline = jline.strip()
             if cjline:
@@ -352,6 +363,15 @@ with open(workflow_file, 'r') as f:
     print('\t' + cmd)
     args = shlex.split(cmd)
     scancel = subprocess.Popen(args); scancel.wait()
+elif job_hist:
+    jobhist_txt = os.path.join(subdir, 'jobhist.txt')
+    jobhist_script = os.path.join(os.path.dirname(__file__), 'jobhist')
+    _job_id = os.path.join(subdir, '_' + swarm_file) if top_swarm else job_id
+    args = shlex.split("{} {} {}".format(sys.executable, jobhist_script, _job_id))
+    with open(jobhist_txt, 'w') as jh:
+        jobhist = subprocess.Popen(args, stdout=jh, stderr=jh)
+        jobhist.wait()
+    shutil.copyfile(jobhist_txt, subdir + '-jobhist.txt')
 else:
     assert(False)  # you should not be here
 # else - if not (validate or cancel):
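A stand-alone sketch of the capture pattern used in the new job_hist branch: run a helper script, mirror its combined stdout/stderr into a per-step text file, and keep a flat copy next to the step directory. The script path and job id arguments here are placeholders, not values taken from the pipeline; subprocess.run is just a compact equivalent of the Popen/wait pair in the diff.

import os
import shlex
import shutil
import subprocess
import sys

def save_jobhist(subdir, jobhist_script, job_id):
    # write the helper's combined stdout/stderr into <subdir>/jobhist.txt
    jobhist_txt = os.path.join(subdir, 'jobhist.txt')
    args = shlex.split("{} {} {}".format(sys.executable, jobhist_script, job_id))
    with open(jobhist_txt, 'w') as jh:
        subprocess.run(args, stdout=jh, stderr=jh, check=False)
    # also keep a flat copy so the per-step reports can be collected in one place
    shutil.copyfile(jobhist_txt, subdir + '-jobhist.txt')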