
Commit 93b65a2

Bug fixes for running inside a Docker container, preemptible node handling, and the initial node count
1 parent ab9a241 commit 93b65a2


7 files changed (+55, -25 lines)


go/src/github.com/broadinstitute/kubequeconsume/consumer.go

Lines changed: 9 additions & 7 deletions
@@ -49,13 +49,15 @@ type TaskStatusNotification struct {
 }

 type Job struct {
-	JobID       int      `datastore:"job_id"`
-	Tasks       []string `datastore:"tasks"`
-	KubeJobSpec string   `datastore:"kube_job_spec"`
-	Metadata    string   `datastore:"metadata"`
-	Cluster     string   `datastore:"cluster"`
-	Status      string   `datastore:"status"`
-	SubmitTime  float64  `datastore:"submit_time"`
+	JobID                  int      `datastore:"job_id"`
+	Tasks                  []string `datastore:"tasks"`
+	KubeJobSpec            string   `datastore:"kube_job_spec"`
+	Metadata               string   `datastore:"metadata"`
+	Cluster                string   `datastore:"cluster"`
+	Status                 string   `datastore:"status"`
+	SubmitTime             float64  `datastore:"submit_time"`
+	MaxPreemptableAttempts int32    `datastore:"max_preemptable_attempts"`
+	TargetNodeCount        int32    `datastore:"target_node_count"`
 }

 type ClusterKeys struct {
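Note: the Go consumer decodes the same Job entity that the Python submitter writes, so the new struct fields only take effect if their datastore tags match the property names that job_to_entity() begins writing in sparklespray/job_store.py (further down in this commit). A trivial sanity check of that correspondence, transcribed from the two hunks:

    # Tags of the two Go struct fields added in this commit.
    new_go_tags = {"max_preemptable_attempts", "target_node_count"}
    # Keys that job_to_entity() starts writing in sparklespray/job_store.py.
    new_python_props = {"target_node_count", "max_preemptable_attempts"}
    assert new_go_tags == new_python_props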

sparklespray/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = '3.2.0'
+__version__ = '3.3.1'

sparklespray/cluster_service.py

Lines changed: 2 additions & 2 deletions
@@ -340,8 +340,8 @@ def get_running_tasks_with_invalid_owner(self) -> List[str]:
             else:
                 node_req = node_req_by_instance_name[instance_name]
                 if node_req.status == NODE_REQ_COMPLETE:
-                    log.warning("task status = {}, but node_req was {}".format(
-                        task.status, node_req.status))
+                    log.warning("task {} status = {}, but node_req was {}".format(
+                        task.task_id, task.status, node_req.status))
                     if node_req.node_class != NODE_REQ_CLASS_PREEMPTIVE:
                         log.error(
                             "instance %s terminated but task %s was reported to still be using instance and the instance was not preemptiable", instance_name, task.task_id)

sparklespray/job_queue.py

Lines changed: 2 additions & 2 deletions
@@ -146,7 +146,7 @@ def reset_task(self, task_id, status=STATUS_PENDING):
         task = self.task_storage.get_task(task_id)
         self._reset_task(task, status)

-    def submit(self, job_id, args, kube_job_spec, metadata, cluster, target_node_count):
+    def submit(self, job_id, args, kube_job_spec, metadata, cluster, target_node_count, max_preemptable_attempts):
         kube_job_spec = json.dumps(kube_job_spec)
         tasks = []
         now = time.time()
@@ -170,7 +170,7 @@ def submit(self, job_id, args, kube_job_spec, metadata, cluster, target_node_cou
             task_index += 1

         job = Job(job_id=job_id, tasks=[t.task_id for t in tasks], kube_job_spec=kube_job_spec, metadata=metadata, cluster=cluster, status=JOB_STATUS_SUBMITTED,
-                  submit_time=time.time(), target_node_count)
+                  submit_time=time.time(), target_node_count=target_node_count, max_preemptable_attempts=max_preemptable_attempts)
         self.job_storage.insert(job, batch=batch)
         batch.flush()
         #log.info("Saved task definition batch containing %d tasks", len(batch))
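Note: submit() gains a trailing max_preemptable_attempts parameter, and the Job(...) constructor now receives both new values by keyword (the bare positional target_node_count after submit_time=... in the removed line was itself invalid Python, since a positional argument cannot follow a keyword argument). A minimal sketch of a call against the widened signature; the argument values here are invented and the real caller lives in sparklespray/submit.py:

    # Hypothetical invocation of JobQueue.submit() with the new signature.
    jq.submit(
        "example-job",                                             # job_id (made up)
        list(zip(task_spec_urls, command_result_urls, log_urls)),  # args
        pipeline_spec,                                             # kube_job_spec
        {},                                                        # metadata
        "c-0123456789abcdef0123",                                  # cluster name
        4,                                                         # target_node_count
        8,                                                         # max_preemptable_attempts
    )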

sparklespray/job_store.py

Lines changed: 6 additions & 2 deletions
@@ -24,8 +24,8 @@ class Job(object):
     cluster = attr.ib()
     status = attr.ib()
     submit_time = attr.ib()
+    max_preemptable_attempts = attr.ib()
     target_node_count = attr.ib(default=1)
-    max_preemptable_attempts = attr.ib(default=0)


 JOB_STATUS_SUBMITTED = "submitted"
@@ -47,6 +47,8 @@ def job_to_entity(client, o):
     entity['metadata'] = metadata
     entity['status'] = o.status
     entity['submit_time'] = o.submit_time
+    entity['target_node_count'] = o.target_node_count
+    entity['max_preemptable_attempts'] = o.max_preemptable_attempts

     return entity

@@ -59,7 +61,9 @@ def entity_to_job(entity):
                kube_job_spec=entity.get('kube_job_spec'),
                metadata=dict([(m['name'], m['value']) for m in metadata]),
                status=entity['status'],
-               submit_time=entity.get('submit_time'))
+               submit_time=entity.get('submit_time'),
+               target_node_count=entity.get('target_node_count', 1),
+               max_preemptable_attempts=entity.get('max_preemptable_attempts', 0))


 class JobStore:
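The .get(name, default) fallbacks in entity_to_job are what keep older jobs readable: entities written before this commit carry neither property, and loading them should fall back to the previous defaults rather than raise. A rough sketch of that behavior, using a plain dict to stand in for a Datastore entity (field values are made up):

    # Entity written by an older sparklespray release: the two new
    # properties are absent.
    old_entity = {"status": "submitted", "submit_time": 1554000000.0}

    # entity_to_job supplies the old defaults when the properties are missing:
    assert old_entity.get("target_node_count", 1) == 1
    assert old_entity.get("max_preemptable_attempts", 0) == 0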

sparklespray/main.py

Lines changed: 2 additions & 0 deletions
@@ -455,6 +455,8 @@ def kill_cmd(jq: JobQueue, cluster, args):

     # if there are any sit sitting at pending, mark them as killed
     tasks = jq.task_storage.get_tasks(jobid, status=STATUS_PENDING)
+    txtui.user_print(
+        "Marking {} pending tasks as killed".format(len(tasks)))
     for task in tasks:
         jq.reset_task(task.task_id, status=STATUS_KILLED)

sparklespray/submit.py

Lines changed: 33 additions & 11 deletions
@@ -170,10 +170,11 @@ def _parse_mem_limit(txt):

 def _make_cluster_name(job_name, image, machine_type, unique_name):
     import hashlib
-    import os
+
     if unique_name:
         return 'l-' + random_string(20)
-    return "c-" + hashlib.md5("{}-{}-{}-{}-{}".format(job_name, image, machine_type, sparklespray.__version__, os.getlogin()).encode("utf8")).hexdigest()[:20]
+    else:
+        return "c-" + hashlib.md5(f"{job_name}-{image}-{machine_type}-{sparklespray.__version__}".encode("utf8")).hexdigest()[:20]


 def submit(jq: JobQueue, io: IO, cluster: Cluster, job_id: str, spec: dict, config: SubmitConfig, metadata: dict = {},
@@ -253,8 +254,12 @@ def submit(jq: JobQueue, io: IO, cluster: Cluster, job_id: str, spec: dict, conf
                   machine_specs=machine_specs,
                   monitor_port=monitor_port)

+    max_preemptable_attempts = 0
+    if preemptible:
+        max_preemptable_attempts = config.target_node_count * 2
+
     jq.submit(job_id, list(zip(task_spec_urls, command_result_urls, log_urls)),
-              pipeline_spec, metadata, cluster_name, target_node_count)
+              pipeline_spec, metadata, cluster_name, config.target_node_count, max_preemptable_attempts)


 def new_job_id():
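Dropping import os / os.getlogin() from _make_cluster_name is the "running inside a Docker container" fix from the commit message: inside a container there is usually no controlling terminal or login record, so os.getlogin() can raise OSError before the cluster name is ever computed. The replacement hashes only the job name, image, machine type, and sparklespray version. A standalone sketch of the new naming scheme (the argument values are placeholders):

    import hashlib

    job_name, image, machine_type = "example-job", "python:3.7", "n1-standard-2"
    version = "3.3.1"  # sparklespray.__version__ after this commit

    # Deterministic name derived only from the job's configuration,
    # with no dependency on the invoking user's login entry.
    cluster_name = "c-" + hashlib.md5(
        f"{job_name}-{image}-{machine_type}-{version}".encode("utf8")
    ).hexdigest()[:20]
    print(cluster_name)

The second hunk gives preemptible jobs a restart budget of twice the requested node count (max_preemptable_attempts = config.target_node_count * 2) and widens the jq.submit() call to pass config.target_node_count together with that budget, matching the new JobQueue.submit signature above.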
@@ -378,6 +383,9 @@ def add_submit_cmd(subparser):
         "--nodes", help="Max number of VMs to start up to run these tasks", type=int, default=1)
     parser.add_argument("--cd", help="The directory to change to before executing the command", default=".",
                         dest="working_dir")
+    parser.add_argument(
+        "--skipifexists", help="If the job with this name already exists, do not submit a new one",
+        action="store_true")
     parser.add_argument(
         "--symlinks",
         help="When localizing files, use symlinks instead of copying files into location. This should only be used when the uploaded files will not be modified by the job.",
@@ -386,6 +394,8 @@ def add_submit_cmd(subparser):
         "--local", help="Run the tasks inside of docker on the local machine", action="store_true")
     parser.add_argument(
         "--rerun", help="If set, will download all of the files from previous execution of this job to worker before running", action="store_true")
+    parser.add_argument("--preemptible", action="store_true",
+                        help="If set, will try to turn on nodes initally as preemptible nodes")
     parser.add_argument("command", nargs=argparse.REMAINDER)
     parser.add_argument("--gpu_count", type=int,
                         help="Number of gpus on your VM", default=0)
@@ -406,10 +416,15 @@ def submit_cmd(jq, io, cluster, args, config):
     else:
         image = config['default_image']

-    preemptible_flag = config.get("preemptible", "n").lower()
-    if preemptible_flag not in ['y', 'n']:
-        raise Exception(
-            "setting 'preemptable' in config must either by 'y' or 'n' but was: {}".format(preemptible_flag))
+    if args.preemptible:
+        preemptible = True
+    else:
+        preemptible_flag = config.get("preemptible", "n").lower()
+        if preemptible_flag not in ['y', 'n']:
+            raise Exception(
+                "setting 'preemptible' in config must either by 'y' or 'n' but was: {}".format(preemptible_flag))
+        preemptible = preemptible_flag == 'y'
+
     bootDiskSizeGb = _get_bootDiskSizeGb(config)
     default_url_prefix = config.get("default_url_prefix", "")
     work_dir = config.get("local_work_dir", os.path.expanduser(
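The new --preemptible switch takes precedence over the preemptible: y/n key in the config file; only when the flag is absent is the config key consulted and validated. A condensed restatement of that precedence as a standalone function (the function name is mine, not part of sparklespray):

    def resolve_preemptible(cli_flag: bool, config: dict) -> bool:
        # --preemptible on the command line wins outright.
        if cli_flag:
            return True
        flag = config.get("preemptible", "n").lower()
        if flag not in ("y", "n"):
            raise Exception(
                "setting 'preemptible' in config must either be 'y' or 'n' but was: {}".format(flag))
        return flag == "y"

    assert resolve_preemptible(True, {}) is True                     # CLI override
    assert resolve_preemptible(False, {"preemptible": "y"}) is True  # config opt-in
    assert resolve_preemptible(False, {}) is False                   # default: on-demand nodes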
@@ -418,7 +433,14 @@ def submit_cmd(jq, io, cluster, args, config):
     job_id = args.name
     if job_id is None:
         job_id = new_job_id()
-
+    elif args.skipifexists:
+        job = jq.get_job(job_id, must=False)
+        if job is not None:
+            txtui.user_print(
+                f"Found existing job {job_id} and submitted job with --skipifexists so aborting")
+            return 0
+
+    target_node_count = args.nodes
     machine_type = config['machine_type']
     if args.machine_type:
         machine_type = args.machine_type
@@ -498,7 +520,7 @@ def submit_cmd(jq, io, cluster, args, config):

     log.debug("spec: %s", json.dumps(spec, indent=2))

-    submit_config = SubmitConfig(preemptible=preemptible_flag == 'y',
+    submit_config = SubmitConfig(preemptible=preemptible,
                                  bootDiskSizeGb=bootDiskSizeGb,
                                  default_url_prefix=default_url_prefix,
                                  machine_type=machine_type,
@@ -510,7 +532,7 @@ def submit_cmd(jq, io, cluster, args, config):
                                  mount_point=config.get("mount", "/mnt/"),
                                  kubequeconsume_url=kubequeconsume_exe_url,
                                  gpu_count=gpu_count,
-                                 target_node_count=args.nodes
+                                 target_node_count=target_node_count
                                  )

     cluster_name = None
@@ -539,7 +561,7 @@ def submit_cmd(jq, io, cluster, args, config):
     if not (args.dryrun or args.skip_kube_submit) and args.wait_for_completion:
         log.info("Waiting for job to terminate")
         successful_execution = watch(
-            io, jq, job_id, cluster, target_nodes=1, loglive=True)
+            io, jq, job_id, cluster, target_nodes=target_node_count, loglive=True)
         finished = True

     if finished:
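Taken together, these hunks make --nodes actually propagate: target_node_count = args.nodes now feeds SubmitConfig, the jq.submit() call, and the final watch() call, which previously hard-coded target_nodes=1 regardless of the requested cluster size. Combined with the preemptible budget computed in submit(), the flags flow roughly like this (a sketch using a stand-in for the argparse namespace; the values are illustrative):

    class Args:               # stand-in for the parsed argparse namespace
        nodes = 4
        preemptible = True
        skipifexists = False

    args = Args()
    target_node_count = args.nodes          # no longer pinned to 1 in watch()
    preemptible = args.preemptible          # or resolved from the config file

    max_preemptable_attempts = 0
    if preemptible:
        # budget of twice the node count worth of preempted-node retries
        max_preemptable_attempts = target_node_count * 2

    print(target_node_count, max_preemptable_attempts)  # 4 8

On the command line this corresponds to passing --nodes 4 --preemptible (and optionally --skipifexists, to make re-submission from a script a no-op) to the submit subcommand.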
