diff --git a/.gitignore b/.gitignore index c8cca59..1a1ac61 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ dist/ .claude/settings.local.json session-*.md test/ssl/ +FOLLOWUPS.md diff --git a/AGENT.md b/AGENT.md index eb9db6b..9e1798e 100644 --- a/AGENT.md +++ b/AGENT.md @@ -24,15 +24,27 @@ Prefer fixup commits over amending and force-pushing. ## Maintenance Tooling -`tools/maintenance.py` is a self-contained Python script for DuckLake maintenance operations (snapshot expiry, file cleanup, orphan deletion, checkpoint, tiered compaction). It connects to DuckLake using the same env vars as the main app (`DUCKLAKE_RDS_*`, `DUCKDB_S3_*`, `DUCKLAKE_DATA_PATH`) but does not import from the `millpond` package. Designed to run as a K8s CronJob reusing the same Docker image. +`tools/maintenance.py` is a self-contained Python script for DuckLake maintenance operations (snapshot expiry, file cleanup, orphan deletion, checkpoint, tiered compaction, deletion-queue dedup, catalog-side orphan recovery, fsck). It connects to DuckLake using the same env vars as the main app (`DUCKLAKE_RDS_*`, `DUCKDB_S3_*`, `DUCKLAKE_DATA_PATH`) but does not import from the `millpond` package. Designed to run as a K8s CronJob reusing the same Docker image. -The `compact` subcommand implements tiered compaction: each tier wraps `ducklake_merge_adjacent_files()` with a save/restore of the catalog's `target_file_size` option (which `ducklake_set_option` writes durably and cannot be unset). Tier specs and the steady-state default live in `TIERS` and `DEFAULT_TARGET_FILE_SIZE` at the top of the file. Bin semantics on DuckLake 1.4.x are `min_file_size` inclusive, `max_file_size` exclusive — so the tier ranges `[0, 1 MiB)`, `[1 MiB, 10 MiB)`, `[10 MiB, 64 MiB)` partition the file-size space without overlap. The `compact-probe` subcommand runs `ducklake_merge_adjacent_files()` against one table with a small `max_compacted_files` cap and no `target_file_size` change — used as a lightweight diagnostic from a periodic CronJob. +`connect()` does two ATTACHes against the same upstream Postgres: the DuckLake catalog as `lake` (the `ATTACH_NAME` constant) and a direct Postgres ATTACH as `pg` (the `PG_ATTACH_NAME` constant) used by `postgres_execute` / `postgres_query` for ctid-based DML and advisory-lock acquisition. S3 access is configured via both the legacy `SET s3_*` settings (honored by the ducklake catalog driver) and a `CREATE OR REPLACE SECRET s3` (required for ad-hoc httpfs ops like `glob('s3://...')` and `read_parquet('s3://...')` in DuckDB 1.4 — the legacy form alone returns 403 for those paths). Spills are pointed at `/tmp/duckdb_spill` so they land on the writable cron-pod emptyDir, not the read-only rootfs. + +The `compact` subcommand implements tiered compaction: each tier wraps `ducklake_merge_adjacent_files()` with a save/restore of the catalog's `target_file_size` option (which `ducklake_set_option` writes durably and cannot be unset). Tier specs and the steady-state default live in `TIERS` and `DEFAULT_TARGET_FILE_SIZE` at the top of the file. Bin semantics on DuckLake 1.4.x are `min_file_size` inclusive, `max_file_size` exclusive — so the tier ranges `[0, 1 MiB)`, `[1 MiB, 10 MiB)`, `[10 MiB, 64 MiB)` partition the file-size space without overlap. The `compact-probe` subcommand runs `ducklake_merge_adjacent_files()` against one table with a small `max_compacted_files` cap and no `target_file_size` change — used as a lightweight diagnostic from a periodic CronJob. 
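Because `ducklake_set_option` persists durably, the save/restore around each tier is the part that must not be skipped. Below is a minimal, self-contained sketch of that pattern; a plain dict stands in for the catalog option store, `bytes_to_human` is reconstructed from the behavior the unit tests pin down, and the shipped `_scoped_target_file_size` in `tools/maintenance.py` remains the authority:

```python
from contextlib import contextmanager

# Stand-in for the persisted catalog option; the real script reads and writes
# it through DuckLake's option calls rather than a dict.
_catalog = {"target_file_size": "67108864"}  # stored as a byte count


def bytes_to_human(raw):
    """Convert a clean power-of-1024 byte count back to a '64MiB'-style string."""
    try:
        n = int(raw)
    except (TypeError, ValueError):
        return None
    for unit, size in (("GiB", 1024**3), ("MiB", 1024**2), ("KiB", 1024)):
        if n > 0 and n % size == 0:
            return f"{n // size}{unit}"
    return None  # not a clean multiple; the caller falls back to the default


@contextmanager
def scoped_target_file_size(tier_target, default="128MiB"):
    prior = bytes_to_human(_catalog.get("target_file_size"))
    _catalog["target_file_size"] = tier_target
    try:
        yield
    finally:
        # The option cannot be unset, so always restore: the converted prior
        # value when it round-trips cleanly, otherwise the steady-state default.
        _catalog["target_file_size"] = prior or default


with scoped_target_file_size("5MiB"):
    pass  # the tier's ducklake_merge_adjacent_files() call would run here
assert _catalog["target_file_size"] == "64MiB"
```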
`compact` exposes `--threads` (default 2) and `--memory-limit` (default 4GB) because `ducklake_merge_adjacent_files` over-uses memory relative to input volume in 1.4 and the conservative defaults keep the cron pod alive on real lakes; raise them when the lake fits. + +DuckLake stores its catalog tables (`ducklake_data_file`, `ducklake_table`, `ducklake_files_scheduled_for_deletion`, etc.) in a Postgres schema named `__ducklake_metadata_<name>`, where `<name>` is the alias from the `ATTACH … AS <name>` statement. `maintenance.py` exposes the alias as the `ATTACH_NAME` constant and the derived `METADATA_SCHEMA = f"__ducklake_metadata_{ATTACH_NAME}"`; any new code that reads the catalog directly (rather than through the `ducklake_*` SQL functions) must reference `METADATA_SCHEMA` so the attach name and schema name never drift. + +`tools/maintenance.sql` is executed verbatim at every session start, both by `maintenance.py`'s `connect()` and by the `just shell` recipe (via the duckdb CLI's `.read` meta-command). The file contains no templating — `__ducklake_metadata_lake` and the rest are written literally so both load paths stay consistent. It defines runtime macros (`count_pending_dups()`, `find_catalog_orphans(data_path)`) and documents the constraints any new recipe must follow: no `LEFT ANTI JOIN` (DuckDB 1.4 limitation; use `LEFT JOIN ... WHERE rhs IS NULL`), no Postgres `ctid` from duckdb-side SQL (use `postgres_execute` / `postgres_query` instead), no literal `glob('s3://...')` inside `CREATE MACRO` bodies (DuckDB 1.4 evaluates them eagerly at macro creation, which would S3-LIST the lake on every connect — pass the path as a parameter), and the advisory-lock key. + +The catalog-side orphan-recovery subcommands (`dedup-deletions`, `find-orphans`, `heal-orphans`, `cleanup-all-safe`, `fsck`) form a self-contained recovery toolkit for the failure mode where an interrupted `cleanup-all` leaves the catalog with rows pointing at S3 keys that no longer exist (because the upstream txn rolled back but the S3 deletes are permanent). `heal-orphans` runs two safety gates before deleting: B1 proves `ducklake_data_file` is non-empty AND no orphan path is still live (no vacuous pass on an empty catalog, no false positive that would delete a live file), and B3 aborts if any positional-delete vector references an "orphan" id (such a file is still live for vector lookups). `cleanup-all-safe` is the orchestrator that loops dedup + heal + cleanup-all under one advisory lock until cleanup-all exits clean; `fsck` adds the `ducklake_delete_orphaned_files` S3-side sweep on top. All destructive subcommands take `pg_try_advisory_lock(hashtext('millpond-ducklake-maintenance')::bigint)` on the `pg` ATTACH; the lock is held by the `pg` connection (not the `lake` connection that DuckLake uses internally), so it provides mutual exclusion between maintenance invocations but not catalog-write atomicity against arbitrary writers — document this caveat anywhere the lock is mentioned. `main()` logs `millpond <version> (maintenance)` on startup using `importlib.metadata.version("millpond")`, mirroring `millpond/main.py`. If `MILLPOND_IMAGE` is set in the env, the image identifier is appended (`image=<identifier>`). The chart-side wiring is optional — without it, the package version is sufficient to identify which build is running. +`cleanup` and `cleanup-all` log a single structured throughput line on completion: `cleanup throughput: files_processed=N elapsed_s=T rate_obj_s=R queue_depth_after=A`.
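As an illustration, here is a small sketch of how such a line could be assembled (the logger name and rounding are assumptions; the shipped `_log_cleanup_throughput` is authoritative):

```python
import logging

log = logging.getLogger("maintenance")


def log_cleanup_throughput(operation, files_processed, elapsed_s, queue_depth_after):
    # Rate comes from the rows the cleanup call actually returned, never from
    # a queue-depth delta; guard against a zero-duration call.
    rate = round(files_processed / elapsed_s, 1) if elapsed_s > 0 else 0.0
    log.info(
        "%s throughput: files_processed=%d elapsed_s=%s rate_obj_s=%s queue_depth_after=%d",
        operation, files_processed, elapsed_s, rate, queue_depth_after,
    )


log_cleanup_throughput("cleanup-all", 47023, 9405.0, 0)
# -> cleanup-all throughput: files_processed=47023 elapsed_s=9405.0 rate_obj_s=5.0 queue_depth_after=0
```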
`files_processed` comes directly from `len(result)` — the count of rows `ducklake_cleanup_old_files` returned — rather than a queue-depth delta. A delta would be misleading whenever any other writer enqueues deletions during the call (the maintenance advisory lock by design only mutexes maintenance invocations, not arbitrary writers); `len(result)` is accurate regardless. `queue_depth_after` is queried with a single post-call snapshot and shows remaining work but is not used in the rate calculation. The line is intentionally suppressed on `--dry-run` because dry-run returns preview rows and a rate computed from those would falsely claim work was done. + If `PUSHGATEWAY_URL` is set, the script pushes two metrics: `maintenance_start_time{operation}` (pushed immediately on start) and `maintenance_duration_seconds{operation, status}` (pushed on completion). This enables Grafana annotation queries for maintenance windows. -`tools/justfile` wraps the script for interactive use and is copied to `/justfile` in the Docker image. The `shell` and `drop` recipes still use the DuckDB CLI directly. The `DUCKDB` env var can override the duckdb binary path. +DuckDB's HTTP logging and the postgres extension's `pg_debug_show_queries` are off by default — both add per-call overhead that compounds across tens of thousands of S3 deletes. Pass `--debug` at the maintenance.py command level to opt back into both for short-lived debugging. + +`tools/justfile` wraps the script for interactive use and is copied to `/justfile` in the Docker image. The `shell` recipe pre-ATTACHes both `lake` and `pg`, configures the S3 SECRET, and `.read`s `tools/maintenance.sql` so every session starts with the macros loaded; the `drop` recipe still uses the DuckDB CLI directly. The `DUCKDB` env var can override the duckdb binary path; `MAINTENANCE_SCRIPT` and `MAINTENANCE_SQL` env vars override the in-image paths for dev use. ## What This Is diff --git a/Dockerfile b/Dockerfile index c0b64df..aaaa63e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,7 @@ COPY --from=builder /app/millpond /app/millpond COPY --from=builder /root/.local/bin/duckdb /usr/local/bin/duckdb COPY tools/justfile /justfile COPY tools/maintenance.py /app/tools/maintenance.py +COPY tools/maintenance.sql /app/tools/maintenance.sql ENV PATH="/app/.venv/bin:$PATH" diff --git a/README.md b/README.md index d9c8490..a91f952 100644 --- a/README.md +++ b/README.md @@ -94,28 +94,60 @@ Requires Docker (uses `keytool` from the Kafka container image for cert generati ### DuckLake Maintenance -`tools/maintenance.py` is a self-contained Python script for DuckLake maintenance operations (snapshot expiry, file cleanup, orphan deletion, checkpoint). It is baked into the Docker image at `/app/tools/maintenance.py` and designed to run as a K8s CronJob reusing the same image and credentials as the main application. +`tools/maintenance.py` is a self-contained Python script for DuckLake maintenance operations (snapshot expiry, file cleanup, orphan deletion, checkpoint, tiered compaction, deletion-queue dedup, catalog-side orphan recovery). It is baked into the Docker image at `/app/tools/maintenance.py` and designed to run as a K8s CronJob reusing the same image and credentials as the main application. 
```bash -python /app/tools/maintenance.py maintain --days 7 # expire snapshots + cleanup files +python /app/tools/maintenance.py maintain --days 7 # expire snapshots + cleanup files python /app/tools/maintenance.py maintain --days 7 --dry-run # preview only -python /app/tools/maintenance.py expire --days 3 # expire snapshots only -python /app/tools/maintenance.py cleanup --days 1 # cleanup scheduled files only -python /app/tools/maintenance.py checkpoint # integrated merge + expire + cleanup -python /app/tools/maintenance.py orphans # delete orphaned S3 files +python /app/tools/maintenance.py expire --days 3 # expire snapshots only +python /app/tools/maintenance.py cleanup --days 1 # cleanup scheduled files only +python /app/tools/maintenance.py cleanup-all # cleanup all scheduled files regardless of age +python /app/tools/maintenance.py dedup-deletions # drop duplicate rows in the pending-deletion queue +python /app/tools/maintenance.py find-orphans # list catalog rows whose S3 key no longer exists +python /app/tools/maintenance.py heal-orphans # delete those catalog rows (gated B1/B3 safety checks) +python /app/tools/maintenance.py cleanup-all-safe # dedup + heal-orphans + cleanup-all in a loop until clean +python /app/tools/maintenance.py fsck # cleanup-all-safe + ducklake_delete_orphaned_files +python /app/tools/maintenance.py checkpoint # integrated merge + expire + cleanup +python /app/tools/maintenance.py orphans # delete S3-side orphaned files (catalog has no row) +python /app/tools/maintenance.py compact --tier 1 # tiered compaction (see "When to add a merge job") ``` +The script logs `cleanup throughput: files_processed=N elapsed_s=T rate_obj_s=R queue_depth_after=A` after every `cleanup` / `cleanup-all` (skipped on `--dry-run`), so you can confirm steady-state throughput without enabling debug logging. `files_processed` is the actual count of files the call returned, not a queue-depth delta, so the number is accurate even when other writers enqueue deletions during the run. Pass `--debug` to opt back into DuckDB's HTTP and Postgres-extension query logging — both are off by default because they add per-call overhead that compounds across tens of thousands of S3 deletes. + If `PUSHGATEWAY_URL` is set, the script pushes `maintenance_start_time` (on start) and `maintenance_duration_seconds` (on completion) to a Prometheus Pushgateway, enabling Grafana annotation queries for maintenance windows. +#### Catalog-side orphan recovery + +If a `cleanup-all` run is interrupted (DuckLake bug: an S3 NoSuchKey on DELETE rolls back the whole transaction, but the S3 deletes already-completed are permanent), the catalog ends up with rows in `ducklake_files_scheduled_for_deletion` that point at S3 keys that no longer exist. Every subsequent `cleanup-all` will crash on those orphans until they're cleaned up. The catalog-recovery subcommands handle this without manual SQL surgery: + +| Subcommand | Action | +|---|---| +| `find-orphans` | List orphan rows on stdout (read-only). | +| `heal-orphans` | Delete the orphan rows. Two safety gates: B1 proves `ducklake_data_file` is non-empty AND no orphan path is still live; B3 aborts if any positional-delete vector references an orphan id. `--dry-run` runs the gates but skips the DELETE. | +| `cleanup-all-safe` | Loop dedup-deletions + heal-orphans + cleanup-all under one advisory lock until cleanup-all exits clean. Caps at `--max-iterations` (default 10). | +| `fsck` | `cleanup-all-safe` followed by `ducklake_delete_orphaned_files` (S3-side orphan sweep). 
The end-to-end "lake catalog is healthy" recipe. | + +Mutual exclusion comes from `pg_try_advisory_lock(hashtext('millpond-ducklake-maintenance')::bigint)` taken on the `pg` ATTACH; concurrent maintenance invocations bail with a clear error rather than racing each other's DELETEs. + +`tools/maintenance.sql` is loaded at every session start (both by `maintenance.py` and by the `just shell` recipe) and defines small DuckDB macros for ad-hoc inspection — `SELECT count_pending_dups()` for queue dup count, `SELECT * FROM find_catalog_orphans('s3://bucket/lake/data')` for the orphan list. The header documents the conventions (no `LEFT ANTI JOIN`, no duckdb-side `ctid`, advisory-lock key) that any new recipe must follow. + `tools/justfile` wraps the script and is also baked into the image at `/justfile` for interactive use: ```bash -just --list # see available recipes -just maintain-dry-run 3 # preview: expire >3 day snapshots + cleanup -just maintain 3 # execute it -just shell # interactive DuckDB shell connected to DuckLake -just drop events # drop a table (data files remain until cleanup) -just orphans-dry-run # preview orphaned S3 files +just --list # see available recipes +just maintain-dry-run 3 # preview: expire >3 day snapshots + cleanup +just maintain 3 # execute it +just dedup-deletions-dry-run # preview duplicate rows in the pending-deletion queue +just dedup-deletions # drop them +just find-orphans # list catalog-side orphan rows +just heal-orphans-dry-run # preview heal-orphans (gates only, no DELETE) +just heal-orphans # delete catalog-side orphan rows +just cleanup-all-safe # dedup + heal + cleanup-all in a loop +just fsck-dry-run # preview fsck end-to-end +just fsck # bring catalog to known-good state +just shell # interactive DuckDB shell with lake + pg ATTACHed and macros loaded +just drop events # drop a table (data files remain until cleanup) +just orphans-dry-run # preview S3-side orphaned files ``` All commands use the pod's existing env vars (`DUCKLAKE_RDS_*`, `DUCKDB_S3_*`, `DUCKLAKE_DATA_PATH`). @@ -242,6 +274,8 @@ Tier ranges (verified semantics: `min_file_size` inclusive, `max_file_size` excl | `compact-to-tier-2` | `[1 MiB, 10 MiB)` | ~32 MiB | | `compact-to-tier-3` | `[10 MiB, 64 MiB)` | ~128 MiB | +The `compact` subcommand bounds DuckDB resource use during the merge — `--threads` (default 2) and `--memory-limit` (default 4GB) — because `ducklake_merge_adjacent_files` isn't fully streaming today and over-uses memory relative to input size. The defaults are conservative; raise them on lakes that fit comfortably in pod memory. + This is an out-of-band maintenance operation, not part of the hot path. See the [sizing calculator](https://posthog.github.io/millpond/sizing-calculator.html) for interactive estimates. diff --git a/millpond/ducklake.py b/millpond/ducklake.py index b8ff446..aacd9e5 100644 --- a/millpond/ducklake.py +++ b/millpond/ducklake.py @@ -16,8 +16,15 @@ def _escape_libpq(value: str | None) -> str: """Escape a value for a libpq connection string. - Wraps in single quotes and backslash-escapes internal single quotes and backslashes. 
- See: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING + Wraps in single quotes and backslash-escapes internal single quotes and + backslashes, per the libpq connstring grammar: + + https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING + + Note this is *not* the same parser as Postgres SQL string literals — the + SQL parser uses ``''`` for embedded quotes and is governed by + ``standard_conforming_strings``; the libpq connstring parser is a + separate grammar that has always required ``\\'`` and ``\\\\``. """ if value is None: return "''" diff --git a/pyproject.toml b/pyproject.toml index 8fbe0a7..fd21f51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,9 @@ dependencies = [ "pyarrow>=18.0", "orjson>=3.10", "prometheus-client>=0.21", + # Required by duckdb's Python conversion for TIMESTAMPTZ columns; the + # stdlib zoneinfo module isn't accepted as a substitute as of 1.4.x. + "pytz>=2024.1", ] [project.optional-dependencies] @@ -47,6 +50,7 @@ markers = [ "integration: marks tests as integration tests (deselect with '-m \"not integration\"')", "e2e: marks tests as E2E tests requiring docker-compose stack", ] +pythonpath = ["tools"] [tool.uv] constraint-dependencies = [ diff --git a/tests/unit/test_ducklake.py b/tests/unit/test_ducklake.py index 3e04831..58c4ac0 100644 --- a/tests/unit/test_ducklake.py +++ b/tests/unit/test_ducklake.py @@ -74,6 +74,8 @@ def test_quoted_string_rejected(self): class TestEscapeLibpq: + """libpq connstring grammar: backslash escapes, NOT SQL-style doubled quotes.""" + def test_plain_value(self): assert _escape_libpq("ducklake") == "'ducklake'" diff --git a/tests/unit/test_maintenance.py b/tests/unit/test_maintenance.py new file mode 100644 index 0000000..89eb153 --- /dev/null +++ b/tests/unit/test_maintenance.py @@ -0,0 +1,385 @@ +"""Unit tests for tools/maintenance.py. + +Coverage tier A: pure helpers, log message shape, argparse plumbing. +Coverage tier B: orchestrator retry/dispatch logic with mocked sub-calls. + +Tier C (macros against a stubbed catalog) and tier D (full e2e against a +real lake) are intentionally out of scope here — they need either an +in-process duckdb with stubbed schemas or the docker-compose stack. +""" + +import logging +from unittest.mock import MagicMock, patch + +import duckdb +import maintenance +import pytest + +# --------------------------------------------------------------------------- +# Tier A — pure helpers and log shape +# --------------------------------------------------------------------------- + + +class TestSqlStringLiteral: + def test_plain(self): + assert maintenance._sql_string_literal("plain") == "'plain'" + + def test_embedded_quote_doubled(self): + assert maintenance._sql_string_literal("with'quote") == "'with''quote'" + + def test_already_doubled_quotes_are_escaped_again(self): + # The helper has no idea whether the input was pre-escaped; doubling + # is a one-way transform consistent with SQL string-literal rules. 
+ assert maintenance._sql_string_literal("two''already") == "'two''''already'" + + def test_empty(self): + assert maintenance._sql_string_literal("") == "''" + + +class TestBytesToHuman: + """Round-trip DuckLake's stored byte-count format back to a units-suffixed form.""" + + def test_clean_mib(self): + assert maintenance._bytes_to_human("67108864") == "64MiB" + assert maintenance._bytes_to_human("134217728") == "128MiB" + assert maintenance._bytes_to_human("5242880") == "5MiB" + + def test_clean_gib(self): + assert maintenance._bytes_to_human("1073741824") == "1GiB" + assert maintenance._bytes_to_human("2147483648") == "2GiB" + + def test_clean_kib(self): + assert maintenance._bytes_to_human("1024") == "1KiB" + assert maintenance._bytes_to_human("4096") == "4KiB" + + def test_picks_largest_clean_unit(self): + # 64 MiB = 65536 KiB; the converter picks MiB, not KiB. + assert maintenance._bytes_to_human("67108864") == "64MiB" + + def test_non_power_of_1024_returns_none(self): + assert maintenance._bytes_to_human("12345678") is None + + def test_zero_or_negative_returns_none(self): + assert maintenance._bytes_to_human("0") is None + assert maintenance._bytes_to_human("-1024") is None + + def test_non_integer_returns_none(self): + assert maintenance._bytes_to_human("128MiB") is None + assert maintenance._bytes_to_human("") is None + assert maintenance._bytes_to_human(None) is None + + +class TestLogCleanupThroughput: + """The throughput line is keyed off ``files_processed`` (rows the + operation actually returned) rather than a queue-depth delta — concurrent + writers can change the queue mid-run, making any delta misleading or + even negative.""" + + def test_typical(self, caplog): + with caplog.at_level(logging.INFO, logger="maintenance"): + maintenance._log_cleanup_throughput( + "cleanup-all", files_processed=50, elapsed_s=10.0, queue_depth_after=950 + ) + msg = caplog.records[0].getMessage() + assert "cleanup-all throughput" in msg + assert "files_processed=50" in msg + assert "elapsed_s=10.0" in msg + assert "rate_obj_s=5.0" in msg + assert "queue_depth_after=950" in msg + # No before-snapshot in the new shape — explicit assertion that the + # racy field is gone, in case anyone re-introduces it. 
+ assert "queue_depth_before" not in msg + + def test_zero_elapsed_does_not_divide_by_zero(self, caplog): + with caplog.at_level(logging.INFO, logger="maintenance"): + maintenance._log_cleanup_throughput("cleanup", 0, 0.0, 0) + msg = caplog.records[0].getMessage() + assert "rate_obj_s=0.0" in msg + + def test_full_drain(self, caplog): + with caplog.at_level(logging.INFO, logger="maintenance"): + maintenance._log_cleanup_throughput("cleanup-all", 47023, 9405.0, 0) + msg = caplog.records[0].getMessage() + assert "files_processed=47023" in msg + assert "rate_obj_s=5.0" in msg + assert "queue_depth_after=0" in msg + + +class TestArgparse: + """The new subcommands must reach the dispatch with the expected fields.""" + + def setup_method(self): + self.parser = maintenance.build_parser() + + def test_dedup_deletions_dry_run(self): + args = self.parser.parse_args(["dedup-deletions", "--dry-run"]) + assert args.command == "dedup-deletions" + assert args.dry_run is True + + def test_dedup_deletions_real(self): + args = self.parser.parse_args(["dedup-deletions"]) + assert args.dry_run is False + + def test_find_orphans(self): + args = self.parser.parse_args(["find-orphans"]) + assert args.command == "find-orphans" + + def test_heal_orphans(self): + args = self.parser.parse_args(["heal-orphans"]) + assert args.command == "heal-orphans" + assert args.dry_run is False + + def test_heal_orphans_dry_run(self): + args = self.parser.parse_args(["heal-orphans", "--dry-run"]) + assert args.dry_run is True + + def test_cleanup_all_safe_default_iterations(self): + args = self.parser.parse_args(["cleanup-all-safe"]) + assert args.command == "cleanup-all-safe" + assert args.max_iterations == 10 + + def test_cleanup_all_safe_override(self): + args = self.parser.parse_args(["cleanup-all-safe", "--max-iterations", "3"]) + assert args.max_iterations == 3 + + def test_cleanup_all_safe_zero_rejected(self): + with pytest.raises(SystemExit): + self.parser.parse_args(["cleanup-all-safe", "--max-iterations", "0"]) + + def test_fsck_dry_run(self): + args = self.parser.parse_args(["fsck", "--dry-run"]) + assert args.command == "fsck" + assert args.dry_run is True + assert args.max_iterations == 10 + + def test_fsck_real(self): + args = self.parser.parse_args(["fsck", "--max-iterations", "5"]) + assert args.dry_run is False + assert args.max_iterations == 5 + + def test_compact_threads_memory_defaults(self): + args = self.parser.parse_args(["compact", "--tier", "1"]) + assert args.threads == 2 + assert args.memory_limit == "4GB" + + def test_compact_threads_memory_override(self): + args = self.parser.parse_args(["compact", "--tier", "2", "--threads", "8", "--memory-limit", "16GB"]) + assert args.threads == 8 + assert args.memory_limit == "16GB" + + +# --------------------------------------------------------------------------- +# Tier B — orchestrators with mocked sub-calls +# --------------------------------------------------------------------------- + + +class TestCleanupAllSafe: + """Verify the dedup → heal → cleanup-all retry loop.""" + + def _patches(self): + return ( + patch("maintenance._acquire_advisory_lock"), + patch("maintenance.dedup_deletions"), + patch("maintenance.heal_orphans"), + patch("maintenance.cleanup_all"), + ) + + def test_succeeds_on_first_attempt(self): + lock, dedup, heal, cleanup = (p.start() for p in self._patches()) + try: + maintenance.cleanup_all_safe(MagicMock(), max_iterations=10) + finally: + patch.stopall() + # Lock taken once for the whole orchestration. 
+ assert lock.call_count == 1 + # One pass of dedup + heal + cleanup_all. + assert dedup.call_count == 1 + assert heal.call_count == 1 + assert cleanup.call_count == 1 + + def test_retries_after_io_exception_then_succeeds(self): + lock, dedup, heal, cleanup = (p.start() for p in self._patches()) + # First attempt: cleanup_all raises (simulating the c1 NoSuchKey crash). + # Second attempt: dedup + heal mop up the fresh orphans, cleanup succeeds. + cleanup.side_effect = [duckdb.IOException("simulated NoSuchKey"), None] + try: + maintenance.cleanup_all_safe(MagicMock(), max_iterations=10) + finally: + patch.stopall() + assert lock.call_count == 1, "lock taken once for the whole orchestration" + assert dedup.call_count == 2, "dedup re-runs to clean up after the crash" + assert heal.call_count == 2, "heal re-runs to clean up after the crash" + assert cleanup.call_count == 2 + + def test_exhausts_iterations_and_raises(self): + lock, dedup, heal, cleanup = (p.start() for p in self._patches()) + cleanup.side_effect = duckdb.IOException("persistent crash") + try: + with pytest.raises(RuntimeError, match="exhausted 3 iterations"): + maintenance.cleanup_all_safe(MagicMock(), max_iterations=3) + finally: + patch.stopall() + assert dedup.call_count == 3 + assert heal.call_count == 3 + assert cleanup.call_count == 3 + + +class TestFsck: + """Verify the dry-run vs real dispatch and ordering.""" + + def test_dry_run_delegates_to_dry_run_subcalls(self): + with ( + patch("maintenance.dedup_deletions") as dedup, + patch("maintenance.heal_orphans") as heal, + patch("maintenance.orphans") as s3_orphans, + patch("maintenance.cleanup_all_safe") as orch, + ): + conn = MagicMock() + maintenance.fsck(conn, dry_run=True, max_iterations=10) + # Dry-run must run heal_orphans so its B1/B3 gates execute. + dedup.assert_called_once_with(conn, dry_run=True) + heal.assert_called_once_with(conn, dry_run=True) + s3_orphans.assert_called_once_with(conn, dry_run=True) + orch.assert_not_called() + + def test_real_run_calls_orchestrator_and_s3_sweep(self): + with ( + patch("maintenance.dedup_deletions") as dedup, + patch("maintenance.heal_orphans") as heal, + patch("maintenance.orphans") as s3_orphans, + patch("maintenance.cleanup_all_safe") as orch, + ): + conn = MagicMock() + maintenance.fsck(conn, dry_run=False, max_iterations=7) + # Real path goes through cleanup_all_safe (which itself calls dedup + + # heal under the lock); it must NOT call heal/dedup directly here, or + # we'd be running them outside the lock. 
+ orch.assert_called_once_with(conn, 7) + s3_orphans.assert_called_once_with(conn, dry_run=False) + dedup.assert_not_called() + heal.assert_not_called() + + def test_dry_run_propagates_gate_failure(self): + """A real fsck would abort once heal-orphans hits a failed gate; the + dry-run must surface the same outcome rather than reporting healthy.""" + with ( + patch("maintenance.dedup_deletions"), + patch("maintenance.heal_orphans") as heal, + patch("maintenance.orphans"), + ): + heal.side_effect = RuntimeError("safety gate B1 failed: ...") + with pytest.raises(RuntimeError, match="safety gate B1"): + maintenance.fsck(MagicMock(), dry_run=True, max_iterations=10) + + +class TestSetCompactionTuning: + """Pure SQL emission; no real connection needed.""" + + def test_emits_expected_sets(self): + conn = MagicMock() + maintenance._set_compaction_tuning(conn, threads=4, memory_limit="8GB") + executed = [c.args[0] for c in conn.execute.call_args_list] + assert "SET threads = 4" in executed + assert "SET memory_limit = '8GB'" in executed + assert "SET preserve_insertion_order = false" in executed + assert "SET http_timeout = 600000" in executed + + def test_rejects_injection_in_memory_limit(self): + conn = MagicMock() + with pytest.raises(ValueError, match="Illegal character"): + maintenance._set_compaction_tuning(conn, threads=2, memory_limit="4GB'; DROP TABLE x; --") + # Sanitization happens before any execute; conn must not have been touched. + conn.execute.assert_not_called() + + +class TestScopedTargetFileSize: + """Read-and-restore round-trip for target_file_size; warns only on real failure.""" + + def _conn(self, prior_value): + """A duckdb-conn shaped MagicMock that returns ``prior_value`` from the + ducklake_options read and accepts the ducklake_set_option calls.""" + conn = MagicMock() + # Each conn.execute() returns a result object whose fetchone() is + # configured per call. We only care about the first fetchone (the + # ducklake_options read); the set_option CALLs return result objects + # that are never .fetchone()'d. + result = MagicMock() + result.fetchone.return_value = (prior_value,) if prior_value is not None else None + conn.execute.return_value = result + return conn + + def test_no_warning_when_prior_converts_cleanly_to_default(self, caplog): + # 134217728 bytes == 128 MiB == DEFAULT_TARGET_FILE_SIZE. The conversion + # succeeded; the warning must NOT fire just because the converted form + # equals the default — that's a healthy install, not a failure. + conn = self._conn("134217728") + with caplog.at_level(logging.WARNING, logger="maintenance"): + with maintenance._scoped_target_file_size(conn, "5MiB"): + pass + warnings = [r for r in caplog.records if r.levelno >= logging.WARNING] + assert warnings == [], "no warning expected on a clean default-value round-trip" + + def test_no_warning_when_prior_converts_cleanly_to_non_default(self, caplog): + # 64 MiB — operator value, not the default. + conn = self._conn("67108864") + with caplog.at_level(logging.WARNING, logger="maintenance"): + with maintenance._scoped_target_file_size(conn, "5MiB"): + pass + assert [r for r in caplog.records if r.levelno >= logging.WARNING] == [] + + def test_warning_when_conversion_fails(self, caplog): + # 12345678 isn't a clean power of 1024; _bytes_to_human returns None, + # and we genuinely lose the operator's value — that's the case where + # the warning is informative. 
+ conn = self._conn("12345678") + with caplog.at_level(logging.WARNING, logger="maintenance"): + with maintenance._scoped_target_file_size(conn, "5MiB"): + pass + warnings = [r.getMessage() for r in caplog.records if r.levelno >= logging.WARNING] + assert len(warnings) == 1 + assert "could not be converted" in warnings[0] + assert "12345678" in warnings[0] + + def test_no_warning_when_prior_unset(self, caplog): + conn = self._conn(None) + with caplog.at_level(logging.WARNING, logger="maintenance"): + with maintenance._scoped_target_file_size(conn, "5MiB"): + pass + assert [r for r in caplog.records if r.levelno >= logging.WARNING] == [] + + def test_restores_to_converted_prior(self): + conn = self._conn("67108864") + with maintenance._scoped_target_file_size(conn, "5MiB"): + pass + # The last execute call should be the restore SET to '64MiB'. + last_sql = conn.execute.call_args_list[-1].args[0] + assert "target_file_size" in last_sql + assert "'64MiB'" in last_sql + + def test_restores_to_default_when_prior_unset(self): + conn = self._conn(None) + with maintenance._scoped_target_file_size(conn, "5MiB"): + pass + last_sql = conn.execute.call_args_list[-1].args[0] + assert f"'{maintenance.DEFAULT_TARGET_FILE_SIZE}'" in last_sql + + +class TestAcquireAdvisoryLock: + """The lock-helper SQL must use single quotes around the inner literal.""" + + def test_emits_single_postgres_query_call_with_doubled_quotes(self): + conn = MagicMock() + conn.execute.return_value.fetchone.return_value = (True,) + maintenance._acquire_advisory_lock(conn) + sent = conn.execute.call_args_list[0].args[0] + # Outer single-quote-wrapped literal (postgres_query 2nd arg) and + # inner single quotes around 'millpond-...' must be doubled to + # survive the duckdb-side parser (regression test for a real bug). + assert "postgres_query('pg', '" in sent + assert "''millpond-ducklake-maintenance''" in sent + + def test_raises_when_lock_held_by_another_session(self): + conn = MagicMock() + conn.execute.return_value.fetchone.return_value = (False,) + with pytest.raises(RuntimeError, match="advisory lock"): + maintenance._acquire_advisory_lock(conn) diff --git a/tests/unit/test_maintenance_macros.py b/tests/unit/test_maintenance_macros.py new file mode 100644 index 0000000..0be5980 --- /dev/null +++ b/tests/unit/test_maintenance_macros.py @@ -0,0 +1,417 @@ +"""Tier C: SQL macros in tools/maintenance.sql against a stub schema. + +In-process DuckDB with stub catalog tables and a real local-filesystem glob. +Exercises path-normalization logic where the same physical file may be +referenced as either an absolute URI or a bucket-relative key (per quirk r1 +in the followup doc) — the same area where a previous regression +(``heal-orphans`` B1 gate, false-pass on cross-table form mismatch) lived. + +Also covers the heal-orphans B1/B3 safety gates against stub +ducklake_data_file / ducklake_delete_file tables, including the +end_snapshot live filter that distinguishes current rows from expired +historical entries. +""" + +import re + +import duckdb +import maintenance +import pytest + + +def _make_stub_lake(con): + """Mirror the DuckLake catalog tables maintenance.py / maintenance.sql touch. + + Includes the ``end_snapshot`` column on data_file and delete_file even + though the existing macro tests don't read it — heal-orphans's safety + gates do, and keeping one stub schema across the file avoids drift. 
+ """ + con.execute("CREATE SCHEMA __ducklake_metadata_lake") + con.execute( + "CREATE TABLE __ducklake_metadata_lake.ducklake_files_scheduled_for_deletion (" + " data_file_id BIGINT, path VARCHAR" + ")" + ) + con.execute( + "CREATE TABLE __ducklake_metadata_lake.ducklake_data_file (" + " data_file_id BIGINT, path VARCHAR, end_snapshot BIGINT" + ")" + ) + con.execute( + "CREATE TABLE __ducklake_metadata_lake.ducklake_delete_file ( data_file_id BIGINT, end_snapshot BIGINT)" + ) + + +def _seed_queue(con, rows): + con.executemany( + "INSERT INTO __ducklake_metadata_lake.ducklake_files_scheduled_for_deletion VALUES (?, ?)", + rows, + ) + + +def _load_macros(con): + con.execute(maintenance.MAINTENANCE_SQL_PATH.read_text()) + + +@pytest.fixture +def lake_con(): + con = duckdb.connect() + _make_stub_lake(con) + yield con + con.close() + + +@pytest.fixture +def data_dir(tmp_path): + """A local-filesystem stand-in for the lake's S3 data path.""" + d = tmp_path / "lake" / "data" + d.mkdir(parents=True) + return d + + +def _touch(directory, names): + for n in names: + (directory / n).write_bytes(b"") + + +class TestCountPendingDups: + def test_no_dups(self, lake_con): + _seed_queue(lake_con, [(1, "a.parquet"), (2, "b.parquet")]) + _load_macros(lake_con) + assert lake_con.execute("SELECT count_pending_dups()").fetchone()[0] == 0 + + def test_one_dup(self, lake_con): + _seed_queue(lake_con, [(1, "a.parquet"), (1, "a.parquet"), (2, "b.parquet")]) + _load_macros(lake_con) + assert lake_con.execute("SELECT count_pending_dups()").fetchone()[0] == 1 + + def test_multiple_dups_per_path(self, lake_con): + # path 'c' has 3 entries: 2 extras. path 'a' has 2 entries: 1 extra. Total 3. + _seed_queue( + lake_con, + [ + (1, "a.parquet"), + (1, "a.parquet"), + (2, "b.parquet"), + (3, "c.parquet"), + (3, "c.parquet"), + (3, "c.parquet"), + ], + ) + _load_macros(lake_con) + assert lake_con.execute("SELECT count_pending_dups()").fetchone()[0] == 3 + + def test_empty_queue(self, lake_con): + _load_macros(lake_con) + assert lake_con.execute("SELECT count_pending_dups()").fetchone()[0] == 0 + + +class TestFindCatalogOrphans: + """Path-matching tolerates absolute s3:// URIs and bucket-relative keys + in the same column (per quirk r1). 
Tests both forms and a mix.""" + + def test_returns_empty_when_all_paths_live(self, lake_con, data_dir): + _touch(data_dir, ["a.parquet", "b.parquet"]) + _seed_queue(lake_con, [(1, str(data_dir / "a.parquet")), (2, str(data_dir / "b.parquet"))]) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir)]).fetchall() + assert rows == [] + + def test_absolute_form_orphan_detected(self, lake_con, data_dir): + _touch(data_dir, ["live.parquet"]) + _seed_queue( + lake_con, + [ + (1, str(data_dir / "live.parquet")), # absolute, exists + (2, str(data_dir / "missing.parquet")), # absolute, gone + ], + ) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir)]).fetchall() + assert rows == [(2, str(data_dir / "missing.parquet"))] + + def test_relative_form_orphan_detected(self, lake_con, data_dir): + _touch(data_dir, ["live.parquet"]) + _seed_queue( + lake_con, + [ + (1, "live.parquet"), # relative, exists + (2, "missing.parquet"), # relative, gone + ], + ) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir)]).fetchall() + assert rows == [(2, "missing.parquet")] + + def test_mixed_forms_in_same_queue(self, lake_con, data_dir): + _touch(data_dir, ["a.parquet", "b.parquet"]) + _seed_queue( + lake_con, + [ + (1, str(data_dir / "a.parquet")), # absolute, live + (2, "b.parquet"), # relative, live + (3, str(data_dir / "x.parquet")), # absolute, orphan + (4, "y.parquet"), # relative, orphan + ], + ) + _load_macros(lake_con) + rows = lake_con.execute( + "SELECT * FROM find_catalog_orphans(?) ORDER BY data_file_id", + [str(data_dir)], + ).fetchall() + assert rows == [ + (3, str(data_dir / "x.parquet")), + (4, "y.parquet"), + ] + + def test_no_false_positive_for_relative_when_only_absolute_in_listing(self, lake_con, data_dir): + # Live S3 (here: local) listing returns absolute paths; the queue stores + # the same file as a relative key. Without the second branch of the + # macro's join (`l.file = data_path || '/' || s.path`), this would be + # falsely flagged as an orphan. + _touch(data_dir, ["live.parquet"]) + _seed_queue(lake_con, [(1, "live.parquet")]) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir)]).fetchall() + assert rows == [], "relative-form path with matching live file must NOT be reported as orphan" + + def test_empty_queue_returns_empty(self, lake_con, data_dir): + _touch(data_dir, ["live.parquet"]) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir)]).fetchall() + assert rows == [] + + def test_empty_listing_makes_everything_an_orphan(self, lake_con, data_dir): + # Empty data_dir — no parquet files exist. Every queue row is an orphan. + _seed_queue(lake_con, [(1, "a.parquet"), (2, "b.parquet")]) + _load_macros(lake_con) + rows = lake_con.execute( + "SELECT * FROM find_catalog_orphans(?) ORDER BY data_file_id", + [str(data_dir)], + ).fetchall() + assert rows == [(1, "a.parquet"), (2, "b.parquet")] + + def test_trailing_slash_in_data_path_no_false_orphan(self, lake_con, data_dir): + # `s3://bucket/lake/data/` is a common form. Without rtrim normalization + # the relative-form join produces `.../data//live.parquet` and the live + # row stored as `.../data/live.parquet` is mis-flagged as an orphan. 
+ _touch(data_dir, ["live.parquet"]) + _seed_queue(lake_con, [(1, str(data_dir / "live.parquet"))]) + _load_macros(lake_con) + # Pass data_dir with a trailing slash. + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir) + "/"]).fetchall() + assert rows == [], "trailing-slash data_path must not flag live absolute-form file as orphan" + + def test_trailing_slash_in_data_path_with_relative_form_also_clean(self, lake_con, data_dir): + # Same case but the queue stores the relative form. Without rtrim the + # join would produce `.../data//live.parquet` which never matches the + # glob output `.../data/live.parquet`, again flagging a live file. + _touch(data_dir, ["live.parquet"]) + _seed_queue(lake_con, [(1, "live.parquet")]) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir) + "/"]).fetchall() + assert rows == [], "trailing-slash data_path must not flag live relative-form file as orphan" + + def test_trailing_slash_still_detects_real_orphan(self, lake_con, data_dir): + # Sanity: rtrim normalization must not break orphan detection itself. + _touch(data_dir, ["live.parquet"]) + _seed_queue(lake_con, [(1, "missing.parquet")]) + _load_macros(lake_con) + rows = lake_con.execute("SELECT * FROM find_catalog_orphans(?)", [str(data_dir) + "/"]).fetchall() + assert rows == [(1, "missing.parquet")] + + +def _seed_data_files(con, rows): + """rows: iterable of (data_file_id, path, end_snapshot_or_None).""" + con.executemany( + "INSERT INTO __ducklake_metadata_lake.ducklake_data_file VALUES (?, ?, ?)", + rows, + ) + + +def _seed_delete_files(con, rows): + """rows: iterable of (data_file_id, end_snapshot_or_None).""" + con.executemany( + "INSERT INTO __ducklake_metadata_lake.ducklake_delete_file VALUES (?, ?)", + rows, + ) + + +class TestHealOrphansGates: + """heal-orphans's B1 and B3 safety gates, including the end_snapshot live + filter (the bug that an earlier review caught: gates were counting all + rows, so historical entries blocked perfectly valid heal-orphans runs). + + Drives heal_orphans(conn, dry_run=True) end-to-end so the test exercises + macro loading + path normalization + gate SQL in one shot. Dry-run skips + advisory-lock acquisition (which would need a real pg ATTACH) and the + final DELETE — ideal for unit-style coverage. + """ + + def _run(self, lake_con, data_dir, monkeypatch): + monkeypatch.setenv("DUCKLAKE_DATA_PATH", str(data_dir)) + _load_macros(lake_con) + # heal_orphans creates _orphans via find_catalog_orphans(?) then + # runs B1/B3. dry_run=True returns before the DELETE. + maintenance.heal_orphans(lake_con, dry_run=True) + + def test_b1_passes_when_only_expired_rows_match_queue(self, lake_con, data_dir, monkeypatch): + # data_file_id=1 was once live (path 'a') but is now expired (snapshot 100). + # data_file_id=2 is currently live with a different path ('live'). + # Queue holds 'a.parquet' (orphaned because the live data file does + # not include it). Without the end_snapshot filter, B1 sees the + # expired row and aborts. With the filter, it does not. + _seed_queue(lake_con, [(1, "a.parquet")]) + _seed_data_files( + lake_con, + [ + (1, "a.parquet", 100), # expired + (2, "live.parquet", None), # live + ], + ) + # data_dir empty so the queue path is genuinely orphaned. 
+ self._run(lake_con, data_dir, monkeypatch) # must NOT raise + + def test_b1_aborts_when_live_row_matches_queue(self, lake_con, data_dir, monkeypatch): + # data_file_id=1 is currently live with path 'a' AND that same path is + # in the queue. The queue entry is NOT a real orphan; B1 must abort. + _seed_queue(lake_con, [(1, "a.parquet")]) + _seed_data_files(lake_con, [(1, "a.parquet", None)]) + with pytest.raises(RuntimeError, match="safety gate B1 failed.*still appear as live"): + self._run(lake_con, data_dir, monkeypatch) + + def test_b1_aborts_when_no_live_data_files_at_all(self, lake_con, data_dir, monkeypatch): + # Vacuous-pass guard: a catalog with zero LIVE rows is suspect even + # if there are historical rows. Bail rather than assuming the queue + # is full of orphans on an empty live state. + _seed_queue(lake_con, [(1, "a.parquet")]) + _seed_data_files(lake_con, [(99, "old.parquet", 50)]) # only expired rows + with pytest.raises(RuntimeError, match="zero live rows"): + self._run(lake_con, data_dir, monkeypatch) + + def test_b3_ignores_expired_delete_vectors(self, lake_con, data_dir, monkeypatch): + # Queue has data_file_id=42 as orphan; ducklake_delete_file has an + # expired delete vector against id=42. Without the end_snapshot + # filter, B3 would abort; with it, it doesn't. + _seed_queue(lake_con, [(42, "a.parquet")]) + _seed_data_files(lake_con, [(99, "live.parquet", None)]) # at least one live row + _seed_delete_files(lake_con, [(42, 100)]) # expired vector against orphan id + self._run(lake_con, data_dir, monkeypatch) # must NOT raise + + def test_b3_aborts_on_live_delete_vector_against_orphan(self, lake_con, data_dir, monkeypatch): + # A live (end_snapshot IS NULL) positional-delete vector pointing at + # an "orphan" id means the file is still live for vector lookups — + # hard abort. + _seed_queue(lake_con, [(42, "a.parquet")]) + _seed_data_files(lake_con, [(99, "live.parquet", None)]) + _seed_delete_files(lake_con, [(42, None)]) # live vector + with pytest.raises(RuntimeError, match="safety gate B3 failed"): + self._run(lake_con, data_dir, monkeypatch) + + def test_b1_aborts_with_trailing_slash_data_path_when_path_is_live(self, lake_con, data_dir, monkeypatch): + # Regression: with DUCKLAKE_DATA_PATH ending in '/', the B1 gate must + # still match a relative-form queue path against an absolute-form live + # data_file row. Without rtrim normalization, the queue row would + # normalize to `.../data//a.parquet` (double slash) and never match + # the live absolute path `.../data/a.parquet` — so would_be_live + # would stay 0 and heal-orphans would delete a queue entry for a + # still-live file. + # + # Setup: data_dir is empty so find_catalog_orphans (correctly) flags + # the relative queue entry as an orphan. data_file has the absolute + # form as a live row. The B1 gate must catch the cross-form match. + live_abs = str(data_dir / "a.parquet") + _seed_queue(lake_con, [(1, "a.parquet")]) # relative + _seed_data_files(lake_con, [(1, live_abs, None)]) # live, absolute + monkeypatch.setenv("DUCKLAKE_DATA_PATH", str(data_dir) + "/") + _load_macros(lake_con) + with pytest.raises(RuntimeError, match="safety gate B1 failed.*still appear as live"): + maintenance.heal_orphans(lake_con, dry_run=True) + + +class TestB1GateS3Paths: + """Direct tests of _heal_orphans_b1_counts with literal s3:// URIs. 
+ + The macro and gate tests above use local-filesystem paths because real + S3 access is out of scope for unit tests; that means the + ``LIKE 's3://%'`` branch of the gate's CASE expression was untested. + These tests populate the stub schema with literal s3:// paths and call + the helper directly so a regression to the s3:// branch fails here. + """ + + def _populate_orphans(self, lake_con, rows): + """Manually create the _orphans temp table that heal-orphans builds + from find_catalog_orphans. Lets us test the gate query without + invoking the macro (which would require a real S3 LIST).""" + lake_con.execute("CREATE OR REPLACE TEMP TABLE _orphans (data_file_id BIGINT, path VARCHAR)") + lake_con.executemany("INSERT INTO _orphans VALUES (?, ?)", rows) + + def test_s3_absolute_match_caught_by_b1(self, lake_con): + # Both queue and data_file store the s3:// absolute form. The gate's + # `LIKE 's3://%'` branch must short-circuit normalization on both + # sides and recognize the same file. + self._populate_orphans(lake_con, [(1, "s3://bucket/lake/data/a.parquet")]) + _seed_data_files(lake_con, [(1, "s3://bucket/lake/data/a.parquet", None)]) + total_live, would_be_live = maintenance._heal_orphans_b1_counts(lake_con, "s3://bucket/lake/data") + assert total_live == 1 + assert would_be_live == 1 + + def test_s3_relative_queue_vs_s3_absolute_data_file(self, lake_con): + # Queue stores the data_path-relative form (just the key, e.g. + # 'a.parquet'); data_file stores the s3:// form. B1 must normalize + # the queue side to s3:// and match. The relative form is relative + # to ``data_path`` (the lake root), not to the bucket — that's what + # we observed in the canary: with DUCKLAKE_DATA_PATH=s3://b/data + # the queue stores keys like 'main/events/.../X.parquet'. + self._populate_orphans(lake_con, [(1, "a.parquet")]) + _seed_data_files(lake_con, [(1, "s3://bucket/lake/data/a.parquet", None)]) + _, would_be_live = maintenance._heal_orphans_b1_counts(lake_con, "s3://bucket/lake/data") + assert would_be_live == 1, "s3 absolute live row must match relative-queue orphan after normalization" + + def test_s3_absolute_queue_vs_s3_relative_data_file(self, lake_con): + # Symmetric: queue absolute, data_file relative. + self._populate_orphans(lake_con, [(1, "s3://bucket/lake/data/a.parquet")]) + _seed_data_files(lake_con, [(1, "a.parquet", None)]) + _, would_be_live = maintenance._heal_orphans_b1_counts(lake_con, "s3://bucket/lake/data") + assert would_be_live == 1 + + def test_s3_with_trailing_slash_still_matches(self, lake_con): + self._populate_orphans(lake_con, [(1, "a.parquet")]) + _seed_data_files(lake_con, [(1, "s3://bucket/lake/data/a.parquet", None)]) + _, would_be_live = maintenance._heal_orphans_b1_counts(lake_con, "s3://bucket/lake/data/") + assert would_be_live == 1, "trailing-slash data_path on s3:// must not produce double-slash mismatch" + + def test_s3_expired_data_file_does_not_block(self, lake_con): + # data_file has the matching path but it's expired (end_snapshot != NULL). + # B1 must NOT count it as live. 
+ self._populate_orphans(lake_con, [(1, "s3://bucket/lake/data/a.parquet")]) + _seed_data_files( + lake_con, + [ + (1, "s3://bucket/lake/data/a.parquet", 100), # expired match + (2, "s3://bucket/lake/data/live.parquet", None), # live, different + ], + ) + total_live, would_be_live = maintenance._heal_orphans_b1_counts(lake_con, "s3://bucket/lake/data") + assert total_live == 1 + assert would_be_live == 0, "expired s3:// row must not block heal-orphans" + + +class TestSchemaConsistency: + def test_macros_match_metadata_schema_constant(self): + """The .sql file hardcodes `__ducklake_metadata_lake` (verbatim load + via `.read` in `just shell` precludes Python templating). If anyone + ever changes ATTACH_NAME, this test fails loudly — the constraint + is documented in the .sql header but the assertion is what makes it + load-bearing.""" + sql = maintenance.MAINTENANCE_SQL_PATH.read_text() + # Strip line comments so we don't catch the cautionary references in + # the header. + without_comments = "\n".join(line.split("--", 1)[0] for line in sql.splitlines()) + refs = set(re.findall(r"__ducklake_metadata_\w+", without_comments)) + unexpected = refs - {maintenance.METADATA_SCHEMA} + assert not unexpected, ( + f"maintenance.sql references {sorted(unexpected)} but METADATA_SCHEMA " + f"is {maintenance.METADATA_SCHEMA!r}. If you change ATTACH_NAME in " + "maintenance.py, update the schema references in maintenance.sql to match." + ) diff --git a/tools/justfile b/tools/justfile index c4e172d..ba2b94b 100644 --- a/tools/justfile +++ b/tools/justfile @@ -12,11 +12,63 @@ rds_password := env("DUCKLAKE_RDS_PASSWORD", "") s3_region := env("DUCKDB_S3_REGION", "us-east-1") s3_key := env("DUCKDB_S3_ACCESS_KEY_ID", "") s3_secret := env("DUCKDB_S3_SECRET_ACCESS_KEY", "") +s3_endpoint := env("DUCKDB_S3_ENDPOINT", "s3." + s3_region + ".amazonaws.com") +s3_url_style := env("DUCKDB_S3_URL_STYLE", "vhost") +s3_use_ssl := env("DUCKDB_S3_USE_SSL", "true") data_path := env("DUCKLAKE_DATA_PATH", "") -_setup := "INSTALL httpfs; INSTALL ducklake; INSTALL postgres; LOAD httpfs; LOAD ducklake; LOAD postgres; SET pg_debug_show_queries = true; SET s3_region = '" + s3_region + "'; SET s3_endpoint = 's3." + s3_region + ".amazonaws.com'; SET s3_access_key_id = '" + s3_key + "'; SET s3_secret_access_key = '" + s3_secret + "'; SET s3_use_ssl = true; SET s3_url_style = 'vhost'; ATTACH 'postgres:host=" + rds_host + " port=" + rds_port + " dbname=" + rds_db + " user=" + rds_user + " password=" + rds_password + "' AS lake (TYPE ducklake, DATA_PATH '" + data_path + "'); USE lake;" +# DUCKDB_S3_USE_SSL must be a literal SQL boolean — it's interpolated into +# both `SET s3_use_ssl = '<value>'` (a SQL string literal) and into +# `CREATE OR REPLACE SECRET ... USE_SSL <value>` (an UNQUOTED bool, where +# even a single quote in the env value would be raw SQL injection). Strict +# whitelist beats per-context escaping here. maintenance.py applies an +# equivalent check via `_sanitize_setting_value`; this is the just-side +# equivalent. +_s3_use_ssl_validated := if s3_use_ssl == "true" { "true" } else if s3_use_ssl == "false" { "false" } else { error("DUCKDB_S3_USE_SSL must be 'true' or 'false', got: " + s3_use_ssl) } + +# Two-layer escaping for the connstring that lands inside an ATTACH SQL literal: +# +# 1. libpq connstring grammar (the inner layer the duckdb postgres extension +# passes to libpq) — wrap value in single quotes, backslash-escape +# embedded `\` and `'`.
NOT SQL-style `''` doubling — libpq's connstring +# parser is a different grammar from the Postgres SQL parser and only +# understands backslash escapes. +# https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING +# +# 2. Postgres SQL string-literal grammar (the outer layer applied because +# the whole connstring is wrapped in `ATTACH '...'`) — replace every `'` +# with `''`. Backslashes pass through unchanged. +# +# Without layer 1, a password containing a single quote produces an invalid +# libpq connstring (closes the value early). Without layer 2, the same value +# breaks out of the wrapping ATTACH literal. Apply them in order. + +_rds_db_pq := "'" + replace(replace(rds_db, "\\", "\\\\"), "'", "\\'") + "'" +_rds_user_pq := "'" + replace(replace(rds_user, "\\", "\\\\"), "'", "\\'") + "'" +_rds_password_pq := "'" + replace(replace(rds_password, "\\", "\\\\"), "'", "\\'") + "'" +_pg_connstr := "host=" + rds_host + " port=" + rds_port + " dbname=" + _rds_db_pq + " user=" + _rds_user_pq + " password=" + _rds_password_pq +_pg_connstr_sql := replace(_pg_connstr, "'", "''") + +_data_path_sql := replace(data_path, "'", "''") +_s3_key_sql := replace(s3_key, "'", "''") +_s3_secret_sql := replace(s3_secret, "'", "''") +_s3_region_sql := replace(s3_region, "'", "''") +_s3_endpoint_sql := replace(s3_endpoint, "'", "''") +_s3_url_style_sql := replace(s3_url_style, "'", "''") + +# _setup mirrors what maintenance.py's connect() does so `just shell` ends up +# with the same wired-up session as a maintenance subcommand: +# * temp_directory points at the writable emptyDir in the cron pod (the +# image rootfs is read-only, so the duckdb default `.tmp` would crash on +# any spill). +# * Both the legacy SET s3_* settings (honored by the ducklake catalog +# driver in 1.4) and the modern CREATE SECRET (required by ad-hoc httpfs +# ops like glob() and read_parquet()) are configured. 
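As a side note, the two layers compose like this in plain Python (hypothetical connection values; the inner layer mirrors `millpond.ducklake._escape_libpq`):

```python
def escape_libpq(value: str) -> str:
    # Layer 1 - libpq connstring grammar: backslash-escape \ and ', then wrap
    # the value in single quotes (mirrors millpond.ducklake._escape_libpq).
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def sql_literalize(connstr: str) -> str:
    # Layer 2 - the connstring sits inside ATTACH '...', so double every
    # single quote for the SQL string-literal grammar; backslashes pass through.
    return connstr.replace("'", "''")


password = "p'ss\\word"  # hypothetical value containing both risky characters
connstr = f"host=db port=5432 dbname=lake user=lake password={escape_libpq(password)}"
print(f"ATTACH 'postgres:{sql_literalize(connstr)}' AS lake (TYPE ducklake);")
```

Running it prints a fully escaped `ATTACH` statement, the same shape the `_setup` string below assembles from the `_rds_*_pq` and `_pg_connstr_sql` variables.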
+ +_setup := "INSTALL httpfs; INSTALL ducklake; INSTALL postgres; LOAD httpfs; LOAD ducklake; LOAD postgres; SET temp_directory = '/tmp/duckdb_spill'; SET enable_http_logging = false; SET pg_debug_show_queries = false; SET s3_region = '" + _s3_region_sql + "'; SET s3_endpoint = '" + _s3_endpoint_sql + "'; SET s3_access_key_id = '" + _s3_key_sql + "'; SET s3_secret_access_key = '" + _s3_secret_sql + "'; SET s3_use_ssl = " + _s3_use_ssl_validated + "; SET s3_url_style = '" + _s3_url_style_sql + "'; CREATE OR REPLACE SECRET s3 (TYPE s3, PROVIDER config, KEY_ID '" + _s3_key_sql + "', SECRET '" + _s3_secret_sql + "', REGION '" + _s3_region_sql + "', ENDPOINT '" + _s3_endpoint_sql + "', URL_STYLE '" + _s3_url_style_sql + "', USE_SSL " + _s3_use_ssl_validated + "); ATTACH 'postgres:" + _pg_connstr_sql + "' AS lake (TYPE ducklake, DATA_PATH '" + _data_path_sql + "'); ATTACH '" + _pg_connstr_sql + "' AS pg (TYPE postgres); USE lake;" maintenance := env("MAINTENANCE_SCRIPT", "/app/tools/maintenance.py") +maintenance_sql := env("MAINTENANCE_SQL", "/app/tools/maintenance.sql") default: @just --list @@ -28,7 +80,7 @@ default: # Open an interactive DuckDB shell connected to the DuckLake [group('interactive')] shell: - {{ duckdb }} -cmd "{{ _setup }}" + {{ duckdb }} -cmd "{{ _setup }}" -cmd ".read {{ maintenance_sql }}" # Drop a table [group('interactive')] @@ -64,6 +116,46 @@ cleanup days="1": cleanup-all: python {{ maintenance }} cleanup-all +# Preview duplicate rows in the pending-deletion queue (workaround for DuckLake bug c5) +[group('lifecycle')] +dedup-deletions-dry-run: + python {{ maintenance }} dedup-deletions --dry-run + +# Drop duplicate rows from the pending-deletion queue (workaround for DuckLake bug c5) +[group('lifecycle')] +dedup-deletions: + python {{ maintenance }} dedup-deletions + +# List catalog rows whose S3 key no longer exists (catalog-side orphans) +[group('lifecycle')] +find-orphans: + python {{ maintenance }} find-orphans + +# Preview heal-orphans (run safety gates, skip the DELETE) +[group('lifecycle')] +heal-orphans-dry-run: + python {{ maintenance }} heal-orphans --dry-run + +# Delete catalog rows whose S3 key no longer exists (gated B1/B3 safety checks) +[group('lifecycle')] +heal-orphans: + python {{ maintenance }} heal-orphans + +# Loop dedup + heal-orphans + cleanup-all until cleanup-all exits clean +[group('lifecycle')] +cleanup-all-safe max_iterations="10": + python {{ maintenance }} cleanup-all-safe --max-iterations {{ max_iterations }} + +# Preview fsck (cleanup-all-safe + S3-orphan sweep, dry-run) +[group('lifecycle')] +fsck-dry-run: + python {{ maintenance }} fsck --dry-run + +# Bring the lake catalog to a known-good state end-to-end +[group('lifecycle')] +fsck max_iterations="10": + python {{ maintenance }} fsck --max-iterations {{ max_iterations }} + # Preview orphaned files [group('lifecycle')] orphans-dry-run: diff --git a/tools/maintenance.py b/tools/maintenance.py index 6d99ba1..df45df8 100644 --- a/tools/maintenance.py +++ b/tools/maintenance.py @@ -25,6 +25,7 @@ import sys import time from importlib.metadata import PackageNotFoundError, version +from pathlib import Path import duckdb from prometheus_client import CollectorRegistry, Gauge, push_to_gateway @@ -63,6 +64,27 @@ def _log_version() -> None: # rather than leaving the catalog at whatever the last tier set. DEFAULT_TARGET_FILE_SIZE = "128MiB" +# Single source of truth for the DuckLake ATTACH name. 
DuckLake creates a +# Postgres metadata schema named ``__ducklake_metadata_<attach_name>``, so the +# attach name and the schema name must always be derived from the same value +# or queries silently target the wrong schema. +ATTACH_NAME = "lake" +METADATA_SCHEMA = f"__ducklake_metadata_{ATTACH_NAME}" + +# Direct Postgres ATTACH name used for `postgres_execute` / `postgres_query` +# calls; distinct from the DuckLake-catalog ATTACH (ATTACH_NAME). +PG_ATTACH_NAME = "pg" + +# Companion SQL file: header conventions plus runtime-loadable macros. +MAINTENANCE_SQL_PATH = Path(__file__).resolve().parent / "maintenance.sql" + +# Stable identifier for `pg_try_advisory_lock`. The lock guards mutual +# exclusion *between maintenance invocations*: it is held by the `pg` +# ATTACH connection, not by the catalog connection DuckLake uses +# internally for ducklake_* function calls, so it does NOT serialize +# against arbitrary other writers (e.g. the millpond ingest pods). +ADVISORY_LOCK_KEY_SQL = "hashtext('millpond-ducklake-maintenance')::bigint" + def _setup_logging(verbose: bool = False) -> None: level = "DEBUG" if verbose else os.environ.get("LOG_LEVEL", "INFO").upper() @@ -82,6 +104,12 @@ def _require(name: str) -> str: def _escape_libpq(value: str | None) -> str: + """Escape a value for a libpq connection string. + + Wraps in single quotes and backslash-escapes internal single quotes and + backslashes, per the libpq connstring grammar (NOT the Postgres SQL + parser's grammar — they're different parsers with different rules). + """ if value is None: return "''" escaped = value.replace("\\", "\\\\").replace("'", "\\'") @@ -101,10 +129,20 @@ def _positive_int(s: str) -> int: return n -def connect() -> duckdb.DuckDBPyConnection: - """Connect to DuckLake using environment variables.""" +def connect(debug: bool = False) -> duckdb.DuckDBPyConnection: + """Connect to DuckLake using environment variables. + + debug=True re-enables DuckDB's HTTP logging and the postgres extension's + query debug output. Both are off by default because they add per-call + overhead that compounds across 30k+ S3 deletes and many catalog reads. + """ conn = duckdb.connect() + # DuckDB defaults to spilling under '.tmp' relative to CWD; in the millpond + # image that path is on the read-only rootfs, so any compaction that needs + # to spill crashes. /tmp is the writable emptyDir in the cron pod. + conn.execute("SET temp_directory = '/tmp/duckdb_spill'") + # S3 config from env vars s3_region = os.environ.get("DUCKDB_S3_REGION", "us-east-1") s3_defaults = { @@ -131,7 +169,29 @@ def connect() -> duckdb.DuckDBPyConnection: conn.execute("LOAD httpfs") conn.execute("LOAD ducklake") conn.execute("LOAD postgres") - conn.execute("SET pg_debug_show_queries = true") + conn.execute(f"SET enable_http_logging = {str(debug).lower()}") + conn.execute(f"SET pg_debug_show_queries = {str(debug).lower()}") + + # The legacy SET s3_* settings above are honored by the ducklake catalog + # driver but not by ad-hoc httpfs ops (`glob('s3://...')`, + # `read_parquet('s3://...')`) in DuckDB 1.4 — those go through the SECRET + # manager. Mirror the same credentials into a SECRET so every recipe in + # this script (and any operator-typed glob) authenticates correctly.
+ s3_endpoint_val = os.environ.get("DUCKDB_S3_ENDPOINT", s3_defaults["s3_endpoint"]) + s3_use_ssl_val = os.environ.get("DUCKDB_S3_USE_SSL", s3_defaults["s3_use_ssl"]) + s3_url_style_val = os.environ.get("DUCKDB_S3_URL_STYLE", s3_defaults["s3_url_style"]) + s3_key = os.environ.get("DUCKDB_S3_ACCESS_KEY_ID", "") + s3_secret = os.environ.get("DUCKDB_S3_SECRET_ACCESS_KEY", "") + for v in (s3_endpoint_val, s3_use_ssl_val, s3_url_style_val, s3_key, s3_secret, s3_region): + if v: + _sanitize_setting_value(v) + conn.execute( + f"CREATE OR REPLACE SECRET s3 (" + f"TYPE s3, PROVIDER config, " + f"KEY_ID '{s3_key}', SECRET '{s3_secret}', " + f"REGION '{s3_region}', ENDPOINT '{s3_endpoint_val}', " + f"URL_STYLE '{s3_url_style_val}', USE_SSL {s3_use_ssl_val})" + ) rds_host = _require("DUCKLAKE_RDS_HOST") rds_port = os.environ.get("DUCKLAKE_RDS_PORT", "5432") @@ -147,10 +207,24 @@ def connect() -> duckdb.DuckDBPyConnection: ) pg_connstr_sql = pg_connstr.replace("'", "''") conn.execute(f""" - ATTACH 'ducklake:postgres:{pg_connstr_sql}' AS lake ( + ATTACH 'ducklake:postgres:{pg_connstr_sql}' AS {ATTACH_NAME} ( DATA_PATH '{data_path.replace("'", "''")}' ) """) + # Direct Postgres ATTACH for postgres_execute / postgres_query; needed by + # the catalog-maintenance recipes that touch ctid or run DML the duckdb + # postgres extension doesn't expose duckdb-side. + conn.execute(f"ATTACH '{pg_connstr_sql}' AS {PG_ATTACH_NAME} (TYPE postgres)") + + if MAINTENANCE_SQL_PATH.exists(): + # File is executed verbatim by both maintenance.py and the duckdb CLI's + # `.read` meta-command (the `just shell` recipe), so it must contain no + # templating placeholders — references to `__ducklake_metadata_lake` + # are written literally to keep both paths consistent. + conn.execute(MAINTENANCE_SQL_PATH.read_text()) + log.debug("Loaded SQL macros from %s", MAINTENANCE_SQL_PATH) + else: + log.warning("maintenance.sql not found at %s; macros unavailable", MAINTENANCE_SQL_PATH) log.info( "Connected: metadata=%s:%s/%s data=%s", @@ -166,7 +240,7 @@ def expire(conn: duckdb.DuckDBPyConnection, days: int, dry_run: bool) -> None: """Expire snapshots older than N days.""" log.info("Expiring snapshots older than %d days (dry_run=%s)", days, dry_run) result = conn.execute( - f"CALL ducklake_expire_snapshots('lake', " + f"CALL ducklake_expire_snapshots('{ATTACH_NAME}', " f"older_than => now() - INTERVAL '{days} days', " f"dry_run => {str(dry_run).lower()})" ).fetchall() @@ -174,16 +248,56 @@ def expire(conn: duckdb.DuckDBPyConnection, days: int, dry_run: bool) -> None: log.info("expire: %s", row) +def _scheduled_for_deletion_count(conn: duckdb.DuckDBPyConnection) -> int: + """Queue depth of ducklake_files_scheduled_for_deletion.""" + return conn.execute(f"SELECT COUNT(*) FROM {METADATA_SCHEMA}.ducklake_files_scheduled_for_deletion").fetchone()[0] + + +def _log_cleanup_throughput( + operation: str, + files_processed: int, + elapsed_s: float, + queue_depth_after: int, +) -> None: + """Emit one structured line with cleanup throughput stats. + + Single line, key=value pairs, grep-friendly. ``files_processed`` is taken + directly from the count of rows ``ducklake_cleanup_old_files`` returned + rather than from a queue-depth delta — the delta is wrong if any other + writer enqueues deletions during the call (and the maintenance advisory + lock by design only mutexes maintenance invocations, not arbitrary + writers). ``queue_depth_after`` gives a "how much remains" signal but + isn't used in the rate. 
+ """ + rate = files_processed / elapsed_s if elapsed_s > 0 else 0.0 + log.info( + "%s throughput: files_processed=%d elapsed_s=%.1f rate_obj_s=%.1f queue_depth_after=%d", + operation, + files_processed, + elapsed_s, + rate, + queue_depth_after, + ) + + def cleanup(conn: duckdb.DuckDBPyConnection, days: int, dry_run: bool) -> None: """Delete files scheduled for deletion older than N days.""" log.info("Cleaning up files older than %d days (dry_run=%s)", days, dry_run) + t0 = time.monotonic() result = conn.execute( - f"CALL ducklake_cleanup_old_files('lake', " + f"CALL ducklake_cleanup_old_files('{ATTACH_NAME}', " f"older_than => now() - INTERVAL '{days} days', " f"dry_run => {str(dry_run).lower()})" ).fetchall() + elapsed = time.monotonic() - t0 for row in result: log.info("cleanup: %s", row) + if not dry_run: + # Skip throughput log on dry_run: ducklake_cleanup_old_files returns the + # would-be-deleted rows in dry-run mode, so len(result) is the preview + # count, not actually-processed work — claiming a rate from that would + # be misleading. + _log_cleanup_throughput("cleanup", len(result), elapsed, _scheduled_for_deletion_count(conn)) def cleanup_all(conn: duckdb.DuckDBPyConnection, dry_run: bool) -> None: @@ -192,15 +306,279 @@ def cleanup_all(conn: duckdb.DuckDBPyConnection, dry_run: bool) -> None: log.info("cleanup-all has no dry-run mode; skipping") return log.info("Cleaning up all files scheduled for deletion") - result = conn.execute("CALL ducklake_cleanup_old_files('lake', cleanup_all => true)").fetchall() + t0 = time.monotonic() + result = conn.execute(f"CALL ducklake_cleanup_old_files('{ATTACH_NAME}', cleanup_all => true)").fetchall() + elapsed = time.monotonic() - t0 for row in result: log.info("cleanup-all: %s", row) + _log_cleanup_throughput("cleanup-all", len(result), elapsed, _scheduled_for_deletion_count(conn)) + + +def _sql_string_literal(s: str) -> str: + """Quote a Python string as a SQL string literal (single-quote-doubled).""" + return "'" + s.replace("'", "''") + "'" + + +def _acquire_advisory_lock(conn: duckdb.DuckDBPyConnection) -> None: + """Take the maintenance advisory lock or raise if another session holds it. + + The lock is taken on the `pg` ATTACH and released automatically when that + connection closes (including on crash); no explicit release needed for + single-subcommand invocations. Any subcommand that mutates the catalog + should call this before doing so. + """ + inner_sql = f"SELECT pg_try_advisory_lock({ADVISORY_LOCK_KEY_SQL}) AS acquired" + held = conn.execute( + f"SELECT acquired FROM postgres_query('{PG_ATTACH_NAME}', {_sql_string_literal(inner_sql)})" + ).fetchone()[0] + if not held: + raise RuntimeError( + "Another maintenance session is holding the advisory lock; aborting. " + "If you're sure no other invocation is running, the previous holder's " + "connection may not have closed cleanly — wait a few seconds and retry." + ) + log.info("Acquired advisory lock %s", ADVISORY_LOCK_KEY_SQL) + + +def dedup_deletions(conn: duckdb.DuckDBPyConnection, dry_run: bool) -> None: + """Drop duplicate rows from ducklake_files_scheduled_for_deletion. + + The same path can land in the queue across multiple snapshots (DuckLake + bug c5); combined with c1, the second visit poisons cleanup-all because + the S3 DELETE returns NoSuchKey and rolls back the whole transaction. + Keep one row per distinct path (the lowest ctid) and drop the rest. 
+ + The DELETE is ctid-based and runs through the duckdb postgres extension + (`postgres_execute`); duckdb-side DML can't see Postgres system columns. + """ + dups = conn.execute("SELECT count_pending_dups()").fetchone()[0] + log.info("dedup-deletions: %d duplicate rows in queue (dry_run=%s)", dups, dry_run) + if dry_run or dups == 0: + return + _acquire_advisory_lock(conn) + delete_sql = ( + f"DELETE FROM {METADATA_SCHEMA}.ducklake_files_scheduled_for_deletion " + f"WHERE ctid NOT IN (" + f"SELECT MIN(ctid) FROM {METADATA_SCHEMA}.ducklake_files_scheduled_for_deletion GROUP BY path" + f")" + ) + conn.execute(f"CALL postgres_execute('{PG_ATTACH_NAME}', {_sql_string_literal(delete_sql)})") + after = conn.execute("SELECT count_pending_dups()").fetchone()[0] + log.info("dedup-deletions: queue now has %d duplicate rows", after) + + +def _heal_orphans_b1_counts(conn: duckdb.DuckDBPyConnection, data_path: str) -> tuple[int, int]: + """Run heal-orphans's B1 gate counts against an already-populated _orphans. + + Returns ``(total_live_data_files, would_be_live)``. Filters + ducklake_data_file to live rows (``end_snapshot IS NULL``) and normalizes + both sides to absolute form so cross-table mismatches in storage form + don't slip past the gate. Extracted from heal_orphans so production and + tests share the same query — without this, regressions to the + ``LIKE 's3://%' OR LIKE '/%'`` branch silently slip past test fixtures + that only use one or the other. + """ + return conn.execute( + f""" + WITH orphan_abs AS ( + SELECT CASE WHEN path LIKE 's3://%' OR path LIKE '/%' THEN path + ELSE rtrim(?, '/') || '/' || path END AS abs_path + FROM _orphans + ), + live_data_abs AS ( + SELECT CASE WHEN path LIKE 's3://%' OR path LIKE '/%' THEN path + ELSE rtrim(?, '/') || '/' || path END AS abs_path + FROM {METADATA_SCHEMA}.ducklake_data_file + WHERE end_snapshot IS NULL + ) + SELECT + (SELECT COUNT(*) FROM {METADATA_SCHEMA}.ducklake_data_file + WHERE end_snapshot IS NULL) AS total_live, + (SELECT COUNT(*) FROM live_data_abs + WHERE abs_path IN (SELECT abs_path FROM orphan_abs)) AS would_be_live + """, + [data_path, data_path], + ).fetchone() + + +def heal_orphans(conn: duckdb.DuckDBPyConnection, dry_run: bool) -> None: + """Delete catalog rows whose S3 key no longer exists. + + Five-step gated procedure (addresses lead-QE punch list B1/B2/B3/H1/H4): + + 1. Take the advisory lock (skipped on --dry-run since nothing + mutates and the dry-run output is informational only). + 2. Materialize the orphan set into a TEMP TABLE so subsequent + safety gates and the final DELETE all see the same snapshot. + Done under the lock so the snapshot is stable for the whole + procedure — without this ordering another maintenance + invocation could change ducklake_files_scheduled_for_deletion + between scan and DELETE, invalidating the gates. + 3. Safety gate B1: prove `ducklake_data_file` is non-empty AND that + none of the orphan paths are referenced as live data files. A + vacuous pass (gate succeeds because the lake is empty) is not + allowed. + 4. Safety gate B3: prove no positional delete vector + (`ducklake_delete_file`) points at an orphan `data_file_id`. + If one does, the file is still live for vector lookups — abort. + 5. One `postgres_execute` DELETE matching on `path` (UUIDv7-unique + per quirk r3); single statement, atomic at the Postgres layer. 
+ """ + data_path = _require("DUCKLAKE_DATA_PATH") + log.info("heal-orphans: scanning for catalog-side orphans (dry_run=%s)", dry_run) + + if not dry_run: + _acquire_advisory_lock(conn) + + conn.execute( + "CREATE OR REPLACE TEMP TABLE _orphans AS " + "SELECT data_file_id, path FROM find_catalog_orphans(?)", + [data_path], + ) + n_orphans = conn.execute("SELECT COUNT(*) FROM _orphans").fetchone()[0] + log.info("heal-orphans: %d catalog rows reference S3 paths that no longer exist", n_orphans) + if n_orphans == 0: + return + + # B1: positive-proof gate. Both clauses must hold. The query lives in + # _heal_orphans_b1_counts so production and tests share one source. + total_live_data_files, would_be_live = _heal_orphans_b1_counts(conn, data_path) + if total_live_data_files == 0: + raise RuntimeError( + "heal-orphans safety gate B1 failed: ducklake_data_file has zero " + "live rows (end_snapshot IS NULL). Refusing to operate on a " + "vacuous catalog." + ) + if would_be_live > 0: + raise RuntimeError( + f"heal-orphans safety gate B1 failed: {would_be_live} of the " + f"{n_orphans} 'orphan' paths still appear as live rows in " + "ducklake_data_file. Aborting — these are not orphans." + ) + + # B3: any LIVE positional-delete vector pointing at an orphan id is a hard + # abort. The delete-vector table references the data file by + # data_file_id, so a match here means the file is still live for vector + # lookups. Historical (end_snapshot IS NOT NULL) delete vectors are no + # longer live and must not block heal-orphans, mirroring the B1 gate. + delete_vector_refs = conn.execute( + f"SELECT COUNT(*) FROM {METADATA_SCHEMA}.ducklake_delete_file " + f"WHERE end_snapshot IS NULL " + f" AND data_file_id IN (SELECT data_file_id FROM _orphans)" + ).fetchone()[0] + if delete_vector_refs > 0: + raise RuntimeError( + f"heal-orphans safety gate B3 failed: {delete_vector_refs} positional " + "delete vector(s) reference 'orphan' data_file_ids. Aborting — those " + "files are still live for delete-vector lookups." + ) + + log.info("heal-orphans: safety gates B1+B3 passed; %d rows queued for delete", n_orphans) + if dry_run: + return + + # Lock already held from step 1; materialize the path list out of the temp + # table and ship it as a single DELETE through postgres_execute. The duckdb + # postgres extension autocommits per statement, so this one DELETE is + # atomic at the upstream Postgres layer (per quirk r4). + paths = [row[0] for row in conn.execute("SELECT path FROM _orphans").fetchall()] + path_list = ", ".join(_sql_string_literal(p) for p in paths) + delete_sql = ( + f"DELETE FROM {METADATA_SCHEMA}.ducklake_files_scheduled_for_deletion " + f"WHERE path IN ({path_list})" + ) + conn.execute(f"CALL postgres_execute('{PG_ATTACH_NAME}', {_sql_string_literal(delete_sql)})") + + after = conn.execute( + "SELECT COUNT(*) FROM _orphans o " + f"JOIN {METADATA_SCHEMA}.ducklake_files_scheduled_for_deletion s ON o.path = s.path" + ).fetchone()[0] + if after != 0: + log.warning("heal-orphans: %d orphan rows remain in the queue after DELETE", after) + else: + log.info("heal-orphans: %d orphan rows removed from the queue", n_orphans) + + +def cleanup_all_safe(conn: duckdb.DuckDBPyConnection, max_iterations: int) -> None: + """Loop dedup + heal-orphans + cleanup-all until cleanup-all exits clean. + + Each crashed `ducklake_cleanup_old_files` (DuckLake bug c1: a NoSuchKey + on S3 DELETE rolls back the txn but the S3 deletes already-committed are + permanent) creates fresh catalog-side orphans. 
The orchestrator heals + those between attempts so the next cleanup-all sees a clean queue. + + The advisory lock is acquired once for the whole orchestration so all + three steps share mutual exclusion. + """ + _acquire_advisory_lock(conn) + for attempt in range(1, max_iterations + 1): + log.info("cleanup-all-safe: attempt %d / %d", attempt, max_iterations) + dedup_deletions(conn, dry_run=False) + heal_orphans(conn, dry_run=False) + try: + cleanup_all(conn, dry_run=False) + log.info("cleanup-all-safe: cleanup-all succeeded on attempt %d", attempt) + return + except duckdb.IOException as e: + log.warning( + "cleanup-all-safe: cleanup-all crashed on attempt %d (%s); " + "looping to heal fresh orphans", + attempt, + e, + ) + raise RuntimeError( + f"cleanup-all-safe exhausted {max_iterations} iterations without a clean cleanup-all run" + ) + + +def fsck(conn: duckdb.DuckDBPyConnection, dry_run: bool, max_iterations: int) -> None: + """End-to-end "lake catalog is healthy" recipe. + + Runs `cleanup-all-safe` (dedup + heal-orphans + cleanup-all in a loop) + followed by `ducklake_delete_orphaned_files` to mop up S3-side orphans + from any prior interrupted writes. Tiered compaction is intentionally + out of scope for this recipe; run `compact-to-tier-N` separately. + + Dry-run delegates to the dry-run forms of each step rather than counting + queue rows manually, so the B1/B3 safety gates inside heal-orphans + actually run. A real fsck that would abort because of a failed gate + aborts the dry-run too — operators see the same outcome. + """ + if dry_run: + log.info("fsck dry-run: starting") + dedup_deletions(conn, dry_run=True) + heal_orphans(conn, dry_run=True) + orphans(conn, dry_run=True) + log.info("fsck dry-run: done") + return + + cleanup_all_safe(conn, max_iterations) + orphans(conn, dry_run=False) + + +def find_orphans(conn: duckdb.DuckDBPyConnection) -> None: + """List catalog rows whose S3 key no longer exists. + + Pure SELECT via the `find_catalog_orphans(data_path)` macro. Logs the + summary on stderr and prints `data_file_id<TAB>path` rows on stdout, + so the output is grep / wc / xargs-friendly. + """ + data_path = _require("DUCKLAKE_DATA_PATH") + rows = conn.execute( + "SELECT data_file_id, path FROM find_catalog_orphans(?)", + [data_path], + ).fetchall() + log.info("find-orphans: %d catalog rows reference S3 paths that no longer exist", len(rows)) + for data_file_id, path in rows: + print(f"{data_file_id}\t{path}") def orphans(conn: duckdb.DuckDBPyConnection, dry_run: bool) -> None: """Find and delete orphaned S3 files.""" log.info("Deleting orphaned files (dry_run=%s)", dry_run) - result = conn.execute(f"CALL ducklake_delete_orphaned_files('lake', dry_run => {str(dry_run).lower()})").fetchall() + result = conn.execute( + f"CALL ducklake_delete_orphaned_files('{ATTACH_NAME}', dry_run => {str(dry_run).lower()})" + ).fetchall() for row in result: log.info("orphans: %s", row) @@ -208,7 +586,7 @@ def orphans(conn: duckdb.DuckDBPyConnection, dry_run: bool) -> None: def checkpoint(conn: duckdb.DuckDBPyConnection) -> None: """Run CHECKPOINT (integrated merge + expire + cleanup).""" log.info("Running CHECKPOINT") - conn.execute("CHECKPOINT lake") + conn.execute(f"CHECKPOINT {ATTACH_NAME}") log.info("CHECKPOINT complete") @@ -218,26 +596,114 @@ def maintain(conn: duckdb.DuckDBPyConnection, days: int, dry_run: bool) -> None: cleanup(conn, days, dry_run) +def _bytes_to_human(stored_value: str) -> str | None: + """Convert a DuckLake-stored byte-count string back to a units-suffixed form.
+ + DuckLake persists ``target_file_size`` as raw bytes (e.g. ``'67108864'``) + but ``ducklake_set_option`` rejects that form on input — it needs a + KiB/MiB/GiB suffix. Pick the largest 1024^i unit that divides the value + cleanly. Returns None when the input is not a clean integer or has no + clean power-of-1024 representation; the caller should fall back to a + safe default in that case. + """ + try: + n = int(stored_value) + except (TypeError, ValueError): + return None + if n <= 0: + return None + for unit, scale in (("TiB", 2**40), ("GiB", 2**30), ("MiB", 2**20), ("KiB", 2**10)): + if n >= scale and n % scale == 0: + return f"{n // scale}{unit}" + return None + + @contextlib.contextmanager def _scoped_target_file_size(conn: duckdb.DuckDBPyConnection, value: str): - """Set target_file_size for the body, restore prior value (or default) on exit.""" + """Set target_file_size for the body, restore the prior catalog value on exit. + + Reads the prior GLOBAL value before the body runs and restores it in the + finally block. Operators who have intentionally configured a non-default + global target_file_size keep it; this command's tier-specific override + only applies during the wrapped body. + + Three subtleties from DuckLake 1.4 internals: + + * ``ducklake_options`` returns one row per GLOBAL/SCHEMA/TABLE scope, so + filter to ``scope = 'GLOBAL'`` — the unfiltered fetchone in the + original implementation could return ``('',)`` from a TABLE-scope row. + * DuckLake persists the value as a raw byte count (e.g. ``'67108864'``), + but ``ducklake_set_option`` rejects that form on input — it needs a + KiB/MiB/GiB suffix. Convert via ``_bytes_to_human`` before restoring. + * If the prior value isn't a clean power of 1024, we can't represent it + with a units suffix; log a warning and fall back to + ``DEFAULT_TARGET_FILE_SIZE`` rather than leaving the catalog at the + tier-specific value we set during the body. + """ _sanitize_setting_value(value) - prior = conn.execute("SELECT value FROM ducklake_options('lake') WHERE option_name = 'target_file_size'").fetchone() - conn.execute(f"CALL ducklake_set_option('lake', 'target_file_size', '{value}')") + prior_row = conn.execute( + f"SELECT value FROM ducklake_options('{ATTACH_NAME}') " + f"WHERE option_name = 'target_file_size' AND scope = 'GLOBAL'" + ).fetchone() + # Distinguish three cases so we only warn on a genuine conversion failure + # (a prior GLOBAL value of 134217728 converts to '128MiB' which equals + # DEFAULT_TARGET_FILE_SIZE — without this distinction we'd emit a noisy + # warning on every compaction in a healthy default install). 
+ if prior_row is None or not prior_row[0]: + restore = DEFAULT_TARGET_FILE_SIZE + else: + converted = _bytes_to_human(prior_row[0]) + if converted is None: + log.warning( + "target_file_size GLOBAL value %r could not be converted to a " + "units-suffixed form; falling back to %s on restore", + prior_row[0], + DEFAULT_TARGET_FILE_SIZE, + ) + restore = DEFAULT_TARGET_FILE_SIZE + else: + restore = converted + _sanitize_setting_value(restore) + conn.execute(f"CALL ducklake_set_option('{ATTACH_NAME}', 'target_file_size', '{value}')") try: yield finally: - restore = prior[0] if prior else DEFAULT_TARGET_FILE_SIZE - _sanitize_setting_value(restore) - conn.execute(f"CALL ducklake_set_option('lake', 'target_file_size', '{restore}')") + conn.execute(f"CALL ducklake_set_option('{ATTACH_NAME}', 'target_file_size', '{restore}')") log.info("target_file_size restored to %s", restore) +def _set_compaction_tuning(conn: duckdb.DuckDBPyConnection, threads: int, memory_limit: str) -> None: + """Bound DuckDB resource use for compaction. + + Defaults are conservative to keep ducklake_merge_adjacent_files within a + cron pod's memory limit: empirically the merge plan still over-uses memory + relative to a pure streaming op (see DuckLake bug c8), so 2 threads / 4 GB + is the safe floor that succeeded where 12 threads / 20 GB OOMKilled. + Operators can raise via --threads / --memory-limit when the lake fits. + """ + _sanitize_setting_value(memory_limit) + conn.execute(f"SET threads = {threads}") + conn.execute(f"SET memory_limit = '{memory_limit}'") + # Skip the implicit sort to preserve insert order; not needed for a merge + # that already orders by (begin_snapshot, row_id_start, data_file_id). + conn.execute("SET preserve_insertion_order = false") + # Default 30s is too tight for the multi-MB GETs/PUTs that compaction + # drives; 10 min covers the worst-case S3 hiccup without livelocking. 
+ conn.execute("SET http_timeout = 600000") + log.info( + "compaction tuning: threads=%d memory_limit=%s preserve_insertion_order=false http_timeout=600000ms", + threads, + memory_limit, + ) + + def compact( conn: duckdb.DuckDBPyConnection, tier: int, table: str | None, dry_run: bool, + threads: int, + memory_limit: str, ) -> None: """Compact files in tier N (1, 2, or 3) for the catalog or one table.""" spec = TIERS[tier] @@ -260,11 +726,11 @@ def compact( if not _SETTING_VALUE_RE.match(table): raise ValueError(f"Illegal character in table name: {table!r}") where.append( - f"table_id IN (SELECT table_id FROM __ducklake_metadata_lake.ducklake_table WHERE table_name = '{table}')" + f"table_id IN (SELECT table_id FROM {METADATA_SCHEMA}.ducklake_table WHERE table_name = '{table}')" ) candidate_count, candidate_bytes = conn.execute( f"SELECT COUNT(*), COALESCE(SUM(file_size_bytes), 0) " - f"FROM __ducklake_metadata_lake.ducklake_data_file WHERE {' AND '.join(where)}" + f"FROM {METADATA_SCHEMA}.ducklake_data_file WHERE {' AND '.join(where)}" ).fetchone() log.info( "compact tier-%d candidates: %d files, %d bytes total", @@ -283,10 +749,11 @@ def compact( if min_b is not None: args.append(f"min_file_size => {min_b}") if table: - sql = f"CALL ducklake_merge_adjacent_files('lake', '{table}', {', '.join(args)})" + sql = f"CALL ducklake_merge_adjacent_files('{ATTACH_NAME}', '{table}', {', '.join(args)})" else: - sql = f"CALL ducklake_merge_adjacent_files('lake', {', '.join(args)})" + sql = f"CALL ducklake_merge_adjacent_files('{ATTACH_NAME}', {', '.join(args)})" + _set_compaction_tuning(conn, threads, memory_limit) with _scoped_target_file_size(conn, target): result = conn.execute(sql).fetchall() for row in result: @@ -299,7 +766,8 @@ def compact_probe(conn: duckdb.DuckDBPyConnection, table: str, max_compacted_fil raise ValueError(f"Illegal character in table name: {table!r}") log.info("compact-probe: table=%s max_compacted_files=%d", table, max_compacted_files) result = conn.execute( - f"CALL ducklake_merge_adjacent_files('lake', '{table}', max_compacted_files => {max_compacted_files})" + f"CALL ducklake_merge_adjacent_files('{ATTACH_NAME}', '{table}', " + f"max_compacted_files => {max_compacted_files})" ).fetchall() for row in result: log.info("compact-probe: %s", row) @@ -311,6 +779,11 @@ def build_parser() -> argparse.ArgumentParser: formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--verbose", action="store_true", help="Debug logging") + parser.add_argument( + "--debug", + action="store_true", + help="Enable DuckDB HTTP + postgres query debug logging (high overhead; off by default)", + ) sub = parser.add_subparsers(dest="command", required=True) @@ -328,6 +801,51 @@ def build_parser() -> argparse.ArgumentParser: p = sub.add_parser("cleanup-all", help="Delete all scheduled files") p.add_argument("--dry-run", action="store_true") + # dedup-deletions + p = sub.add_parser( + "dedup-deletions", + help="Drop duplicate rows from ducklake_files_scheduled_for_deletion (workaround for DuckLake bug c5)", + ) + p.add_argument("--dry-run", action="store_true") + + # find-orphans + sub.add_parser( + "find-orphans", + help="List ducklake_files_scheduled_for_deletion rows whose S3 key no longer exists", + ) + + # heal-orphans + p = sub.add_parser( + "heal-orphans", + help="Delete catalog rows whose S3 key no longer exists (gated; see B1/B3 safety checks)", + ) + p.add_argument("--dry-run", action="store_true") + + # cleanup-all-safe + p = sub.add_parser( + "cleanup-all-safe", + 
help="Orchestrator: dedup + heal-orphans + cleanup-all in a loop until cleanup-all exits clean", + ) + p.add_argument( + "--max-iterations", + type=_positive_int, + default=10, + help="Maximum dedup/heal/cleanup-all iterations before giving up (default 10)", + ) + + # fsck + p = sub.add_parser( + "fsck", + help="cleanup-all-safe + ducklake_delete_orphaned_files (catalog-healthy end-to-end recipe)", + ) + p.add_argument("--dry-run", action="store_true") + p.add_argument( + "--max-iterations", + type=_positive_int, + default=10, + help="Maximum cleanup-all-safe iterations before giving up (default 10)", + ) + # orphans p = sub.add_parser("orphans", help="Delete orphaned S3 files") p.add_argument("--dry-run", action="store_true") @@ -345,6 +863,17 @@ def build_parser() -> argparse.ArgumentParser: p.add_argument("--tier", type=int, choices=[1, 2, 3], required=True) p.add_argument("--table", default="", help="Limit to one table; empty = catalog-wide") p.add_argument("--dry-run", action="store_true") + p.add_argument( + "--threads", + type=_positive_int, + default=2, + help="DuckDB threads during the merge (default 2; raise cautiously, see DuckLake bug c8)", + ) + p.add_argument( + "--memory-limit", + default="4GB", + help="DuckDB memory_limit during the merge (default 4GB)", + ) # compact-probe p = sub.add_parser("compact-probe", help="Probe: merge a few adjacent files in one table") @@ -393,7 +922,7 @@ def main(argv: list[str] | None = None) -> None: t0 = time.monotonic() status = "success" conn = None - conn = connect() + conn = connect(debug=args.debug) try: match args.command: case "expire": @@ -402,6 +931,16 @@ def main(argv: list[str] | None = None) -> None: cleanup(conn, args.days, args.dry_run) case "cleanup-all": cleanup_all(conn, args.dry_run) + case "dedup-deletions": + dedup_deletions(conn, args.dry_run) + case "find-orphans": + find_orphans(conn) + case "heal-orphans": + heal_orphans(conn, args.dry_run) + case "cleanup-all-safe": + cleanup_all_safe(conn, args.max_iterations) + case "fsck": + fsck(conn, args.dry_run, args.max_iterations) case "orphans": orphans(conn, args.dry_run) case "maintain": maintain(conn, args.days, args.dry_run) case "checkpoint": checkpoint(conn) case "compact": - compact(conn, args.tier, args.table or None, args.dry_run) + compact(conn, args.tier, args.table or None, args.dry_run, args.threads, args.memory_limit) case "compact-probe": compact_probe(conn, args.table, args.max_compacted_files) except Exception: diff --git a/tools/maintenance.sql b/tools/maintenance.sql new file mode 100644 index 0000000..b205d0f --- /dev/null +++ b/tools/maintenance.sql @@ -0,0 +1,74 @@ +-- DuckLake catalog maintenance recipes. +-- +-- Loaded at session start by `tools/maintenance.py` (via `conn.execute`) and +-- by the `shell` recipe in `tools/justfile` (via the duckdb CLI's `.read` +-- meta-command). Both paths execute the file verbatim, so the file itself +-- must be valid SQL — no templating, no placeholders. +-- +-- Conventions +-- ----------- +-- * Schema name. DuckLake stores its catalog tables in +-- `__ducklake_metadata_<alias>`. We attach as `lake` everywhere +-- (the `ATTACH_NAME` constant in `maintenance.py`), so this file +-- references `__ducklake_metadata_lake.<table>` directly. If you ever +-- change the ATTACH alias, the references here must change with it. +-- +-- * No `LEFT ANTI JOIN`. DuckDB 1.4 does not support that syntax. Use +-- `LEFT JOIN ... WHERE rhs IS NULL` or `NOT EXISTS (...)` instead.
+-- +-- * No Postgres `ctid` from duckdb-side SQL. The duckdb postgres extension +-- does not expose Postgres system columns to duckdb-side queries. Anything +-- that touches `ctid` must run via `postgres_execute` / `postgres_query` +-- against the `pg (TYPE postgres)` ATTACH — see `dedup_deletions` in +-- `maintenance.py` for the working pattern. +-- +-- * No literal `glob('s3://...')` inside `CREATE MACRO` bodies. DuckDB 1.4 +-- evaluates a literal glob eagerly at macro creation time, which would +-- S3-LIST the lake on every connect (even for subcommands that don't +-- care). Macros that need an S3 path take it as a parameter — see +-- `find_catalog_orphans` below. +-- +-- * Advisory lock. Maintenance jobs that mutate the catalog acquire +-- `pg_try_advisory_lock(hashtext('millpond-ducklake-maintenance')::bigint)` +-- on the `pg` ATTACH for the duration of the session. The lock is held by +-- the `pg` connection, not the `lake` connection that DuckLake itself uses +-- internally, so it provides mutual exclusion between maintenance +-- invocations — not catalog-write atomicity against arbitrary writers. + +-- Number of duplicate rows in the pending-deletion queue. +-- A non-zero value will self-poison the next `cleanup-all` per DuckLake bug c5. +CREATE OR REPLACE TEMP MACRO count_pending_dups() AS ( + SELECT COUNT(*) - COUNT(DISTINCT path) + FROM __ducklake_metadata_lake.ducklake_files_scheduled_for_deletion +); + +-- Catalog rows in ducklake_files_scheduled_for_deletion whose S3 key no +-- longer exists. These rows poison the next `cleanup-all` because the S3 +-- DELETE returns NoSuchKey and the whole transaction rolls back (DuckLake +-- bug c1). Each invocation does an S3 LIST under `data_path` — cheap on +-- small lakes, expensive on large ones; cache via a TEMP TABLE if you call +-- it more than once in the same session. +-- +-- Pass the lake's bucket-relative root as `data_path` (e.g. +-- `'s3://bucket/lake/data'` or `'s3://bucket/lake/data/'`); +-- maintenance.py's `find-orphans` subcommand supplies it from +-- `DUCKLAKE_DATA_PATH` for you. We `rtrim(data_path, '/')` everywhere it +-- appears so an operator-configured trailing slash doesn't produce +-- `.../data//file.parquet` from the relative-form join (which would +-- never match an absolute live row and would misclassify a still-live +-- file as an orphan). 
+-- +-- Path-matching tolerates both storage forms (per quirk r1): +-- * absolute s3:// URI: matches `s.path = l.file` directly +-- * bucket-relative key: matches `l.file = rtrim(data_path,'/')||'/'||s.path` +CREATE OR REPLACE TEMP MACRO find_catalog_orphans(data_path) AS TABLE ( + WITH live AS ( + SELECT file FROM glob(rtrim(data_path, '/') || '/**/*.parquet') + ) + SELECT s.data_file_id, s.path + FROM __ducklake_metadata_lake.ducklake_files_scheduled_for_deletion s + LEFT JOIN live l + ON s.path = l.file + OR l.file = rtrim(data_path, '/') || '/' || s.path + WHERE l.file IS NULL +); diff --git a/uv.lock b/uv.lock index a2e56ff..9f05e48 100644 --- a/uv.lock +++ b/uv.lock @@ -255,6 +255,7 @@ dependencies = [ { name = "orjson" }, { name = "prometheus-client" }, { name = "pyarrow" }, + { name = "pytz" }, ] [package.optional-dependencies] @@ -277,6 +278,7 @@ requires-dist = [ { name = "orjson", specifier = ">=3.10" }, { name = "prometheus-client", specifier = ">=0.21" }, { name = "pyarrow", specifier = ">=18.0" }, + { name = "pytz", specifier = ">=2024.1" }, ] provides-extras = ["msk-iam"] @@ -456,6 +458,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" }, ] +[[package]] +name = "pytz" +version = "2026.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ff/46/dd499ec9038423421951e4fad73051febaa13d2df82b4064f87af8b8c0c3/pytz-2026.2.tar.gz", hash = "sha256:0e60b47b29f21574376f218fe21abc009894a2321ea16c6754f3cad6eb7cdd6a", size = 320861, upload-time = "2026-05-04T01:35:29.667Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ec/dd/96da98f892250475bdf2328112d7468abdd4acc7b902b6af23f4ed958ea0/pytz-2026.2-py2.py3-none-any.whl", hash = "sha256:04156e608bee23d3792fd45c94ae47fae1036688e75032eea2e3bf0323d1f126", size = 510141, upload-time = "2026-05-04T01:35:27.408Z" }, +] + [[package]] name = "pywin32" version = "311"
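A minimal sketch (not part of the patch) of the two quoting layers described in the justfile comment and in `_escape_libpq`'s docstring, using a made-up password that contains both a backslash and a single quote; the helper names here are illustrative only:

def esc_libpq(value: str) -> str:
    # Layer 1: libpq connection-string grammar -- backslash-escape internal
    # backslashes and single quotes, then wrap the value in single quotes
    # (mirrors _escape_libpq in tools/maintenance.py).
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

def esc_sql_literal(value: str) -> str:
    # Layer 2: SQL string-literal grammar for the wrapping ATTACH '...' --
    # double every single quote; backslashes pass through unchanged.
    return value.replace("'", "''")

password = "pa'ss\\word"  # the raw value contains ' and \
connstr = "host=db port=5432 dbname=lake user=app password=" + esc_libpq(password)
print(connstr)
# host=db port=5432 dbname=lake user=app password='pa\'ss\\word'
print("ATTACH 'postgres:" + esc_sql_literal(connstr) + "' AS pg (TYPE postgres);")
# ATTACH 'postgres:host=db port=5432 dbname=lake user=app password=''pa\''ss\\word''' AS pg (TYPE postgres);

Skipping either layer reproduces exactly the failure modes the justfile comment warns about: without layer 1 the password's quote ends the libpq value early, and without layer 2 it breaks out of the ATTACH literal.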
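A second self-contained sketch of the path normalization shared by `find_catalog_orphans` and `_heal_orphans_b1_counts`, assuming a hypothetical lake root configured with a trailing slash; it needs only the duckdb package and shows how both storage forms from quirk r1 become comparable:

import duckdb

# One path stored as an absolute s3:// URI, one as a bucket-relative key.
# The rtrim guards against an operator-configured trailing slash, which
# would otherwise produce '.../data//t/b.parquet' and never match.
con = duckdb.connect()
rows = con.execute(
    """
    SELECT path,
           CASE WHEN path LIKE 's3://%' OR path LIKE '/%' THEN path
                ELSE rtrim(?, '/') || '/' || path END AS abs_path
    FROM (VALUES ('s3://bucket/lake/data/t/a.parquet'),
                 ('t/b.parquet')) AS samples(path)
    """,
    ["s3://bucket/lake/data/"],
).fetchall()
for path, abs_path in rows:
    print(f"{path} -> {abs_path}")
# s3://bucket/lake/data/t/a.parquet -> s3://bucket/lake/data/t/a.parquet
# t/b.parquet -> s3://bucket/lake/data/t/b.parquet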