diff --git a/README.md b/README.md
index fc778a0..f5ed8c3 100644
--- a/README.md
+++ b/README.md
@@ -260,6 +260,27 @@ Here are some useful commands:
 sacctmgr -n -p list assoc where user=$USER | awk '-F|' '{print " "$2}'
 ```
 
+### Waiting for Jobs
+
+Use `gridtk wait` to block until all jobs finish:
+```bash
+$ gridtk wait
+Waiting for 3 job(s): 1 pending, 2 running (checking every 10s)
+Job 1: COMPLETED (0)
+Job 2: COMPLETED (0)
+Job 3: FAILED (1)
+```
+
+`gridtk wait` exits with code 1 if any job failed, making it easy to chain:
+```bash
+$ gridtk submit job.sh && gridtk wait && echo "All done!"
+```
+
+You can filter which jobs to wait for and change the polling interval:
+```bash
+$ gridtk wait -j 1,2 --interval 30
+```
+
 ### Tab Completion
 
 GridTK supports tab completion for the `gridtk` command. To enable it, add the following
diff --git a/src/gridtk/cli.py b/src/gridtk/cli.py
index da33737..dbefd0c 100644
--- a/src/gridtk/cli.py
+++ b/src/gridtk/cli.py
@@ -186,9 +186,10 @@ def cli(ctx, database, logs_dir):
 
 @cli.result_callback()
 def process_result(result, **kwargs):
-    """Delete the job manager from the context."""
+    """Clean up empty databases and dispose the job manager."""
     ctx = click.get_current_context()
-    del ctx.meta["job_manager"]
+    job_manager = ctx.meta.pop("job_manager")
+    job_manager.cleanup_empty_database()
 
 
 @cli.command(
@@ -636,6 +637,75 @@ def delete(
         session.commit()
 
 
+@cli.command()
+@job_filters
+@click.option(
+    "--interval",
+    default=10,
+    type=click.IntRange(min=1),
+    help="Polling interval in seconds.",
+)
+@click.pass_context
+def wait(ctx, job_ids, states, names, dependents, interval):
+    """Wait for jobs to finish.
+
+    Polls the selected jobs every --interval seconds until all of them are in
+    a terminal state, then prints each job's final state and exit code.
+    Exits with code 1 if any job failed.
+    """
+    import time
+    from collections import Counter
+
+    from .manager import JobManager
+
+    job_manager: JobManager = ctx.meta["job_manager"]
+    # Terminal states - jobs in these states won't change
+    terminal_states = {
+        "BOOT_FAIL",
+        "CANCELLED",
+        "COMPLETED",
+        "DEADLINE",
+        "FAILED",
+        "NODE_FAIL",
+        "OUT_OF_MEMORY",
+        "PREEMPTED",
+        "REVOKED",
+        "SPECIAL_EXIT",
+        "TIMEOUT",
+    }
+    # Failed states - every terminal state except COMPLETED; exit code 1
+    failed_states = terminal_states - {"COMPLETED"}
+
+    while True:
+        with job_manager as session:
+            jobs = job_manager.list_jobs(
+                job_ids=job_ids, states=states, names=names, dependents=dependents
+            )
+            if not jobs:
+                click.echo("No jobs found.")
+                return
+
+            active = [job for job in jobs if job.state not in terminal_states]
+            if not active:
+                for job in jobs:
+                    click.echo(f"Job {job.id}: {job.state} ({job.exit_code})")
+                session.commit()
+                if any(job.state in failed_states for job in jobs):
+                    raise SystemExit(1)
+                return
+
+            # Show progress with state breakdown
+            counts = Counter(job.state for job in active)
+            breakdown = ", ".join(f"{n} {s.lower()}" for s, n in sorted(counts.items()))
+            click.echo(
+                f"Waiting for {len(active)} job(s): {breakdown}"
+                f" (checking every {interval}s)"
+            )
+            session.commit()
+
+        time.sleep(interval)
+
+
 @cli.command()
 @job_filters
 @click.option(
diff --git a/src/gridtk/manager.py b/src/gridtk/manager.py
index dee41af..85e193f 100644
--- a/src/gridtk/manager.py
+++ b/src/gridtk/manager.py
@@ -302,8 +302,8 @@ def resubmit_jobs(self, **kwargs):
             self.session.add(job)
         return jobs
 
-    def __del__(self):
-        # if there are no jobs in the database, delete the database file and the logs directory (if empty)
+    def cleanup_empty_database(self):
+        """Delete the database file and logs directory if no jobs remain."""
         with self:
             if (
                 Path(self.database).exists()
@@ -312,4 +312,6 @@ def __del__(self):
                 Path(self.database).unlink()
                 if self.logs_dir.exists() and len(os.listdir(self.logs_dir)) == 0:
                     shutil.rmtree(self.logs_dir)
+
+    def __del__(self):
         self.engine.dispose()
diff --git a/tests/test_gridtk.py b/tests/test_gridtk.py
index 76bf09e..2f1e7ec 100644
--- a/tests/test_gridtk.py
+++ b/tests/test_gridtk.py
@@ -701,7 +701,6 @@ def test_list_json(mock_check_output, runner):
         _submit_job(
             runner=runner, mock_check_output=mock_check_output, job_id=submit_job_id
         )
-        mock_check_output.return_value = _pending_job_sacct_json(submit_job_id)
         result = runner.invoke(cli, ["list", "--json"])
         assert_click_runner_result(result)
 
@@ -732,6 +731,34 @@ def test_submit_json(mock_check_output, runner):
         assert data["name"] == "gridtk"
 
 
+@patch("subprocess.check_output")
+def test_wait_command(mock_check_output, runner):
+    # Test wait with COMPLETED job (exit code 0)
+    with runner.isolated_filesystem():
+        submit_job_id = 9876543
+        _submit_job(
+            runner=runner, mock_check_output=mock_check_output, job_id=submit_job_id
+        )
+        mock_check_output.return_value = json.dumps(
+            _jobs_sacct_dict([submit_job_id], "COMPLETED", "None", "node001")
+        )
+        result = runner.invoke(cli, ["wait"])
+        assert_click_runner_result(result)
+        assert "Job 1: COMPLETED" in result.output
+
+    # Test wait with FAILED job (exit code 1)
+    with runner.isolated_filesystem():
+        submit_job_id = 9876543
+        mock_check_output.side_effect = None
+        _submit_job(
+            runner=runner, mock_check_output=mock_check_output, job_id=submit_job_id
+        )
+        mock_check_output.return_value = _failed_job_sacct_json(submit_job_id)
+        result = runner.invoke(cli, ["wait"])
+        assert_click_runner_result(result, exit_code=1)
+        assert "Job 1: FAILED" in result.output
+
+
 if __name__ == "__main__":
     import sys
 