diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index e136d0f4c..1d70033b5 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -110,6 +110,7 @@ - [Dashboard Deployment](./specialized/tools/dashboard-deployment.md) - [Configuring AI Assistants](./specialized/tools/ai-assistants.md) - [AI-Assisted Workflow Management](./specialized/tools/ai-assistant.md) + - [Analyzing Workflows with datasight](./specialized/tools/datasight.md) - [Map Python Functions Across Workers](./specialized/tools/map_python_function_across_workers.md) - [Filtering CLI Output with Nushell](./specialized/tools/filtering-with-nushell.md) - [Shell Completions](./specialized/tools/shell-completions.md) diff --git a/docs/src/specialized/admin/server-deployment.md b/docs/src/specialized/admin/server-deployment.md index 11d68c5df..55ad82822 100644 --- a/docs/src/specialized/admin/server-deployment.md +++ b/docs/src/specialized/admin/server-deployment.md @@ -341,6 +341,120 @@ Poor fits: - **Process death loses unsaved data.** Always snapshot before stopping the server if you care about the current state. `SIGTERM`/`SIGINT` (graceful shutdown) does **not** automatically snapshot. +## Exporting Filtered Database Copies + +`torc-server export` produces a standalone SQLite copy of the live database, optionally filtered to +a subset of workflows. The original workflow and job IDs are preserved verbatim, so log files, +ticket references, and screenshots referring to the production IDs remain interpretable in the +exported copy. The most common use case is **handing a debugging copy to an end user** who does not +have direct access to the production database — for example, so they can analyze their workflows +with [datasight](../tools/datasight.md), `sqlite3`, or another SQL tool without touching production. + +```bash +# Hand a single user their workflows +torc-server export --user alice --output alice.db + +# Export everything in a project's access group +torc-server export --access-group 7 --output proj-energy.db + +# Pull a specific list of workflows (positional) +torc-server export 42 99 314 --output requested.db + +# Full unfiltered copy (useful as a hot-backup) +torc-server export --output snapshot.db +``` + +The filters are mutually exclusive — pick one of `--user` (repeatable), `--access-group` +(repeatable), or positional workflow IDs. Without any filter, the command produces a full copy. + +### How it works + +1. **Snapshot.** SQLite's [`VACUUM INTO`](https://sqlite.org/lang_vacuum.html#vacuuminto) writes a + transactionally consistent, defragmented copy of the live database to the output path. This does + _not_ require quiescing the running server — readers and writers continue normally during the + snapshot. +2. **Filter.** The output database is reopened with foreign keys enabled, and a single + `DELETE FROM workflow WHERE id NOT IN ()` runs. Every per-workflow table has + `ON DELETE CASCADE` on `workflow_id`, so jobs, files, results, events, ro_crate entities, compute + nodes, etc. are removed automatically by the cascade chain — for the workflows the filter + actually deleted. +3. **Sweep orphans (always).** Cascade only fires when the parent row IS deleted, so pre-existing + orphans in the source DB survive the snapshot. Common sources: a `delete_workflow` code path that + toggled `PRAGMA foreign_keys = OFF`, or a bare `sqlite3` CLI session (the CLI defaults to + `foreign_keys = OFF`). The export iteratively runs `PRAGMA foreign_key_check` and deletes every + reported violation until none remain. 
`workflow_status` is pruned separately (its back-reference + column has no FK declared and so is invisible to `foreign_key_check`). This step runs for + unfiltered exports too — FK violations are data corruption, not fidelity to the source. +4. **Sanitize.** If a filter was applied and `--preserve-access-groups` is not set, the exported + database has its `user_group_membership` and `access_group` tables emptied. See + [Access-control sanitization](#access-control-sanitization) below. +5. **Compact.** A final `VACUUM` reclaims the space freed by the deletes (skip with `--no-vacuum`). + +If anything in steps 2–5 fails after step 1 has written the snapshot, the partial output file is +removed before the error is reported — a failed export never leaves a half-finished database on +disk. + +### Flags + +| Flag | Effect | +| -------------------------- | -------------------------------------------------------------------------------------------------- | +| `-o`, `--output ` | Output SQLite file path (required). | +| `-d`, `--database ` | Source database path. Defaults to `DATABASE_URL`. | +| `--user ` | Keep only workflows owned by this user. Repeatable. | +| `--access-group ` | Keep only workflows linked to this access-group ID. Repeatable. | +| (positional) | Keep only these workflow IDs. | +| `--overwrite` | Replace the output file if it already exists. | +| `--preserve-access-groups` | Keep `access_group` / `user_group_membership` / `workflow_access_group` instead of stripping them. | +| `--no-vacuum` | Skip the final `VACUUM`. Faster, but the output file retains the source database's allocated size. | + +If a filter is specified and matches zero workflows, the command errors out and removes the +partially-written output file rather than producing an empty database. + +### Access-control sanitization + +By default, `torc-server export` strips three tables from any **filtered** export: + +- `user_group_membership` — has no per-workflow scoping, so leaving it intact would leak unrelated + users' group affiliations. +- `access_group` — group names and descriptions for groups across the whole server. +- `workflow_access_group` — cascades away when `access_group` is emptied. + +This is conservative on purpose: there is no straightforward per-workflow filter that wouldn't risk +accidentally leaking entries about other users or groups. If the recipient _is_ authorized to see +the entire access-control state (for example, when handing a full copy to another admin), pass +`--preserve-access-groups` to keep the tables intact. + +For unfiltered (full-copy) exports, the access tables are kept as-is regardless — the operator +running the command already has access to everything in the database. + +### Recommended workflow for end-user requests + +The expected interaction pattern is admin-mediated: + +1. End user asks for a copy of workflow `42` (or all workflows under user `alice`, etc.). +2. Admin runs `torc-server export` with the appropriate filter on the server host. +3. Admin reviews the output (`sqlite3 alice.db "SELECT id, user, name FROM workflow"`) and confirms + it contains only the intended scope. +4. Admin transfers the file to the user. +5. User analyzes the copy locally — IDs match production, so anything that was in their logs or + tickets continues to make sense. + +This avoids needing to grant the user direct filesystem access to the production database, while +still giving them a faithful debugging artifact. 
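+
+For step 3 of that workflow, the one-liner review can be expanded into a short plain-SQL script and
+piped through `sqlite3`. The sketch below is only a suggestion: the table names (`workflow`, `job`,
+`access_group`, `user_group_membership`) match the export behavior described above, while `review.sql`
+and `alice.db` are illustrative names; adapt the checks to the scope you actually exported.
+
+```sql
+-- Run with: sqlite3 alice.db < review.sql
+-- 1. Confirm only the intended owner(s) remain.
+SELECT user, COUNT(*) AS workflows FROM workflow GROUP BY user;
+
+-- 2. Spot-check the retained workflow IDs against the request.
+SELECT id, name, timestamp FROM workflow ORDER BY id;
+
+-- 3. Verify the access-control tables were stripped (the default for filtered exports).
+SELECT (SELECT COUNT(*) FROM access_group)          AS access_groups,
+       (SELECT COUNT(*) FROM user_group_membership) AS memberships;
+
+-- 4. Sanity-check that per-workflow rows survived the cascade as expected.
+SELECT COUNT(*) AS jobs FROM job;
+```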
+ +### Notes + +- **Live server safe.** `VACUUM INTO` does not block the source server's writers or readers, and the + export connection participates in SQLite's normal WAL coherency, so the snapshot reflects every + committed transaction the running server can see. +- **Same-IDs guarantee.** The export preserves all primary keys. By contrast, + [`torc workflows export`](../../core/workflows/export-import-workflows.md) emits portable JSON + that loses ID identity on import — use that flow when the recipient cannot get a SQLite file from + an admin. +- **External files are not bundled.** Only database rows are exported. Files referenced by `path` in + the `file` table (job inputs, outputs, logs on shared filesystems) are not copied; the recipient + analyzes metadata only unless those paths are independently shared. + ## Complete Example: Production Deployment ```bash diff --git a/docs/src/specialized/tools/datasight.md b/docs/src/specialized/tools/datasight.md new file mode 100644 index 000000000..46a6732bc --- /dev/null +++ b/docs/src/specialized/tools/datasight.md @@ -0,0 +1,231 @@ +# Analyzing Torc Workflows with datasight + +[**datasight**](https://github.com/dsgrid/datasight) is an AI-powered data exploration tool that +connects an AI agent to a SQL database (DuckDB, PostgreSQL, SQLite, Flight SQL) and provides a web +UI for asking natural-language questions. The agent writes SQL, runs it, and renders interactive +Plotly visualizations. + +A torc server stores its state in **SQLite**, which makes it a natural fit for datasight. This guide +shows how to point datasight at a torc database to answer questions like: + +- _Which jobs in workflow 123 exceeded their memory allocation?_ +- _Show failures grouped by return code._ +- _What's the average exec time per resource group?_ +- _Which compute nodes ran the longest jobs?_ + +> **Read-only tool.** datasight queries the database — it does not mutate it. For changes (rerun, +> recover, update resources) use the regular `torc` CLI commands. + +--- + +## Three Audiences + +Setup depends on whether you have direct read access to the torc server's SQLite file. + +| Audience | Path | +| --------------------------------------- | ----------------------------------------------------------------------------------------------------------------- | +| **Admin / shared-server operator** | Point datasight directly at `dev.db` (or your prod DB path). | +| **End user — admin will run an export** | Ask the admin to run `torc-server export` to produce a filtered SQLite copy and hand it back. Same IDs preserved. | +| **End user without any admin coop** | Use `torc workflows export` to get a JSON, import into a local torc-server, then point datasight at _that_ DB. | + +The admin path is one step. The admin-mediated export (Path B) is the right default when you can get +cooperation from whoever runs the server — it preserves the original workflow and job IDs, so log +files and ticket references continue to make sense. Path C is the fallback when no admin help is +available; the JSON round-trip assigns new IDs. + +--- + +## Path A — Admin / Direct DB Access + +### 1. Install datasight + +```bash +uv tool install datasight +``` + +Set up an Anthropic API key (or another supported LLM provider — see the datasight README). + +### 2. 
Bootstrap a project from the torc reference files + +This repo ships a ready-to-use config under +[`examples/datasight/`](https://github.com/NatLabRockies/torc/tree/main/examples/datasight): + +```bash +mkdir ~/torc-analysis +cd ~/torc-analysis +datasight init +cp /path/to/torc/examples/datasight/schema.yaml . +cp /path/to/torc/examples/datasight/schema_description.md . +cp /path/to/torc/examples/datasight/queries.yaml . +``` + +Edit `.env` to point at the torc SQLite database: + +```bash +DATABASE_URL=sqlite:////absolute/path/to/torc/server/db/sqlite/dev.db +LLM_PROVIDER=anthropic +ANTHROPIC_API_KEY=... +``` + +### 3. Run + +```bash +datasight run +``` + +Open and start asking questions. By default datasight binds to localhost; +use `--unix-socket /path/to/datasight.sock` for SSH-forwarded socket access on HPC login nodes. + +--- + +## Path B — Admin-Mediated SQLite Export (recommended for end users) + +The admin runs `torc-server export` on the server host to produce a filtered SQLite file containing +only the requested workflows, then sends the file to you. You point datasight at it like a normal +SQLite database. + +This preserves all original workflow and job IDs, so the database in your hands is _the same +database_ the production server has — just trimmed to your subset. Log lines like +`workflow_id=42 job_id=917` keep matching, which makes Path A debugging usable for end users. + +### 1. Admin runs the export + +```bash +# Filter by user (most common) +torc-server export --user alice --output alice.db + +# Or by access group +torc-server export --access-group 7 --output proj-energy.db + +# Or by specific workflow IDs +torc-server export 42 99 --output requested.db + +# Full copy (no filter) +torc-server export --output snapshot.db +``` + +By default, `access_group`, `workflow_access_group`, and `user_group_membership` are stripped from +filtered exports because those tables span the whole server and would leak entries for other users +and groups. Pass `--preserve-access-groups` only when producing a full copy or when the recipient is +authorized to see the entire access-control state. + +The admin should review the output (`sqlite3 alice.db "SELECT id, user, name FROM workflow"`) before +handing it over. + +Useful flags: + +| Flag | Effect | +| -------------------------- | ----------------------------------------------------------------------------------------------------------- | +| `--overwrite` | Replace an existing output file. | +| `--preserve-access-groups` | Keep ACL tables instead of stripping them. Only safe for full copies or vetted recipients. | +| `--no-vacuum` | Skip the final `VACUUM`; faster, but the file keeps the source's original size even after rows are deleted. | + +The export uses SQLite's `VACUUM INTO` for a transactionally consistent snapshot, then deletes +non-matching workflows; cascading foreign keys clean up the per-workflow rows automatically. + +### 2. Point datasight at the file + +Follow the same setup as Path A, with `DATABASE_URL` pointing at the SQLite file the admin sent you. +No local torc-server is required. + +> **Refresh.** The export is a snapshot in time. For new results, ask for a fresh export — there is +> no in-place update. + +--- + +## Path C — User Without Any DB Access + +If you can't get an admin to run `torc-server export` for you, fall back to the JSON export/import +flow. This trades ID preservation for not needing any server-side cooperation. + +### 1. 
Get an export with results included + +Either run this yourself (if you have CLI access to the shared server) or ask the admin to run it +for you: + +```bash +torc workflows export --include-results --include-events \ + --output workflow_.json +``` + +The `--include-results` flag is **essential** — without it the `result` table is empty and most of +the useful queries (memory overruns, slow jobs, failure causes) won't work. `--include-events` is +optional but useful for timeline analysis. + +See [Exporting and Importing Workflows](../../core/workflows/export-import-workflows.md) for the +full export/import reference. + +### 2. Run a personal local torc-server + +You only need this for storage; you don't have to actually execute jobs through it. Install torc +locally, then start a server with its own SQLite database: + +```bash +torc-server run --host localhost -p 8080 +``` + +In a second shell, point your CLI at it: + +```bash +export TORC_API_URL="http://localhost:8080/torc-service/v1" +torc workflows import workflow_.json +``` + +The import creates a fresh workflow with a new ID in your local DB. Take note of the local DB path +(default `db/sqlite/dev.db`). + +### 3. Run datasight against your local DB + +Follow the same setup as Path A, with `DATABASE_URL` pointing at your local torc-server's SQLite +file. + +> **Note on freshness.** The export is a snapshot. If you want to analyze new results from the +> shared server you need a fresh export. For ongoing monitoring, ask your admin about adding +> read-only access to the production DB or running datasight against it on the server side. + +--- + +## The Reference Files + +| File | Purpose | +| ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------- | +| [`schema.yaml`](https://github.com/NatLabRockies/torc/blob/main/examples/datasight/schema.yaml) | Restricts AI exploration to the analytically useful tables; hides internal scheduling columns. | +| [`schema_description.md`](https://github.com/NatLabRockies/torc/blob/main/examples/datasight/schema_description.md) | Domain context for the AI: status integer enum, return code conventions, key joins, JSON metadata extraction. | +| [`queries.yaml`](https://github.com/NatLabRockies/torc/blob/main/examples/datasight/queries.yaml) | Seeded NL/SQL pairs the AI uses as few-shot examples. | + +The `schema_description.md` is the highest-leverage file — without it, the AI will see raw integer +status codes (0–10) on `job.status` and have no way to decode them, won't know that `137` return +code means OOM, and won't know to use `memory_bytes` instead of the human-readable `memory` string +for math. **Customize it** with anything specific to how your team uses `workflow.metadata` (project +tags, ticket IDs, dataset versions, etc.) — that's where datasight unlocks the most value. 
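+
+As a concrete illustration of that customization, suppose your team records a ticket reference under
+`$.ticket` and a dataset version under `$.dataset_version` in `workflow.metadata`; both keys are
+hypothetical here, since there is no enforced metadata schema. A query like the following could then
+be documented in `schema_description.md` or seeded into `queries.yaml` as a natural-language/SQL pair:
+
+```sql
+-- Workflows associated with a given ticket, plus their dataset versions.
+-- '$.ticket' and '$.dataset_version' are assumed team conventions; 'OPS-1234'
+-- is a placeholder ticket ID.
+SELECT id,
+       name,
+       user,
+       json_extract(metadata, '$.ticket')          AS ticket,
+       json_extract(metadata, '$.dataset_version') AS dataset_version
+FROM workflow
+WHERE json_extract(metadata, '$.ticket') = 'OPS-1234'
+ORDER BY timestamp DESC;
+```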
+ +--- + +## Example Questions to Try + +Once datasight is running, try these to verify the integration is working: + +- _"How many workflows are in the database, grouped by user?"_ +- _"For workflow 123, show jobs whose peak memory exceeded their allocation, sorted by overage."_ +- _"Plot exec time distribution for workflow 123, faceted by resource group."_ +- _"Which compute nodes had the most failed jobs last week?"_ +- _"For my workflows tagged with project='climate-2026' in metadata, summarize total CPU-hours."_ + +Pin useful results to the dashboard, or export the session as a self-contained HTML page or a +runnable Python script — see the datasight docs for more. + +--- + +## Troubleshooting + +**The AI keeps writing `WHERE status = 'failed'`.** Your `schema_description.md` isn't being loaded. +Confirm it sits in the project directory next to `.env` and restart `datasight run`. + +**Queries return nothing for `result`-table questions.** Either the workflow hasn't run any jobs +yet, or (Path C) your export was missing `--include-results`. + +**`datasight run` complains it can't open the DB.** SQLite requires an absolute path in +`DATABASE_URL` (`sqlite:////absolute/path/...` — note the four slashes). + +**Schema introspection is slow.** The torc DB has many internal tables; the included `schema.yaml` +already filters to the analytical subset, which keeps startup fast. diff --git a/docs/src/specialized/tools/index.md b/docs/src/specialized/tools/index.md index 2459800a0..6ac9b2403 100644 --- a/docs/src/specialized/tools/index.md +++ b/docs/src/specialized/tools/index.md @@ -7,6 +7,8 @@ Additional tools and third-party integrations. - [Dashboard Deployment](./dashboard-deployment.md) - Deploying the web dashboard - [Configuring AI Assistants](./ai-assistants.md) - Setting up AI integration - [AI-Assisted Workflow Management](./ai-assistant.md) - Using AI for workflow management +- [Analyzing Workflows with datasight](./datasight.md) - Natural-language SQL exploration of the + torc database - [Map Python Functions Across Workers](./map_python_function_across_workers.md) - Python integration - [Filtering CLI Output with Nushell](./filtering-with-nushell.md) - Advanced CLI usage diff --git a/examples/datasight/queries.yaml b/examples/datasight/queries.yaml new file mode 100644 index 000000000..e7a31b007 --- /dev/null +++ b/examples/datasight/queries.yaml @@ -0,0 +1,173 @@ +# Seed natural-language / SQL pairs for datasight against a torc SQLite DB. +# +# These are injected into the AI's system prompt as few-shot examples and +# seeded into agent memory, so similar future questions return better answers. +# Examples use `123` as a stand-in workflow ID — substitute a real ID when +# running these by hand. + +- question: List all workflows for a user, newest first + sql: | + SELECT id, name, description, user, timestamp, + json_extract(metadata, '$.project') AS project + FROM workflow + WHERE user = 'alice' + ORDER BY timestamp DESC + LIMIT 50 + +- question: How many jobs are in each status for workflow 123? 
+ sql: | + SELECT + CASE status + WHEN 0 THEN 'uninitialized' + WHEN 1 THEN 'blocked' + WHEN 2 THEN 'ready' + WHEN 3 THEN 'pending' + WHEN 4 THEN 'running' + WHEN 5 THEN 'completed' + WHEN 6 THEN 'failed' + WHEN 7 THEN 'canceled' + WHEN 8 THEN 'terminated' + WHEN 9 THEN 'disabled' + WHEN 10 THEN 'pending_failed' + END AS status_name, + COUNT(*) AS job_count + FROM job + WHERE workflow_id = 123 + GROUP BY status + ORDER BY job_count DESC + +- question: Show the most recent failed jobs in workflow 123 + sql: | + SELECT j.id AS job_id, j.name, r.return_code, r.exec_time_minutes, + r.completion_time + FROM workflow_result wr + JOIN result r ON r.id = wr.result_id + JOIN job j ON j.id = wr.job_id + WHERE wr.workflow_id = 123 + AND r.status = 6 + ORDER BY r.completion_time DESC + LIMIT 50 + +- question: Which jobs in workflow 123 exceeded their memory allocation? + sql: | + SELECT j.id AS job_id, j.name, + rr.memory AS configured_memory, + r.peak_memory_bytes, + rr.memory_bytes AS configured_memory_bytes, + ROUND(1.0 * r.peak_memory_bytes / rr.memory_bytes, 2) + AS overage_ratio + FROM workflow_result wr + JOIN result r ON r.id = wr.result_id + JOIN job j ON j.id = wr.job_id + JOIN resource_requirements rr ON rr.id = j.resource_requirements_id + WHERE wr.workflow_id = 123 + AND r.peak_memory_bytes > rr.memory_bytes + ORDER BY overage_ratio DESC + +- question: Which jobs exceeded their CPU allocation? + sql: | + SELECT j.id AS job_id, j.name, + rr.num_cpus AS configured_cpus, + r.peak_cpu_percent, + ROUND(r.peak_cpu_percent / (rr.num_cpus * 100.0), 2) + AS overage_ratio + FROM workflow_result wr + JOIN result r ON r.id = wr.result_id + JOIN job j ON j.id = wr.job_id + JOIN resource_requirements rr ON rr.id = j.resource_requirements_id + WHERE wr.workflow_id = 123 + AND r.peak_cpu_percent > rr.num_cpus * 100.0 + ORDER BY overage_ratio DESC + +- question: Show failures grouped by return code + sql: | + SELECT r.return_code, + CASE r.return_code + WHEN 137 THEN 'OOM (SIGKILL)' + WHEN 139 THEN 'segfault' + WHEN 143 THEN 'SIGTERM' + WHEN 152 THEN 'timeout (Slurm SIGUSR1)' + ELSE 'other' + END AS likely_cause, + COUNT(*) AS occurrences + FROM workflow_result wr + JOIN result r ON r.id = wr.result_id + WHERE wr.workflow_id = 123 + AND r.return_code != 0 + GROUP BY r.return_code + ORDER BY occurrences DESC + +- question: What are the 20 longest-running jobs in workflow 123? + sql: | + SELECT j.id AS job_id, j.name, r.exec_time_minutes, + rr.runtime AS configured_runtime, + rr.runtime_s / 60.0 AS configured_minutes + FROM workflow_result wr + JOIN result r ON r.id = wr.result_id + JOIN job j ON j.id = wr.job_id + JOIN resource_requirements rr ON rr.id = j.resource_requirements_id + WHERE wr.workflow_id = 123 + ORDER BY r.exec_time_minutes DESC + LIMIT 20 + +- question: Average exec time and peak memory per resource group + sql: | + SELECT rr.name AS resource_group, + rr.num_cpus, rr.memory, rr.runtime, + COUNT(*) AS job_runs, + ROUND(AVG(r.exec_time_minutes), 2) AS avg_minutes, + MAX(r.peak_memory_bytes) AS max_peak_memory_bytes + FROM result r + JOIN job j ON j.id = r.job_id + JOIN resource_requirements rr ON rr.id = j.resource_requirements_id + WHERE r.workflow_id = 123 + GROUP BY rr.id + ORDER BY avg_minutes DESC + +- question: Which compute nodes ran the most jobs? 
+ sql: | + SELECT cn.hostname, cn.compute_node_type, + COUNT(*) AS jobs_run, + ROUND(SUM(r.exec_time_minutes), 1) AS total_exec_minutes + FROM result r + JOIN compute_node cn ON cn.id = r.compute_node_id + WHERE r.workflow_id = 123 + GROUP BY cn.id + ORDER BY jobs_run DESC + +- question: Compare actual vs. configured runtime for jobs that timed out + sql: | + SELECT j.name, + rr.runtime AS configured, + ROUND(rr.runtime_s / 60.0, 2) AS configured_minutes, + ROUND(r.exec_time_minutes, 2) AS actual_minutes, + r.return_code + FROM workflow_result wr + JOIN result r ON r.id = wr.result_id + JOIN job j ON j.id = wr.job_id + JOIN resource_requirements rr ON rr.id = j.resource_requirements_id + WHERE wr.workflow_id = 123 + AND (r.return_code = 152 + OR r.exec_time_minutes > rr.runtime_s / 60.0) + ORDER BY actual_minutes DESC + +- question: Find workflows tagged with a specific project in metadata + sql: | + SELECT id, name, user, timestamp, + json_extract(metadata, '$.project') AS project + FROM workflow + WHERE json_extract(metadata, '$.project') = 'climate-2026' + ORDER BY timestamp DESC + +- question: How many distinct retry attempts has each failed job had? + sql: | + SELECT j.id AS job_id, j.name, + COUNT(*) AS attempts, + SUM(CASE WHEN r.status = 5 THEN 1 ELSE 0 END) AS completed, + SUM(CASE WHEN r.status = 6 THEN 1 ELSE 0 END) AS failed + FROM job j + JOIN result r ON r.job_id = j.id + WHERE j.workflow_id = 123 + GROUP BY j.id + HAVING attempts > 1 + ORDER BY attempts DESC diff --git a/examples/datasight/schema.yaml b/examples/datasight/schema.yaml new file mode 100644 index 000000000..030c80653 --- /dev/null +++ b/examples/datasight/schema.yaml @@ -0,0 +1,49 @@ +# datasight schema configuration for a torc SQLite database. +# +# Restricts AI exploration to the tables most useful for analyzing job +# execution, resource usage, and failures. Internal scheduling/plumbing +# columns are excluded to reduce noise in the AI's working context. +# +# Drop this file (alongside schema_description.md and queries.yaml) into +# your datasight project directory. See the integration guide at +# docs/src/specialized/tools/datasight.md for the full setup. + +tables: + - name: workflow + excluded_columns: + - resource_monitor_config + - compute_node_expiration_buffer_seconds + - compute_node_wait_for_new_jobs_seconds + - compute_node_ignore_workflow_completion + - compute_node_wait_for_healthy_database_minutes + + - name: workflow_status + excluded_columns: [] + + - name: job + excluded_columns: + - unblocking_processed + - schedule_compute_nodes + - invocation_script + - cancel_on_blocking_job_failure + - supports_termination + + - name: resource_requirements + excluded_columns: [] + + - name: result + excluded_columns: [] + + - name: workflow_result + excluded_columns: [] + + - name: compute_node + excluded_columns: + - pid + - scheduler_config_id + + - name: file + excluded_columns: [] + + - name: user_data + excluded_columns: [] diff --git a/examples/datasight/schema_description.md b/examples/datasight/schema_description.md new file mode 100644 index 000000000..1ab32750b --- /dev/null +++ b/examples/datasight/schema_description.md @@ -0,0 +1,168 @@ +# Torc Workflow Database + +This database is the persistent store for **Torc**, a distributed workflow orchestration system. +Each workflow contains a graph of computational jobs with dependencies, resource requirements, and +per-execution results. 
Users analyze this data to understand failure patterns, find slow jobs, +right-size resource allocations, and audit which compute nodes ran what. + +This is a **SQLite** database — use SQLite syntax (`json_extract`, `strftime`, `datetime`, etc.). Do +not use PostgreSQL- or DuckDB-specific functions. + +## Schema Overview + +| Table | What it represents | +| ----------------------- | ------------------------------------------------------------------- | +| `workflow` | Top-level workflow definition (one row per workflow) | +| `workflow_status` | Per-workflow run state (canceled / archived flags + current run_id) | +| `job` | Individual computational tasks within a workflow | +| `resource_requirements` | Named resource specs (CPU/memory/runtime) shared by groups of jobs | +| `result` | One row per job execution attempt — the primary fact table | +| `workflow_result` | Pointer to the latest `result` row for each (workflow, job) | +| `compute_node` | A worker process (local or Slurm allocation) that executed jobs | +| `file` | File artifacts that establish implicit job dependencies | +| `user_data` | User-defined JSON artifacts that establish implicit dependencies | + +The **`result`** table is the most important for analysis: it has actual measured CPU/memory/exec +time per execution, plus return codes. Most "what failed" or "what was slow" questions start there. + +## Job Status (CRITICAL — stored as INTEGER, not text) + +The `job.status` column is an integer. Always decode it with a `CASE` expression: + +```sql +CASE job.status + WHEN 0 THEN 'uninitialized' + WHEN 1 THEN 'blocked' + WHEN 2 THEN 'ready' + WHEN 3 THEN 'pending' + WHEN 4 THEN 'running' + WHEN 5 THEN 'completed' + WHEN 6 THEN 'failed' + WHEN 7 THEN 'canceled' + WHEN 8 THEN 'terminated' + WHEN 9 THEN 'disabled' + WHEN 10 THEN 'pending_failed' +END AS status_name +``` + +Same integer scheme is used in `result.status` (the recorded status of a single execution attempt). + +- `failed` (6) — the job failed and Torc gave up on it. +- `terminated` (8) — the job was killed (e.g. by Slurm time limit or signal). +- `pending_failed` (10) — failed and awaiting AI failure classification before being retried. + +**Do not assume `status` is a string.** Joining `result` to `job` returns integers; decode them. + +## Workflow Status + +`workflow_status` does **not** map to a name like `job.status` does. It is a row-per-workflow record +with boolean flags: + +- `is_canceled` — user (or scheduler) canceled the workflow. +- `is_archived` — workflow has been archived (also see `workflow.is_archived`). +- `run_id` — the current run number (incremented on each restart/recovery). +- `has_detected_need_to_run_completion_script` — internal flag; usually ignore. + +To answer "is workflow X currently running?" you typically check whether any of its jobs are in +status 3 (`pending`) or 4 (`running`), not `workflow_status` directly. + +## Return Code Conventions + +`result.return_code` is the process exit code. Special values worth knowing: + +- `0` — success +- `137` — killed by SIGKILL, almost always **out-of-memory (OOM)** +- `139` — segmentation fault +- `143` — terminated by SIGTERM +- `152` — Slurm SIGUSR1 / time-limit warning, almost always **timeout** + +For "did this job hit OOM?" filter `result.return_code = 137`. For timeouts use `152` (or compare +`exec_time_minutes` against the configured runtime in `resource_requirements`). + +## Paired Columns: Strings vs. Numerics + +Resource fields exist in **two forms**. 
Always use the numeric form for math and comparisons: + +| String form (display) | Numeric form (use for queries) | +| --------------------------------------------------- | -------------------------------------------- | +| `resource_requirements.memory` (`"2g"`) | `resource_requirements.memory_bytes` (bytes) | +| `resource_requirements.runtime` (ISO8601, `"PT2H"`) | `resource_requirements.runtime_s` (seconds) | + +The `result` table records **actual** usage, which you compare against the **configured** limit: + +| Actual (in `result`) | Configured (in `resource_requirements`) | +| ------------------------------------------- | --------------------------------------- | +| `peak_memory_bytes` | `memory_bytes` | +| `peak_cpu_percent` (e.g. 350.0 = 3.5 cores) | `num_cpus * 100` | +| `exec_time_minutes` | `runtime_s / 60.0` | + +A job exceeded its memory allocation when `result.peak_memory_bytes > rr.memory_bytes`. A job +exceeded its CPU allocation when `result.peak_cpu_percent > rr.num_cpus * 100`. + +## Key Joins + +```sql +-- Latest result per job (use this when the user says "the result" of a job) +FROM workflow_result wr +JOIN result r ON r.id = wr.result_id + +-- All historical execution attempts for a job +FROM job j +JOIN result r ON r.job_id = j.id +ORDER BY r.run_id + +-- Resource allocation context for a result +FROM result r +JOIN job j ON j.id = r.job_id +JOIN resource_requirements rr ON rr.id = j.resource_requirements_id + +-- Which compute node ran a job +FROM result r +JOIN compute_node cn ON cn.id = r.compute_node_id +``` + +A workflow can have multiple `compute_node` rows (one per worker process or Slurm allocation). +`compute_node.compute_node_type` distinguishes local vs. Slurm allocations. + +## User-Specific Context: `workflow.metadata` and `user_data.data` + +`workflow.metadata` is a nullable **JSON TEXT** column for user-defined workflow context — things +like project name, dataset version, experiment tag, ticket ID. Look here first when the user asks +project-specific questions ("show workflows tagged for project X"). Extract with SQLite JSON: + +```sql +SELECT id, name, json_extract(metadata, '$.project') AS project +FROM workflow +WHERE json_extract(metadata, '$.project') = 'climate-2026' +``` + +Common keys vary by user/org — there is no enforced schema. If the user asks about an unfamiliar +field, run `SELECT DISTINCT json_extract(metadata, '$.') FROM workflow` to see what's there, or +`SELECT metadata FROM workflow WHERE metadata IS NOT NULL LIMIT 5` to scan the shape. + +`user_data.data` is also JSON TEXT and holds workflow-defined intermediate values that establish job +dependencies. Inspect it the same way. + +## Things to Prefer When Querying + +- For "the result" of a job, prefer `workflow_result → result` (latest attempt) over scanning + `result` directly — much faster and matches what the UI shows. +- When grouping by resource allocation, group by `resource_requirements_id` (multiple jobs typically + share one entry) rather than per-job. +- Memory comparisons: always use `_bytes`. However, display memory values in GiB. +- Time comparisons: always use `runtime_s` and `exec_time_minutes` (note the unit mismatch — + convert). +- Timestamps in `result.completion_time` and `workflow.timestamp` are ISO8601 TEXT. Use SQLite's + `datetime()` / `strftime()` to compare or bucket them. +- The database is shared across many users; always include `WHERE workflow.user = ''` or + `WHERE workflow.id = ` unless the question is explicitly cross-user. 
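+
+Putting several of these preferences together (latest results via `workflow_result`, byte-based
+memory converted to GiB for display, and `strftime` bucketing of the ISO8601 completion timestamps),
+one reasonable shape for a per-workflow daily rollup is the following, with 123 as a stand-in
+workflow ID:
+
+```sql
+SELECT strftime('%Y-%m-%d', r.completion_time) AS day,
+       COUNT(*)                                AS jobs_finished,
+       ROUND(AVG(r.exec_time_minutes), 1)      AS avg_minutes,
+       ROUND(MAX(r.peak_memory_bytes) / (1024.0 * 1024 * 1024), 2) AS max_peak_gib
+FROM workflow_result wr
+JOIN result r ON r.id = wr.result_id
+WHERE wr.workflow_id = 123
+GROUP BY day
+ORDER BY day;
+```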
+ +## Things to Avoid + +- Do not `SELECT *` from `result` for whole-DB scans — it can be very large. Always join through + `workflow_result` or filter by `workflow_id` first. +- Do not assume `job.status = 'failed'` works — `status` is an integer; use `= 6`. +- Do not reason about "is this workflow done?" from `workflow_status` alone — check the latest job + statuses. +- Do not modify the database (no INSERT/UPDATE/DELETE). datasight is read-only by design; if a query + implies a mutation, refuse and suggest the equivalent `torc` CLI command. diff --git a/src/bin/torc-server.rs b/src/bin/torc-server.rs index 20dc518ba..df50fe92a 100644 --- a/src/bin/torc-server.rs +++ b/src/bin/torc-server.rs @@ -12,6 +12,7 @@ use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_timing::{Builder, Histogram}; +use torc::server::export; use torc::server::http_server; use torc::server::logging; use torc::server::service; @@ -142,6 +143,57 @@ enum Commands { #[command(subcommand)] action: ServiceAction, }, + /// Export a (filtered) copy of the database to a standalone SQLite file. + /// + /// With no filter, produces a full copy. Supply --user, --access-group, or + /// positional workflow IDs to restrict the export to a subset. Workflow IDs + /// and all per-workflow data are preserved verbatim, so log files referring + /// to the original IDs continue to make sense in the exported database. + Export { + #[command(flatten)] + args: ExportArgs, + }, +} + +#[derive(Args, Clone)] +struct ExportArgs { + /// Output path for the exported SQLite database. + #[arg(short, long)] + output: PathBuf, + + /// Source database path. Defaults to DATABASE_URL. + #[arg(short, long)] + database: Option, + + /// Keep only workflows owned by this user (repeatable). + #[arg(long = "user", conflicts_with_all = ["access_groups", "workflow_ids"])] + users: Vec, + + /// Keep only workflows associated with this access group ID (repeatable). + #[arg(long = "access-group", conflicts_with_all = ["users", "workflow_ids"])] + access_groups: Vec, + + /// Keep only these specific workflow IDs. + #[arg(conflicts_with_all = ["users", "access_groups"])] + workflow_ids: Vec, + + /// Overwrite the output file if it already exists. + #[arg(long)] + overwrite: bool, + + /// Preserve the access_group, workflow_access_group, and user_group_membership + /// tables in the exported database. Off by default because these tables + /// contain entries for users and groups outside the filtered set and are + /// difficult to sanitize without risking accidental leaks. Only use this + /// when producing a full copy or when the recipient is authorized to see + /// the entire access-control state. + #[arg(long)] + preserve_access_groups: bool, + + /// Skip the final VACUUM on the output database. Faster, but the file + /// retains the original database's size even after rows are deleted. 
+ #[arg(long)] + no_vacuum: bool, } #[derive(clap::Subcommand)] @@ -218,6 +270,7 @@ fn main() -> Result<()> { match cli.command { Some(Commands::Service { action }) => handle_service_action(action), Some(Commands::Run { config }) => run_server(config), + Some(Commands::Export { args }) => handle_export(args), None => { // Default: run server with default config // We need to re-parse as "run" to get ServerConfig defaults from clap @@ -231,6 +284,52 @@ fn main() -> Result<()> { } } +fn handle_export(args: ExportArgs) -> Result<()> { + // Minimal logging setup — the export command writes a status line to stderr + // via tracing; defer to RUST_LOG if set, otherwise default to info. + let env_filter = + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()); + let _ = tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_target(false) + .with_level(true), + ) + .with(env_filter) + .try_init(); + + let source_db = match args.database { + Some(ref path) => path.clone(), + None => env::var("DATABASE_URL") + .map_err(|_| anyhow::anyhow!("--database not provided and DATABASE_URL not set"))?, + }; + + let filter = if !args.users.is_empty() { + export::Filter::Users(args.users.clone()) + } else if !args.access_groups.is_empty() { + export::Filter::AccessGroups(args.access_groups.clone()) + } else if !args.workflow_ids.is_empty() { + export::Filter::WorkflowIds(args.workflow_ids.clone()) + } else { + export::Filter::None + }; + + let opts = export::ExportOptions { + source_db_url: source_db, + output_path: args.output, + filter, + overwrite: args.overwrite, + preserve_access_groups: args.preserve_access_groups, + run_final_vacuum: !args.no_vacuum, + }; + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + runtime.block_on(export::run_export(opts)) +} + fn handle_service_action(action: ServiceAction) -> Result<()> { let (command, user_level, config) = match action { ServiceAction::Install { user, config } => { diff --git a/src/server.rs b/src/server.rs index d6b3956c8..b60169c01 100644 --- a/src/server.rs +++ b/src/server.rs @@ -16,6 +16,8 @@ pub mod event_broadcast; pub mod header; pub mod htpasswd; // These modules are needed by the server binary and by the Rust-owned OpenAPI emitter. +#[cfg(feature = "server-bin")] +pub mod export; #[cfg(any(feature = "server-bin", feature = "openapi-codegen"))] pub mod http_server; #[cfg(feature = "openapi-codegen")] diff --git a/src/server/export.rs b/src/server/export.rs new file mode 100644 index 000000000..5e36ceb3a --- /dev/null +++ b/src/server/export.rs @@ -0,0 +1,419 @@ +//! `torc-server export` implementation. +//! +//! Produces a standalone SQLite copy of the live torc database, optionally +//! filtered to a subset of workflows. Workflow IDs and all per-workflow rows +//! are preserved verbatim, so log files referring to the original IDs remain +//! interpretable in the exported database — the primary use case is handing a +//! debugging copy to a user analyzing their workflows with external tools +//! (datasight, sqlite3, etc.). +//! +//! Strategy: +//! 1. `VACUUM INTO ''` — produces a transactionally consistent, +//! defragmented snapshot without quiescing the source server. A separate +//! SQLite connection participates in the source's WAL coherency (it +//! opens the `-wal` and `-shm` files), so the snapshot reflects every +//! committed transaction the running server can see. +//! 2. 
Open the snapshot, `PRAGMA foreign_keys=ON`, and (if a filter was +//! requested) `DELETE FROM workflow WHERE id NOT IN ()`. Every +//! per-workflow table has `ON DELETE CASCADE` on `workflow_id`, so the +//! cascade chain cleans up jobs, files, results, events, etc. for the +//! rows we just deleted. +//! 3. Sweep pre-existing orphans, regardless of whether a filter was +//! applied. The source DB can carry rows whose `workflow_id` already +//! pointed at a missing workflow before the export ran — typically from +//! a `delete_workflow` code path that bypasses cascade by toggling +//! `PRAGMA foreign_keys = OFF`, or a bare `sqlite3` CLI session (the +//! CLI defaults to FKs off). Cascade only fires for parent rows we +//! actually delete, so orphans survive verbatim. Iteratively run +//! `PRAGMA foreign_key_check` and delete every reported violation until +//! none remain. Then explicitly prune `workflow_status` (whose +//! back-reference column has no FK declared and so is invisible to +//! `foreign_key_check`). Both steps run in unfiltered mode too — FK +//! violations are data corruption, not "fidelity to the source." +//! 4. If a filter was applied and `preserve_access_groups` is false, wipe +//! `user_group_membership` (which has no per-workflow scoping and would +//! leak unrelated users' group affiliations) and `access_group` (which +//! cascades the `workflow_access_group` join table). +//! 5. Optional final `VACUUM` to reclaim space freed by the deletes. +//! +//! If anything in steps 2–5 fails after `VACUUM INTO` has written the file, +//! the partial output is removed before the error propagates so callers +//! can't mistake it for a valid export. + +use anyhow::{Context, Result, anyhow, bail}; +use sqlx::{Connection, SqliteConnection, sqlite::SqliteConnectOptions}; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; +use tracing::info; + +#[derive(Debug, Clone)] +pub enum Filter { + None, + Users(Vec), + AccessGroups(Vec), + WorkflowIds(Vec), +} + +#[derive(Debug, Clone)] +pub struct ExportOptions { + /// Source database URL or bare path. `sqlite:` prefix is added if missing. + pub source_db_url: String, + pub output_path: PathBuf, + pub filter: Filter, + pub overwrite: bool, + pub preserve_access_groups: bool, + pub run_final_vacuum: bool, +} + +pub async fn run_export(opts: ExportOptions) -> Result<()> { + let source_url = normalize_db_url(&opts.source_db_url); + + refuse_self_overwrite(&source_url, &opts.output_path)?; + prepare_output_path(&opts.output_path, opts.overwrite)?; + + let mut src = SqliteConnection::connect_with( + &SqliteConnectOptions::from_str(&source_url)? + .create_if_missing(false) + .busy_timeout(Duration::from_secs(60)), + ) + .await + .with_context(|| format!("opening source database at {source_url}"))?; + + let target = opts + .output_path + .to_str() + .ok_or_else(|| anyhow!("output path is not valid UTF-8"))?; + info!("Snapshotting database to {}", target); + // VACUUM INTO requires a string literal, not a bound parameter. Escape + // single quotes per the SQL standard (' → ''). + let escaped = target.replace('\'', "''"); + sqlx::query(&format!("VACUUM INTO '{escaped}'")) + .execute(&mut src) + .await + .context("VACUUM INTO")?; + let _ = src.close().await; + + // Once VACUUM INTO writes the snapshot, any subsequent failure must clean + // up the partial output file so a caller can't mistake it for a valid + // export. 
+ match finalize_snapshot(&opts).await { + Ok(()) => Ok(()), + Err(err) => { + let _ = std::fs::remove_file(&opts.output_path); + Err(err) + } + } +} + +async fn finalize_snapshot(opts: &ExportOptions) -> Result<()> { + let target = opts + .output_path + .to_str() + .ok_or_else(|| anyhow!("output path is not valid UTF-8"))?; + let dest_url = format!("sqlite:{target}"); + let mut dst = SqliteConnection::connect_with( + &SqliteConnectOptions::from_str(&dest_url)? + .foreign_keys(true) + .create_if_missing(false) + .busy_timeout(Duration::from_secs(60)), + ) + .await + .with_context(|| format!("opening output database at {dest_url}"))?; + + let filter_applied = !matches!(opts.filter, Filter::None); + if filter_applied { + apply_workflow_filter(&mut dst, &opts.filter).await?; + let remaining: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM workflow") + .fetch_one(&mut dst) + .await?; + if remaining == 0 { + bail!("filter matched no workflows; refusing to write empty export"); + } + info!("Retained {remaining} workflow(s) after filter"); + } + + // Always sweep FK orphans and orphan workflow_status rows, regardless of + // filter mode. Cascade only fires for parent rows the filter deleted; + // pre-existing orphans (from any code path that ran with foreign_keys=OFF + // — including the bare sqlite3 CLI, which defaults to OFF) survive + // VACUUM INTO and are data corruption, not fidelity to the source. + prune_orphans(&mut dst).await?; + // workflow_status has no FK declared back to workflow (the back-reference + // column was added via ALTER TABLE ADD COLUMN, which SQLite cannot extend + // with FK constraints), so foreign_key_check doesn't see its orphans. + sqlx::query("DELETE FROM workflow_status WHERE id NOT IN (SELECT status_id FROM workflow)") + .execute(&mut dst) + .await + .context("DELETE FROM workflow_status (orphan prune)")?; + + if filter_applied && !opts.preserve_access_groups { + info!( + "Stripping access_group / user_group_membership (use --preserve-access-groups to keep)" + ); + sqlx::query("DELETE FROM user_group_membership") + .execute(&mut dst) + .await + .context("DELETE FROM user_group_membership")?; + // access_group cascades workflow_access_group via ON DELETE CASCADE. + sqlx::query("DELETE FROM access_group") + .execute(&mut dst) + .await + .context("DELETE FROM access_group")?; + } + + if opts.run_final_vacuum { + info!("Running final VACUUM"); + sqlx::query("VACUUM") + .execute(&mut dst) + .await + .context("VACUUM")?; + } + + let workflow_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM workflow") + .fetch_one(&mut dst) + .await?; + let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job") + .fetch_one(&mut dst) + .await?; + let _ = dst.close().await; + + let bytes = std::fs::metadata(&opts.output_path) + .map(|m| m.len()) + .unwrap_or(0); + info!( + "Wrote {} ({} workflow(s), {} job(s), {:.2} MiB)", + opts.output_path.display(), + workflow_count, + job_count, + bytes as f64 / (1024.0 * 1024.0) + ); + Ok(()) +} + +/// Reject `--output` paths that resolve to the same filesystem location as the +/// source database. Without this check, a slip like +/// `torc-server export --database prod.db --output prod.db --overwrite` would +/// trigger `prepare_output_path` to `remove_file(prod.db)` before the export +/// even opens the source — turning a typo into permanent data loss. +/// +/// Comparison is by canonical filesystem path so symlinks, relative paths, and +/// the various `sqlite:` URL prefixes all collapse to the same target. 
If the +/// source URL is in-memory or refers to a path that doesn't exist on disk +/// (which would fail later anyway), this check abstains. +fn refuse_self_overwrite(source_url: &str, output: &Path) -> Result<()> { + let raw_source = strip_sqlite_url_prefix(source_url); + if raw_source == ":memory:" || raw_source.contains("file::memory:") { + return Ok(()); + } + let source_canonical = match Path::new(raw_source).canonicalize() { + Ok(p) => p, + // Source path doesn't resolve — opening it for VACUUM INTO will fail + // with a clearer error than anything we'd produce here. + Err(_) => return Ok(()), + }; + let output_canonical = canonicalize_with_missing_file(output)?; + if source_canonical == output_canonical { + bail!( + "refusing to overwrite the source database with itself ({})", + source_canonical.display() + ); + } + Ok(()) +} + +/// Canonicalize a path that may not exist yet by canonicalizing its parent +/// directory and rejoining the file name. The parent must exist — that's a +/// reasonable precondition for a writeable output location. +fn canonicalize_with_missing_file(path: &Path) -> Result { + if let Ok(p) = path.canonicalize() { + return Ok(p); + } + let parent = path.parent().unwrap_or_else(|| Path::new(".")); + let parent_canonical = parent + .canonicalize() + .with_context(|| format!("canonicalizing output directory {}", parent.display()))?; + let file_name = path + .file_name() + .ok_or_else(|| anyhow!("output path has no file name"))?; + Ok(parent_canonical.join(file_name)) +} + +/// Strip whichever `sqlite:` URL form is in front of the path. Handles the +/// bare `sqlite:` form (e.g. `sqlite:relative.db`) and the URL-style +/// `sqlite://` (e.g. `sqlite:///abs/path.db`). +fn strip_sqlite_url_prefix(s: &str) -> &str { + s.strip_prefix("sqlite://") + .or_else(|| s.strip_prefix("sqlite:")) + .unwrap_or(s) +} + +fn prepare_output_path(path: &PathBuf, overwrite: bool) -> Result<()> { + if path.exists() { + if !overwrite { + bail!( + "output file {} already exists (use --overwrite to replace)", + path.display() + ); + } + std::fs::remove_file(path) + .with_context(|| format!("removing existing {}", path.display()))?; + } + // SQLite refuses to write VACUUM INTO if a stale -journal/-wal/-shm sidecar + // is present alongside the target. + for suffix in ["-wal", "-shm", "-journal"] { + let mut p = path.clone().into_os_string(); + p.push(suffix); + let p: PathBuf = p.into(); + if p.exists() { + std::fs::remove_file(&p).with_context(|| format!("removing {}", p.display()))?; + } + } + Ok(()) +} + +async fn apply_workflow_filter(dst: &mut SqliteConnection, filter: &Filter) -> Result<()> { + match filter { + Filter::None => Ok(()), + Filter::Users(users) => { + let placeholders = repeat_placeholders(users.len()); + let sql = format!("DELETE FROM workflow WHERE user NOT IN ({placeholders})"); + let mut q = sqlx::query(&sql); + for u in users { + q = q.bind(u); + } + q.execute(dst) + .await + .context("DELETE FROM workflow (user filter)")?; + Ok(()) + } + Filter::AccessGroups(ids) => { + // i64 values — safe to interpolate directly. 
+ let id_list = id_list(ids); + let sql = format!( + "DELETE FROM workflow WHERE id NOT IN \ + (SELECT workflow_id FROM workflow_access_group WHERE group_id IN ({id_list}))" + ); + sqlx::query(&sql) + .execute(dst) + .await + .context("DELETE FROM workflow (access-group filter)")?; + Ok(()) + } + Filter::WorkflowIds(ids) => { + let id_list = id_list(ids); + let sql = format!("DELETE FROM workflow WHERE id NOT IN ({id_list})"); + sqlx::query(&sql) + .execute(dst) + .await + .context("DELETE FROM workflow (id filter)")?; + Ok(()) + } + } +} + +/// Iteratively delete rows that violate any foreign-key constraint. +/// +/// `PRAGMA foreign_key_check` enumerates every FK violation currently present +/// in the database — in our case, this catches pre-existing orphans the +/// source DB carried in (where the FK parent was already missing before our +/// filter ran). Deletes are performed per-table in batches keyed off the +/// virtual-table's `rowid` column (the violating row's rowid), and the loop +/// repeats because a single deletion can cascade and surface further +/// violations on the next pass. Bounded by `MAX_ITERATIONS` so a pathological +/// schema can't spin forever. +async fn prune_orphans(dst: &mut SqliteConnection) -> Result<()> { + use std::collections::BTreeMap; + const MAX_ITERATIONS: usize = 16; + + for pass in 1..=MAX_ITERATIONS { + let violations: Vec<(String, i64)> = + sqlx::query_as(r#"SELECT "table", rowid FROM pragma_foreign_key_check"#) + .fetch_all(&mut *dst) + .await + .context("PRAGMA foreign_key_check")?; + if violations.is_empty() { + return Ok(()); + } + let mut by_table: BTreeMap> = BTreeMap::new(); + for (t, r) in violations { + by_table.entry(t).or_default().push(r); + } + let total: usize = by_table.values().map(|v| v.len()).sum(); + info!( + "Pruning {total} orphan row(s) across {} table(s) (pass {pass})", + by_table.len() + ); + for (table, rowids) in by_table { + let placeholders = repeat_placeholders(rowids.len()); + // Quote the identifier in case a future migration introduces a + // table name that needs it; double any embedded quote per the SQL + // standard. 
+ let sql = format!( + r#"DELETE FROM "{}" WHERE rowid IN ({})"#, + table.replace('"', "\"\""), + placeholders, + ); + let mut q = sqlx::query(&sql); + for r in &rowids { + q = q.bind(*r); + } + q.execute(&mut *dst) + .await + .with_context(|| format!("DELETE orphans from {table}"))?; + } + } + bail!( + "orphan pruning did not converge after {MAX_ITERATIONS} iterations \ + — likely a schema issue, refusing to loop forever" + ); +} + +fn repeat_placeholders(n: usize) -> String { + if n == 0 { + return String::new(); + } + let mut s = String::with_capacity(2 * n - 1); + s.push('?'); + for _ in 1..n { + s.push_str(",?"); + } + s +} + +fn id_list(ids: &[i64]) -> String { + ids.iter().map(i64::to_string).collect::>().join(",") +} + +fn normalize_db_url(input: &str) -> String { + if input.starts_with("sqlite:") { + input.to_string() + } else { + format!("sqlite:{input}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn placeholders_handles_zero_one_many() { + assert_eq!(repeat_placeholders(0), ""); + assert_eq!(repeat_placeholders(1), "?"); + assert_eq!(repeat_placeholders(3), "?,?,?"); + } + + #[test] + fn id_list_formats_correctly() { + assert_eq!(id_list(&[]), ""); + assert_eq!(id_list(&[42]), "42"); + assert_eq!(id_list(&[1, 2, 3]), "1,2,3"); + } + + #[test] + fn normalize_adds_prefix() { + assert_eq!(normalize_db_url("/tmp/x.db"), "sqlite:/tmp/x.db"); + assert_eq!(normalize_db_url("sqlite:foo"), "sqlite:foo"); + } +} diff --git a/tests/test_export.rs b/tests/test_export.rs new file mode 100644 index 000000000..53f47e959 --- /dev/null +++ b/tests/test_export.rs @@ -0,0 +1,739 @@ +#![cfg(feature = "server-bin")] +//! Integration tests for `torc-server export`. +//! +//! Each test seeds a freshly-migrated SQLite database with two users (alice, +//! bob), an access group linking both users' workflows, and a couple of jobs, +//! then drives `torc::server::export::run_export` directly and asserts on the +//! resulting file. + +use sqlx::Connection; +use sqlx::SqliteConnection; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use tempfile::TempDir; +use torc::server::export::{ExportOptions, Filter, run_export}; + +struct SeedIds { + alice_workflow: i64, + bob_workflow: i64, + group: i64, +} + +async fn setup_source_db(path: &Path) -> SqlitePool { + let url = format!("sqlite:{}", path.display()); + let opts = SqliteConnectOptions::from_str(&url) + .unwrap() + .create_if_missing(true) + .foreign_keys(true); + let pool = SqlitePoolOptions::new().connect_with(opts).await.unwrap(); + sqlx::migrate!("./torc-server/migrations") + .run(&pool) + .await + .expect("migrations"); + pool +} + +async fn seed(pool: &SqlitePool) -> SeedIds { + // Each workflow needs its own workflow_status row (FK). + let s1: i64 = sqlx::query_scalar("INSERT INTO workflow_status DEFAULT VALUES RETURNING id") + .fetch_one(pool) + .await + .unwrap(); + let s2: i64 = sqlx::query_scalar("INSERT INTO workflow_status DEFAULT VALUES RETURNING id") + .fetch_one(pool) + .await + .unwrap(); + + let alice: i64 = sqlx::query_scalar( + "INSERT INTO workflow (name, user, timestamp, status_id) \ + VALUES ('alice-wf', 'alice', '2026-01-01', ?) RETURNING id", + ) + .bind(s1) + .fetch_one(pool) + .await + .unwrap(); + let bob: i64 = sqlx::query_scalar( + "INSERT INTO workflow (name, user, timestamp, status_id) \ + VALUES ('bob-wf', 'bob', '2026-01-01', ?) 
RETURNING id", + ) + .bind(s2) + .fetch_one(pool) + .await + .unwrap(); + + // Two jobs for alice, one for bob — exercises the cascade chain. + sqlx::query("INSERT INTO job (workflow_id, name, command, status) VALUES (?, 'j1', 'echo', 0)") + .bind(alice) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO job (workflow_id, name, command, status) VALUES (?, 'j2', 'echo', 0)") + .bind(alice) + .execute(pool) + .await + .unwrap(); + let bob_job: i64 = sqlx::query_scalar( + "INSERT INTO job (workflow_id, name, command, status) \ + VALUES (?, 'j1', 'echo', 0) RETURNING id", + ) + .bind(bob) + .fetch_one(pool) + .await + .unwrap(); + + // Seed rows in workflow-scoped tables added across multiple migration eras + // so the cascade chain is verified end-to-end. If any of these tables ever + // loses its ON DELETE CASCADE on workflow_id, the filtered export tests + // will fail because bob's rows will survive the workflow DELETE. + sqlx::query("INSERT INTO event (workflow_id, timestamp, data) VALUES (?, 0, '{}')") + .bind(bob) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO failure_handler (workflow_id, name, rules) VALUES (?, 'h', '[]')") + .bind(bob) + .execute(pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO ro_crate_entity (workflow_id, entity_id, entity_type, metadata) \ + VALUES (?, '#bob', 'Workflow', '{}')", + ) + .bind(bob) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO slurm_stats (workflow_id, job_id, run_id) VALUES (?, ?, 1)") + .bind(bob) + .bind(bob_job) + .execute(pool) + .await + .unwrap(); + + let group: i64 = + sqlx::query_scalar("INSERT INTO access_group (name) VALUES ('proj-x') RETURNING id") + .fetch_one(pool) + .await + .unwrap(); + for &wf in &[alice, bob] { + sqlx::query("INSERT INTO workflow_access_group (workflow_id, group_id) VALUES (?, ?)") + .bind(wf) + .bind(group) + .execute(pool) + .await + .unwrap(); + } + for user in ["alice", "bob"] { + sqlx::query("INSERT INTO user_group_membership (user_name, group_id) VALUES (?, ?)") + .bind(user) + .bind(group) + .execute(pool) + .await + .unwrap(); + } + + SeedIds { + alice_workflow: alice, + bob_workflow: bob, + group, + } +} + +async fn count(path: &Path, sql: &str) -> i64 { + let url = format!("sqlite:{}", path.display()); + let mut c = SqliteConnection::connect_with( + &SqliteConnectOptions::from_str(&url) + .unwrap() + .foreign_keys(true), + ) + .await + .unwrap(); + let n: i64 = sqlx::query_scalar(sql).fetch_one(&mut c).await.unwrap(); + let _ = c.close().await; + n +} + +async fn workflow_ids(path: &Path) -> Vec { + let url = format!("sqlite:{}", path.display()); + let mut c = SqliteConnection::connect_with( + &SqliteConnectOptions::from_str(&url) + .unwrap() + .foreign_keys(true), + ) + .await + .unwrap(); + let rows: Vec<(i64,)> = sqlx::query_as("SELECT id FROM workflow ORDER BY id") + .fetch_all(&mut c) + .await + .unwrap(); + let _ = c.close().await; + rows.into_iter().map(|(id,)| id).collect() +} + +fn opts(src: &Path, out: PathBuf, filter: Filter) -> ExportOptions { + ExportOptions { + source_db_url: src.display().to_string(), + output_path: out, + filter, + overwrite: false, + preserve_access_groups: false, + run_final_vacuum: true, + } +} + +async fn fresh_source(tmp: &TempDir) -> (PathBuf, SeedIds) { + let src = tmp.path().join("src.db"); + let pool = setup_source_db(&src).await; + let ids = seed(&pool).await; + pool.close().await; + (src, ids) +} + +struct ExtendedIds { + base: SeedIds, + carol_workflow: i64, + group_y: i64, +} + +/// Like 
+/// to a separate access group `proj-y`. This gives the multi-value filter
+/// tests three workflows split across two groups so unions and exclusions are
+/// visible.
+async fn fresh_source_extended(tmp: &TempDir) -> (PathBuf, ExtendedIds) {
+    let src = tmp.path().join("src.db");
+    let pool = setup_source_db(&src).await;
+    let base = seed(&pool).await;
+
+    let s3: i64 = sqlx::query_scalar("INSERT INTO workflow_status DEFAULT VALUES RETURNING id")
+        .fetch_one(&pool)
+        .await
+        .unwrap();
+    let carol: i64 = sqlx::query_scalar(
+        "INSERT INTO workflow (name, user, timestamp, status_id) \
+         VALUES ('carol-wf', 'carol', '2026-01-01', ?) RETURNING id",
+    )
+    .bind(s3)
+    .fetch_one(&pool)
+    .await
+    .unwrap();
+    sqlx::query("INSERT INTO job (workflow_id, name, command, status) VALUES (?, 'j1', 'echo', 0)")
+        .bind(carol)
+        .execute(&pool)
+        .await
+        .unwrap();
+    let group_y: i64 =
+        sqlx::query_scalar("INSERT INTO access_group (name) VALUES ('proj-y') RETURNING id")
+            .fetch_one(&pool)
+            .await
+            .unwrap();
+    sqlx::query("INSERT INTO workflow_access_group (workflow_id, group_id) VALUES (?, ?)")
+        .bind(carol)
+        .bind(group_y)
+        .execute(&pool)
+        .await
+        .unwrap();
+    sqlx::query("INSERT INTO user_group_membership (user_name, group_id) VALUES ('carol', ?)")
+        .bind(group_y)
+        .execute(&pool)
+        .await
+        .unwrap();
+    pool.close().await;
+    (
+        src,
+        ExtendedIds {
+            base,
+            carol_workflow: carol,
+            group_y,
+        },
+    )
+}
+
+fn sorted(mut v: Vec<i64>) -> Vec<i64> {
+    v.sort();
+    v
+}
+
+#[tokio::test]
+async fn user_filter_preserves_ids_and_strips_acl_tables() {
+    let tmp = TempDir::new().unwrap();
+    let (src, ids) = fresh_source(&tmp).await;
+    let out = tmp.path().join("alice.db");
+
+    run_export(opts(&src, out.clone(), Filter::Users(vec!["alice".into()])))
+        .await
+        .expect("export");
+
+    // Same workflow IDs as in the source — that's the headline guarantee.
+    assert_eq!(workflow_ids(&out).await, vec![ids.alice_workflow]);
+
+    // Cascade removed bob's job; alice's two jobs remain.
+    assert_eq!(count(&out, "SELECT COUNT(*) FROM job").await, 2);
+
+    // Cascade also removed bob's rows in every other workflow-scoped table.
+    // Alice has no rows in these tables (the seed only populates them for
+    // bob), so all four counts must be zero.
+    for table in ["event", "failure_handler", "ro_crate_entity", "slurm_stats"] {
+        assert_eq!(
+            count(&out, &format!("SELECT COUNT(*) FROM {table}")).await,
+            0,
+            "{table} should be empty after bob's workflow was filtered out — \
+             missing ON DELETE CASCADE on workflow_id?",
+        );
+    }
+
+    // workflow_status has no cascade FK; the export prunes orphans explicitly
+    // so bob's status row is not left behind to leak workflow counts.
+    assert_eq!(count(&out, "SELECT COUNT(*) FROM workflow_status").await, 1);
+
+    // ACL tables stripped by default in filtered mode.
+ assert_eq!(count(&out, "SELECT COUNT(*) FROM access_group").await, 0); + assert_eq!( + count(&out, "SELECT COUNT(*) FROM user_group_membership").await, + 0 + ); + assert_eq!( + count(&out, "SELECT COUNT(*) FROM workflow_access_group").await, + 0 + ); +} + +#[tokio::test] +async fn workflow_id_filter_keeps_only_listed_ids() { + let tmp = TempDir::new().unwrap(); + let (src, ids) = fresh_source(&tmp).await; + let out = tmp.path().join("just_bob.db"); + + run_export(opts( + &src, + out.clone(), + Filter::WorkflowIds(vec![ids.bob_workflow]), + )) + .await + .expect("export"); + + assert_eq!(workflow_ids(&out).await, vec![ids.bob_workflow]); + assert_eq!(count(&out, "SELECT COUNT(*) FROM job").await, 1); +} + +#[tokio::test] +async fn access_group_filter_returns_all_linked_workflows() { + let tmp = TempDir::new().unwrap(); + let (src, ids) = fresh_source(&tmp).await; + let out = tmp.path().join("group.db"); + + run_export(opts( + &src, + out.clone(), + Filter::AccessGroups(vec![ids.group]), + )) + .await + .expect("export"); + + let mut found = workflow_ids(&out).await; + found.sort(); + let mut expected = vec![ids.alice_workflow, ids.bob_workflow]; + expected.sort(); + assert_eq!(found, expected); +} + +#[tokio::test] +async fn full_copy_keeps_acl_tables() { + let tmp = TempDir::new().unwrap(); + let (src, _) = fresh_source(&tmp).await; + let out = tmp.path().join("full.db"); + + run_export(opts(&src, out.clone(), Filter::None)) + .await + .expect("export"); + + assert_eq!(count(&out, "SELECT COUNT(*) FROM workflow").await, 2); + // No filter applied, so ACL tables remain intact and `workflow_status` + // keeps both rows. Orphan pruning still runs in unfiltered mode, but the + // seed produces a clean DB, so the sweep finds nothing to remove. Any + // pruning behavior with actual orphans is exercised by + // `full_copy_also_prunes_pre_existing_orphans`. 
+ assert_eq!(count(&out, "SELECT COUNT(*) FROM access_group").await, 1); + assert_eq!( + count(&out, "SELECT COUNT(*) FROM user_group_membership").await, + 2 + ); + assert_eq!( + count(&out, "SELECT COUNT(*) FROM workflow_access_group").await, + 2 + ); + assert_eq!(count(&out, "SELECT COUNT(*) FROM workflow_status").await, 2); +} + +#[tokio::test] +async fn preserve_access_groups_flag_keeps_acl_tables_in_filtered_export() { + let tmp = TempDir::new().unwrap(); + let (src, ids) = fresh_source(&tmp).await; + let out = tmp.path().join("alice_preserved.db"); + + let mut o = opts(&src, out.clone(), Filter::Users(vec!["alice".into()])); + o.preserve_access_groups = true; + run_export(o).await.expect("export"); + + assert_eq!(workflow_ids(&out).await, vec![ids.alice_workflow]); + assert_eq!(count(&out, "SELECT COUNT(*) FROM access_group").await, 1); + assert_eq!( + count(&out, "SELECT COUNT(*) FROM user_group_membership").await, + 2 + ); +} + +#[tokio::test] +async fn empty_filter_result_errors_and_removes_partial_output() { + let tmp = TempDir::new().unwrap(); + let (src, _) = fresh_source(&tmp).await; + let out = tmp.path().join("nobody.db"); + + let err = run_export(opts( + &src, + out.clone(), + Filter::Users(vec!["nobody-here".into()]), + )) + .await + .expect_err("expected error for empty filter result"); + assert!( + err.to_string().contains("no workflows"), + "unexpected error: {err}" + ); + assert!( + !out.exists(), + "partial export file should be removed on empty result" + ); +} + +#[tokio::test] +async fn refuses_to_overwrite_existing_file_without_flag() { + let tmp = TempDir::new().unwrap(); + let (src, _) = fresh_source(&tmp).await; + let out = tmp.path().join("first.db"); + + run_export(opts(&src, out.clone(), Filter::None)) + .await + .expect("first export"); + + // Second run without --overwrite should fail. + let err = run_export(opts(&src, out.clone(), Filter::None)) + .await + .expect_err("expected refusal to overwrite"); + assert!(err.to_string().contains("--overwrite"), "got: {err}"); + + // With overwrite=true it succeeds. + let mut o = opts(&src, out.clone(), Filter::None); + o.overwrite = true; + run_export(o).await.expect("overwrite"); + assert_eq!(count(&out, "SELECT COUNT(*) FROM workflow").await, 2); +} + +// --- Multi-value filter variants ------------------------------------------- +// +// The single-value tests above exercise `IN (?)` with one bound parameter; +// these tests exercise the multi-bind / comma-joined-id paths and confirm the +// filters behave as a union (OR) rather than an intersection. + +#[tokio::test] +async fn multi_user_filter_keeps_all_listed_users() { + let tmp = TempDir::new().unwrap(); + let (src, ext) = fresh_source_extended(&tmp).await; + let out = tmp.path().join("alice_carol.db"); + + run_export(opts( + &src, + out.clone(), + Filter::Users(vec!["alice".into(), "carol".into()]), + )) + .await + .expect("export"); + + assert_eq!( + sorted(workflow_ids(&out).await), + sorted(vec![ext.base.alice_workflow, ext.carol_workflow]), + ); + // bob's job (1) dropped; alice's two + carol's one remain. 
+ assert_eq!(count(&out, "SELECT COUNT(*) FROM job").await, 3); +} + +#[tokio::test] +async fn multi_workflow_id_filter_keeps_all_listed_ids() { + let tmp = TempDir::new().unwrap(); + let (src, ext) = fresh_source_extended(&tmp).await; + let out = tmp.path().join("alice_and_carol_ids.db"); + + run_export(opts( + &src, + out.clone(), + Filter::WorkflowIds(vec![ext.base.alice_workflow, ext.carol_workflow]), + )) + .await + .expect("export"); + + assert_eq!( + sorted(workflow_ids(&out).await), + sorted(vec![ext.base.alice_workflow, ext.carol_workflow]), + ); +} + +#[tokio::test] +async fn multi_workflow_id_filter_ignores_nonexistent_ids() { + let tmp = TempDir::new().unwrap(); + let (src, ext) = fresh_source_extended(&tmp).await; + let out = tmp.path().join("alice_plus_bogus.db"); + + // alice + a non-existent ID — should keep alice and silently ignore the bogus one. + run_export(opts( + &src, + out.clone(), + Filter::WorkflowIds(vec![ext.base.alice_workflow, 999_999]), + )) + .await + .expect("export"); + + assert_eq!(workflow_ids(&out).await, vec![ext.base.alice_workflow]); +} + +#[tokio::test] +async fn multi_access_group_filter_returns_union() { + let tmp = TempDir::new().unwrap(); + let (src, ext) = fresh_source_extended(&tmp).await; + let out = tmp.path().join("both_groups.db"); + + // proj-x links alice + bob; proj-y links carol. Filtering by both groups + // must return all three workflows. + run_export(opts( + &src, + out.clone(), + Filter::AccessGroups(vec![ext.base.group, ext.group_y]), + )) + .await + .expect("export"); + + assert_eq!( + sorted(workflow_ids(&out).await), + sorted(vec![ + ext.base.alice_workflow, + ext.base.bob_workflow, + ext.carol_workflow, + ]), + ); +} + +#[tokio::test] +async fn single_access_group_excludes_workflows_not_in_that_group() { + let tmp = TempDir::new().unwrap(); + let (src, ext) = fresh_source_extended(&tmp).await; + let out = tmp.path().join("just_proj_y.db"); + + // proj-y has only carol. alice and bob (proj-x only) must be dropped. + run_export(opts( + &src, + out.clone(), + Filter::AccessGroups(vec![ext.group_y]), + )) + .await + .expect("export"); + + assert_eq!(workflow_ids(&out).await, vec![ext.carol_workflow]); +} + +#[tokio::test] +async fn pre_existing_orphans_in_source_are_pruned() { + // Regression test for the case observed in the wild: the source DB + // contains rows in workflow-scoped tables whose workflow_id points at a + // workflow row that was already gone before the export ran. Cascade + // can't help (the parent row is missing, so there's nothing to delete), + // and VACUUM INTO copies the orphans verbatim. The export must sweep + // them with PRAGMA foreign_key_check. + + let tmp = TempDir::new().unwrap(); + let src = tmp.path().join("src.db"); + let pool = setup_source_db(&src).await; + let ids = seed(&pool).await; + + // Inject orphan rows on a single connection with foreign_keys disabled + // so the inserts succeed without a real workflow parent. 
+ let mut conn = pool.acquire().await.unwrap(); + sqlx::query("PRAGMA foreign_keys = OFF") + .execute(&mut *conn) + .await + .unwrap(); + let orphan_workflow_id: i64 = 999_999; + sqlx::query( + "INSERT INTO failure_handler (workflow_id, name, rules) \ + VALUES (?, 'orphan-handler', '[]')", + ) + .bind(orphan_workflow_id) + .execute(&mut *conn) + .await + .unwrap(); + sqlx::query( + "INSERT INTO ro_crate_entity (workflow_id, entity_id, entity_type, metadata) \ + VALUES (?, '#orphan', 'Workflow', '{}')", + ) + .bind(orphan_workflow_id) + .execute(&mut *conn) + .await + .unwrap(); + sqlx::query("INSERT INTO event (workflow_id, timestamp, data) VALUES (?, 0, '{}')") + .bind(orphan_workflow_id) + .execute(&mut *conn) + .await + .unwrap(); + drop(conn); + pool.close().await; + + // Filter to alice. Cascade fires for bob's rows; orphan rows referencing + // workflow 999_999 are not the cascade's responsibility — they must be + // pruned by the foreign_key_check sweep. + let out = tmp.path().join("alice.db"); + run_export(opts(&src, out.clone(), Filter::Users(vec!["alice".into()]))) + .await + .expect("export"); + + assert_eq!(workflow_ids(&out).await, vec![ids.alice_workflow]); + + // Every table that had orphan rows should now be empty. + for table in ["failure_handler", "ro_crate_entity", "event"] { + assert_eq!( + count(&out, &format!("SELECT COUNT(*) FROM {table}")).await, + 0, + "{table} still contains orphan rows after export", + ); + } + + // Belt-and-suspenders: the output database should have zero FK + // violations of any kind. + assert_eq!( + count(&out, "SELECT COUNT(*) FROM pragma_foreign_key_check").await, + 0, + "pragma_foreign_key_check should report no violations", + ); +} + +#[tokio::test] +async fn full_copy_also_prunes_pre_existing_orphans() { + // Same orphan-injection pattern as the filtered test, but with no + // filter applied. Full-copy exports must still produce a DB free of FK + // violations — orphans are data corruption, not fidelity to the source. + + let tmp = TempDir::new().unwrap(); + let src = tmp.path().join("src.db"); + let pool = setup_source_db(&src).await; + let _ids = seed(&pool).await; + + let mut conn = pool.acquire().await.unwrap(); + sqlx::query("PRAGMA foreign_keys = OFF") + .execute(&mut *conn) + .await + .unwrap(); + sqlx::query( + "INSERT INTO failure_handler (workflow_id, name, rules) \ + VALUES (999999, 'orphan', '[]')", + ) + .execute(&mut *conn) + .await + .unwrap(); + drop(conn); + pool.close().await; + + let out = tmp.path().join("full.db"); + run_export(opts(&src, out.clone(), Filter::None)) + .await + .expect("export"); + + assert_eq!( + count(&out, "SELECT COUNT(*) FROM pragma_foreign_key_check").await, + 0, + "full copy should still prune pre-existing orphans", + ); + // Workflow rows are preserved (no filter); the orphan failure_handler row + // is gone, but bob's legitimate one (seeded for his existing workflow) + // remains. + assert_eq!(count(&out, "SELECT COUNT(*) FROM workflow").await, 2); + assert_eq!(count(&out, "SELECT COUNT(*) FROM failure_handler").await, 1); +} + +#[tokio::test] +async fn no_vacuum_flag_skips_final_vacuum_but_still_produces_valid_export() { + // Exercises run_final_vacuum=false (the --no-vacuum CLI flag). The output + // file size assertion is intentionally loose — we just confirm a valid + // SQLite database with the expected logical contents was produced. 
+ + let tmp = TempDir::new().unwrap(); + let (src, ids) = fresh_source(&tmp).await; + let out = tmp.path().join("no_vacuum.db"); + + let mut o = opts(&src, out.clone(), Filter::Users(vec!["alice".into()])); + o.run_final_vacuum = false; + run_export(o).await.expect("export"); + + // Same correctness guarantees as a normal filtered export — VACUUM is + // purely a space-reclamation step. + assert_eq!(workflow_ids(&out).await, vec![ids.alice_workflow]); + assert_eq!(count(&out, "SELECT COUNT(*) FROM job").await, 2); + assert_eq!( + count(&out, "SELECT COUNT(*) FROM pragma_foreign_key_check").await, + 0, + ); + assert_eq!(count(&out, "SELECT COUNT(*) FROM access_group").await, 0); +} + +#[tokio::test] +async fn refuses_to_export_over_source_database() { + // Regression test: a slip like `--database prod.db --output prod.db + // --overwrite` would otherwise call remove_file() on the source before + // opening it for VACUUM INTO — turning a typo into permanent data loss. + // The export must reject this case before any destructive action. + + let tmp = TempDir::new().unwrap(); + let src = tmp.path().join("src.db"); + let pool = setup_source_db(&src).await; + let _ = seed(&pool).await; + pool.close().await; + + let mut o = opts(&src, src.clone(), Filter::None); + o.overwrite = true; + let err = run_export(o) + .await + .expect_err("expected refusal to overwrite source"); + assert!( + err.to_string().contains("source database"), + "unexpected error: {err}" + ); + + // Source must still be intact — both the file and its content. + assert!( + src.exists(), + "source DB should not have been deleted by the failed export" + ); + assert_eq!(count(&src, "SELECT COUNT(*) FROM workflow").await, 2); +} + +#[tokio::test] +async fn refuses_self_overwrite_when_paths_differ_textually_but_resolve_the_same() { + // Symlinks, `./`, etc. all collapse to the same canonical path. Make + // sure the check uses canonical comparison, not string equality. + + let tmp = TempDir::new().unwrap(); + let src = tmp.path().join("src.db"); + let pool = setup_source_db(&src).await; + let _ = seed(&pool).await; + pool.close().await; + + // Construct a textually different path that resolves to the same file + // by routing through `./` and a redundant component. + let aliased = tmp.path().join(".").join("src.db"); + let mut o = opts(&src, aliased, Filter::None); + o.overwrite = true; + let err = run_export(o) + .await + .expect_err("expected refusal even via aliased path"); + assert!( + err.to_string().contains("source database"), + "unexpected error: {err}" + ); + assert!(src.exists(), "source DB should still be intact"); +}