Skip to content

Commit ab9a241

Browse files
committed
Added a `--nodes` option to the submit command to control the target number of nodes to spawn
1 parent 6ba7fbc commit ab9a241

File tree

2 files changed, with 8 additions (+8) and 4 deletions (-4).

2 files changed

+8
-4
lines changed

sparklespray/job_queue.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -146,7 +146,7 @@ def reset_task(self, task_id, status=STATUS_PENDING):
146146
task = self.task_storage.get_task(task_id)
147147
self._reset_task(task, status)
148148

149-
def submit(self, job_id, args, kube_job_spec, metadata, cluster):
149+
def submit(self, job_id, args, kube_job_spec, metadata, cluster, target_node_count):
150150
kube_job_spec = json.dumps(kube_job_spec)
151151
tasks = []
152152
now = time.time()
@@ -170,7 +170,7 @@ def submit(self, job_id, args, kube_job_spec, metadata, cluster):
170170
task_index += 1
171171

172172
job = Job(job_id=job_id, tasks=[t.task_id for t in tasks], kube_job_spec=kube_job_spec, metadata=metadata, cluster=cluster, status=JOB_STATUS_SUBMITTED,
173-
submit_time=time.time())
173+
submit_time=time.time(), target_node_count)
174174
self.job_storage.insert(job, batch=batch)
175175
batch.flush()
176176
#log.info("Saved task definition batch containing %d tasks", len(batch))

sparklespray/submit.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ class SubmitConfig(BaseModel):
4545
mount_point: str
4646
kubequeconsume_url: str
4747
gpu_count: int
48+
target_node_count: int
4849

4950

5051
class ExistingJobException(Exception):
@@ -253,7 +254,7 @@ def submit(jq: JobQueue, io: IO, cluster: Cluster, job_id: str, spec: dict, conf
253254
monitor_port=monitor_port)
254255

255256
jq.submit(job_id, list(zip(task_spec_urls, command_result_urls, log_urls)),
256-
pipeline_spec, metadata, cluster_name)
257+
pipeline_spec, metadata, cluster_name, target_node_count)
257258

258259

259260
def new_job_id():
@@ -373,6 +374,8 @@ def add_submit_cmd(subparser):
373374
parser.add_argument("--results", action="append",
374375
help="Wildcard to use to find results which will be uploaded. (defaults to '*') Can be specified multiple times",
375376
default=None, dest="results_wildcards")
377+
parser.add_argument(
378+
"--nodes", help="Max number of VMs to start up to run these tasks", type=int, default=1)
376379
parser.add_argument("--cd", help="The directory to change to before executing the command", default=".",
377380
dest="working_dir")
378381
parser.add_argument(
@@ -506,7 +509,8 @@ def submit_cmd(jq, io, cluster, args, config):
506509
zones=config['zones'],
507510
mount_point=config.get("mount", "/mnt/"),
508511
kubequeconsume_url=kubequeconsume_exe_url,
509-
gpu_count=gpu_count
512+
gpu_count=gpu_count,
513+
target_node_count=args.nodes
510514
)
511515

512516
cluster_name = None

0 commit comments

Comments
 (0)