Skip to content
Merged
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,27 @@ Here are some useful commands:
sacctmgr -n -p list assoc where user=$USER | awk '-F|' '{print " "$2}'
```

### Waiting for Jobs

Use `gridtk wait` to block until all jobs finish:
```bash
$ gridtk wait
Waiting for 3 job(s): 2 pending, 1 running (checking every 10s)
Job 1: COMPLETED (0)
Job 2: COMPLETED (0)
Job 3: FAILED (1)
```

`gridtk wait` exits with code 1 if any job failed, making it easy to chain:
```bash
$ gridtk submit job.sh && gridtk wait && echo "All done!"
```

You can filter which jobs to wait for and change the polling interval:
```bash
$ gridtk wait -j 1,2 --interval 30
```

### Tab Completion

GridTK supports tab completion for the `gridtk` command. To enable it, add the following
Expand Down
83 changes: 81 additions & 2 deletions src/gridtk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ def cli(ctx, database, logs_dir):

@cli.result_callback()
def process_result(result, **kwargs):
    """Clean up empty databases and release the job manager.

    Runs after a command returns. The job manager is removed from the click
    context and asked to delete its database (and logs directory) when no
    jobs remain.
    """
    ctx = click.get_current_context()
    # Pop exactly once: deleting the key first and then popping it again
    # would raise KeyError before the cleanup could run.
    job_manager = ctx.meta.pop("job_manager")
    job_manager.cleanup_empty_database()


@cli.command(
Expand Down Expand Up @@ -636,6 +637,84 @@ def delete(
session.commit()


@cli.command()
@job_filters
@click.option(
    "--interval",
    default=10,
    # Reject negative values up front; time.sleep() would otherwise raise
    # ValueError only after the first polling round.
    type=click.IntRange(min=0),
    help="Polling interval in seconds.",
)
@click.pass_context
def wait(ctx, job_ids, states, names, dependents, interval):
    """Wait for jobs to finish. Exits with code 1 if any job failed."""
    # NOTE: the docstring above is the command's --help text, so the longer
    # explanation lives in comments.
    #
    # Polls the jobs selected by the filters every ``interval`` seconds until
    # all of them are in a terminal state, then prints one line per job.
    # Returns normally when every job is COMPLETED (or no job matches) and
    # raises SystemExit(1) when at least one job ended in a failed state.
    import time
    from collections import Counter

    from .manager import JobManager

    job_manager: JobManager = ctx.meta["job_manager"]
    # Terminal states - jobs in these states won't change
    terminal_states = frozenset(
        {
            "BOOT_FAIL",
            "CANCELLED",
            "COMPLETED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "PREEMPTED",
            "REVOKED",
            "SPECIAL_EXIT",
            "TIMEOUT",
        }
    )
    # Failed states - every terminal state except COMPLETED. Derived from
    # terminal_states so the two sets cannot drift apart.
    failed_states = terminal_states - {"COMPLETED"}

    while True:
        with job_manager as session:
            # presumably list_jobs refreshes each job's state from the
            # scheduler on every call - confirm in JobManager
            jobs = job_manager.list_jobs(
                job_ids=job_ids, states=states, names=names, dependents=dependents
            )
            if not jobs:
                click.echo("No jobs found.")
                return

            active = [job for job in jobs if job.state not in terminal_states]
            if not active:
                # Everything has finished: report the final state of each job.
                for job in jobs:
                    click.echo(f"Job {job.id}: {job.state} ({job.exit_code})")
                session.commit()
                if any(job.state in failed_states for job in jobs):
                    raise SystemExit(1)
                return

            # Show progress with state breakdown
            counts = Counter(job.state for job in active)
            breakdown = ", ".join(
                f"{n} {s.lower()}" for s, n in sorted(counts.items())
            )
            click.echo(
                f"Waiting for {len(active)} job(s): {breakdown}"
                f" (checking every {interval}s)"
            )
            session.commit()

        # Sleep outside the ``with`` block so the session is not held open
        # between polls.
        time.sleep(interval)


@cli.command()
@job_filters
@click.option(
Expand Down
6 changes: 4 additions & 2 deletions src/gridtk/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ def resubmit_jobs(self, **kwargs):
self.session.add(job)
return jobs

def __del__(self):
# if there are no jobs in the database, delete the database file and the logs directory (if empty)
def cleanup_empty_database(self):
"""Delete the database file and logs directory if no jobs remain."""
with self:
if (
Path(self.database).exists()
Expand All @@ -312,4 +312,6 @@ def __del__(self):
Path(self.database).unlink()
if self.logs_dir.exists() and len(os.listdir(self.logs_dir)) == 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use pathlib's Path.rglob instead of os.listdir

Suggested change
if self.logs_dir.exists() and len(os.listdir(self.logs_dir)) == 0:
if self.logs_dir.exists() and len(list(self.logs_dir.rglob("*"))) == 0:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or even Path.iterdir() as suggested by the pathlib documentation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed this. I will keep it in mind for the next iteration. Thank you for all the quick review feedback. I am using gridtk with agents, and they use it surprisingly well without any introduction.

shutil.rmtree(self.logs_dir)

def __del__(self):
    """Dispose of the engine when the manager is garbage-collected."""
    # __del__ also runs for instances whose __init__ raised before
    # ``self.engine`` was assigned; look the attribute up defensively so
    # finalization does not emit a spurious AttributeError.
    engine = getattr(self, "engine", None)
    if engine is not None:
        engine.dispose()
29 changes: 28 additions & 1 deletion tests/test_gridtk.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ def test_list_json(mock_check_output, runner):
_submit_job(
runner=runner, mock_check_output=mock_check_output, job_id=submit_job_id
)

mock_check_output.return_value = _pending_job_sacct_json(submit_job_id)
result = runner.invoke(cli, ["list", "--json"])
assert_click_runner_result(result)
Expand Down Expand Up @@ -732,6 +731,34 @@ def test_submit_json(mock_check_output, runner):
assert data["name"] == "gridtk"


@patch("subprocess.check_output")
def test_wait_command(mock_check_output, runner):
    """`wait` prints each job's final state and exits 1 when a job failed."""
    slurm_id = 9876543

    # Scenario 1: the only job is COMPLETED, so `wait` exits with code 0.
    with runner.isolated_filesystem():
        _submit_job(
            runner=runner, mock_check_output=mock_check_output, job_id=slurm_id
        )
        completed_payload = _jobs_sacct_dict(
            [slurm_id], "COMPLETED", "None", "node001"
        )
        mock_check_output.return_value = json.dumps(completed_payload)
        outcome = runner.invoke(cli, ["wait"])
        assert_click_runner_result(outcome)
        assert "Job 1: COMPLETED" in outcome.output

    # Scenario 2: the only job is FAILED, so `wait` exits with code 1.
    with runner.isolated_filesystem():
        # Clear any side effect left on the mock before submitting again.
        mock_check_output.side_effect = None
        _submit_job(
            runner=runner, mock_check_output=mock_check_output, job_id=slurm_id
        )
        mock_check_output.return_value = _failed_job_sacct_json(slurm_id)
        outcome = runner.invoke(cli, ["wait"])
        assert_click_runner_result(outcome, exit_code=1)
        assert "Job 1: FAILED" in outcome.output


if __name__ == "__main__":
import sys

Expand Down
Loading