
Conversation

@ilongin (Contributor) commented Oct 22, 2025

Implements fine-grained checkpointing for UDF operations, enabling automatic recovery from failures with minimal recomputation.

Incremental Progress Tracking

  • Saves processed rows continuously during .map() and .gen() execution
  • Automatically resumes from where execution failed
  • Allows fixing bugs in UDF code and continuing without losing progress
  • Creates checkpoints when UDF operations complete successfully
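
The resume behavior described above can be modeled as a set difference over already-processed row ids. This is an illustrative sketch, not datachain's actual internals; the `sys__id`-style id tracking follows the PR's conventions:

```python
# Simplified model of UDF continuation: skip rows whose id was recorded
# as processed before the previous run failed.
def resume_udf(input_rows, processed_ids, udf):
    """Apply `udf` only to rows not yet in `processed_ids`, recording each
    id as it completes so a later retry can skip it."""
    results = []
    for row_id, value in input_rows:
        if row_id in processed_ids:
            continue  # already computed in an earlier (failed) run
        results.append((row_id, udf(value)))
        processed_ids.add(row_id)  # persisted to a "processed" table in the real system
    return results

# Rows 1 and 2 were processed before a crash; only row 3 is recomputed.
processed = {1, 2}
rows = [(1, 10), (2, 20), (3, 30)]
out = resume_udf(rows, processed, lambda x: x * 2)
```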

Checkpoint Invalidation

  • Completed UDF checkpoints are tied to the function's code
  • In-progress UDFs allow code changes and continuation from partial results
  • Completed UDFs are recomputed from scratch if the code changes

New Environment Variable

  • DATACHAIN_UDF_RESET=1 - Force UDF restart from scratch, ignoring partial progress
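
A sketch of how such a flag can be read, in the spirit of datachain's `env2bool` helper (the real helper's exact accepted values may differ):

```python
import os

# Parse a boolean-ish environment variable; "1", "true", "yes", "on" mean True.
def env_is_true(name: str, default: bool = False) -> bool:
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")

os.environ["DATACHAIN_UDF_RESET"] = "1"
reset = env_is_true("DATACHAIN_UDF_RESET")  # True: skip checkpoint reuse
```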

Implementation Highlights

Job-Scoped Architecture

  • UDF tables scoped to job ID: udf_{job_id}_{hash}_{suffix}
  • Child jobs reuse parent tables via ancestor search
  • Clean separation prevents conflicts
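
The naming scheme and ancestor search can be sketched as below; these helper names are illustrative, and the real implementation performs the ancestor lookup with a recursive SQL CTE:

```python
# Build a job-scoped UDF table name: udf_{job_id}_{hash}_{suffix}
def udf_table_name(job_id: str, hash_: str, suffix: str) -> str:
    return f"udf_{job_id}_{hash_}_{suffix}"

def ancestor_job_ids(job_id, parents):
    """Walk parent_job_id links from child to root."""
    ids = []
    current = parents.get(job_id)
    while current is not None:
        ids.append(current)
        current = parents.get(current)
    return ids

# A child job can locate tables created by its parent or grandparent.
parents = {"job3": "job2", "job2": "job1", "job1": None}
name = udf_table_name("job1", "abc123", "output")
ancestors = ancestor_job_ids("job3", parents)
```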

Checkpoint Types

  • Partial checkpoints (hash_input) - Allow continuation with modified code
  • Final checkpoints (hash_output) - Invalidated if UDF changes
  • Processed table tracking for RowGenerator (.gen())
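
The two-tier hashing can be sketched as follows: the partial hash covers only the UDF's inputs, while the final hash also folds in the UDF's code, so editing the function invalidates only completed checkpoints. This is a simplified stand-in (datachain's `hash_utils` hashes function source; bytecode plus constants is used here for illustration):

```python
import hashlib

def hash_input(input_fingerprint: str) -> str:
    # Partial checkpoint hash: inputs only, independent of UDF code.
    return hashlib.sha256(input_fingerprint.encode()).hexdigest()

def hash_output(input_fingerprint: str, udf) -> str:
    # Final checkpoint hash: inputs plus the UDF's code.
    code = repr((udf.__code__.co_code, udf.__code__.co_consts))
    return hashlib.sha256((input_fingerprint + code).encode()).hexdigest()

double = lambda x: x * 2
triple = lambda x: x * 3  # an "edited" version of the UDF

h_partial = hash_input("dataset:nums@v1")
h_final_old = hash_output("dataset:nums@v1", double)
h_final_new = hash_output("dataset:nums@v1", triple)  # differs: code changed
```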

Aggregations

  • .agg() creates checkpoints on completion only
  • No partial progress tracking (restart from scratch on failure)

Files Changed

  • src/datachain/query/dataset.py - Core UDF checkpoint logic
  • src/datachain/data_storage/metastore.py - Checkpoint management
  • docs/guide/checkpoints.md - Updated documentation
  • Comprehensive tests in tests/func/test_checkpoints.py

Related

Fixes #1392

Summary by Sourcery

Introduce a comprehensive checkpointing framework for UDF execution: track and manage intermediate UDF results per job hash, support skipping or resuming work based on stored checkpoints, and add cleanup APIs to remove stale checkpoints and temporary tables.

New Features:

  • Implement UDF checkpointing with per-job hash tracking, resume/skip logic, and naming conventions for input/output/processed tables
  • Add Catalog.cleanup_checkpoints and Catalog.remove_checkpoint_by_hash to prune old checkpoints and associated UDF tables based on TTL or creation time
  • Enhance Metastore with idempotent create_checkpoint, optional job-scoped list_checkpoints, and remove_checkpoint APIs
  • Expose session-backed .job property and integrate checkpoint context into DataChain.hash and .save workflows

Enhancements:

  • Refactor UDFStep classes to use Session instead of Catalog and rename create_udf_table/process_input_query for clear input/output separation
  • Propagate hash_before/hash_after through query generation pipeline to drive checkpoint decisions
  • Handle callable object hashing in hash_utils and add database engine list_tables utility

Tests:

  • Add functional tests for checkpoint creation, parallel failures, and cleanup behavior under various TTL and job filters
  • Update unit tests to assert new checkpoint counts and hash_callable support for callable instances

Summary by Sourcery

Implement UDF-level checkpointing to enable incremental progress tracking and automatic recovery for map, gen, and aggregation operations.

New Features:

  • Introduce fine-grained UDF checkpointing for .map() and .gen() operations with partial and final checkpoints.
  • Enable automatic resumption of failed UDFs across jobs by reusing job-scoped input, output, and processed tables via ancestor search.
  • Add DATACHAIN_UDF_RESET environment variable to force UDF operations to restart from scratch.
  • Expose session.job property and incorporate last checkpoint hash into chain.hash and save workflows for consistent checkpoint context.
  • Extend metastore API with idempotent create_checkpoint, remove_checkpoint, list and query ancestor job checkpoints.
  • Support hashing of callable objects in hash_utils to ensure accurate checkpoint invalidation.
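
Hashing callable objects can be sketched by falling back to the instance's `__call__` method when the UDF is not a plain function (simplified relative to datachain's `hash_utils`, which hashes source code):

```python
import hashlib
import types

def hash_callable(udf) -> str:
    # Plain functions (and lambdas) are hashed directly; for callable class
    # instances, hash the class's __call__ method instead.
    fn = udf if isinstance(udf, types.FunctionType) else type(udf).__call__
    payload = repr((fn.__code__.co_code, fn.__code__.co_consts)).encode()
    return hashlib.sha256(payload).hexdigest()

class Doubler:
    def __call__(self, x):
        return x * 2

h1 = hash_callable(Doubler())
h2 = hash_callable(Doubler())  # a second instance hashes identically
```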

Enhancements:

  • Refactor UDFStep and dispatch pipeline to separate input, output, and processed tables, add batch callbacks, and flush partial results on errors.
  • Improve warehouse and SQLite metastore: idempotent pre-UDF table creation, rename_table preserving SQL types, metadata cleanup on drop/rename, and bump SQLite schema version.
  • Enhance insert_rows with optional batch_callback and tracking_field to correlate inputs and outputs during UDF dispatch.
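
The `batch_callback`/`tracking_field` idea can be sketched as below; parameter names mirror the description above, not the exact warehouse signature:

```python
# Insert rows in batches; after each flush, report the tracked ids so the
# dispatcher can correlate processed inputs with written outputs.
def insert_rows(table, rows, batch_size=2, batch_callback=None, tracking_field=None):
    def flush(batch):
        table.extend(batch)
        if batch_callback is not None and tracking_field is not None:
            batch_callback([r[tracking_field] for r in batch])

    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            flush(batch)
            batch = []
    if batch:  # flush remainder (the real code also flushes on errors)
        flush(batch)

processed_ids = []
table = []
rows = [{"sys__id": i, "doubled": i * 2} for i in range(5)]
insert_rows(table, rows, batch_size=2,
            batch_callback=processed_ids.extend, tracking_field="sys__id")
```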

Documentation:

  • Revise docs/guide/checkpoints.md to explain UDF checkpointing model, usage examples, invalidation rules, and the DATACHAIN_UDF_RESET flag.

Tests:

  • Add functional tests covering checkpoint creation, cross-job reuse, same-job re-execution, code-change invalidation, reset behavior, aggregator and generator continuation, and parallel execution recovery.
  • Add unit tests for metastore.get_ancestor_job_ids and hash_callable for callable objects.
  • Update existing tests and fixtures to reflect new UDF table naming conventions and adjusted checkpoint counts.

@ilongin marked this pull request as draft October 22, 2025 14:28

sourcery-ai bot commented Oct 22, 2025

Reviewer's Guide

This PR implements a comprehensive, fine-grained checkpointing framework for UDF operations (.map() and .gen()), introducing partial and final checkpoints tied to function code, job-scoped input/output/processed tables, and seamless recovery from failures with minimal recomputation. It extends metastore and warehouse layers to support idempotent checkpoint management, batch tracking callbacks, job-aware hashing, and integrates a DATACHAIN_UDF_RESET flag. The changes also refactor the save logic to include job context, update documentation and CLI messaging, and expand test coverage to validate varied UDF failure and recovery scenarios.

Sequence diagram for UDF checkpointed execution and recovery

sequenceDiagram
  actor User
  participant DataChain
  participant UDFStep
  participant Metastore
  participant Warehouse

  User->>DataChain: Run .map()/.gen() with UDF
  DataChain->>UDFStep: Apply UDF step (with job context)
  UDFStep->>Metastore: Check for existing checkpoint (partial/final)
  alt Final checkpoint exists
    UDFStep->>Warehouse: Reuse output table, skip UDF execution
  else Partial checkpoint exists
    UDFStep->>Warehouse: Copy partial table, compute unprocessed rows
    UDFStep->>Warehouse: Resume UDF only for unprocessed rows
    Warehouse->>Metastore: Update processed table and checkpoint
  else No checkpoint
    UDFStep->>Warehouse: Create input table, run UDF from scratch
    Warehouse->>Metastore: Create partial checkpoint, update processed table
  end
  UDFStep->>Metastore: On success, promote to final checkpoint
  DataChain->>User: Return results, progress tracked

ER diagram for job-scoped UDF checkpoint tables

erDiagram
  Job {
    string id PK
    string parent_job_id
  }
  Checkpoint {
    string id PK
    string job_id FK
    string hash
    boolean partial
    datetime created_at
  }
  Table {
    string name PK
    string type
  }
  Job ||--o{ Checkpoint : "has"
  Job ||--o{ Table : "owns"
  Checkpoint ||--o{ Table : "tracks"

Class diagram for UDFStep and checkpoint management

classDiagram
  class UDFStep {
    +Session session
    +apply(query_generator, temp_tables, *args, **kwargs)
    +create_output_table(name)
    +get_input_query(input_table_name, original_query)
    +create_processed_table(checkpoint, copy_from_parent)
    +populate_udf_output_table(udf_table, query, processed_table)
    +get_or_create_input_table(query, hash)
    +_checkpoint_exist(hash, partial)
    +_skip_udf(checkpoint, hash_input, query)
    +_run_from_scratch(hash_input, hash_output, query)
    +_continue_udf(checkpoint, hash_output, query)
    +calculate_unprocessed_rows(input_table, partial_table, processed_table, original_query)
    +job
    +metastore
    +warehouse
  }
  class UDFSignal {
    +create_output_table(name)
    +create_result_query(udf_table, query)
  }
  class RowGenerator {
    +create_output_table(name)
    +create_processed_table(checkpoint, copy_from_parent)
    +create_result_query(udf_table, query)
  }
  class Metastore {
    +create_checkpoint(job_id, hash, partial)
    +remove_checkpoint(checkpoint)
    +get_ancestor_job_ids(job_id)
    +find_checkpoint(job_id, hash, partial)
    +get_last_checkpoint(job_id)
  }
  class Warehouse {
    +insert_rows(table, rows, batch_size, batch_callback, tracking_field)
    +rename_table(old_table, new_name)
    +create_pre_udf_table(query, name)
    +get_table(name)
    +drop_table(table, if_exists)
    +copy_table(target, query)
  }
  UDFSignal --|> UDFStep
  RowGenerator --|> UDFStep

File-Level Changes

Change | Details | Files
Refactor UDFStep to orchestrate checkpoint lifecycle
  • Add methods _checkpoint_exist, _skip_udf, _run_from_scratch, _continue_udf to drive partial/final checkpoint flows
  • Switch from Catalog to Session for UDFStep context and introduce job/metastore/warehouse properties
  • Extend populate_udf_output_table with optional processed_table and batch callbacks to track processed sys__ids
  • Always flush insert buffers on exception to expose partial outputs for recovery
src/datachain/query/dataset.py
Enhance metastore for ancestor lookup and idempotent checkpoints
  • Implement get_ancestor_job_ids via recursive CTE to search parent jobs
  • Adjust checkpoint schema unique constraint to include partial flag
  • Make create_checkpoint idempotent with on_conflict_do_nothing and return existing records
  • Add remove_checkpoint API to delete partial checkpoints after promotion
src/datachain/data_storage/metastore.py
Upgrade warehouse/SQLite for named UDF tables and batch tracking
  • Change create_pre_udf_table signature to accept explicit names and skip population if existing
  • Add batch_callback and tracking_field parameters to insert_rows to record processed rows
  • Improve rename_table to preserve metadata and handle errors
  • Clean up stale metadata entries when dropping or renaming tables
src/datachain/data_storage/sqlite.py
src/datachain/data_storage/warehouse.py
Make datachain save and hash job-aware
  • Extend Chain.hash() to accept name and in_job to include last checkpoint hash
  • Refactor save to compute a job-scoped hash and resolve dataset checkpoints accordingly
  • Remove old _calculate_job_hash, unify _resolve_checkpoint logic
src/datachain/lib/dc/datachain.py
Support hashing of callable objects in utility
  • Detect non-function callables and hash their __call__ method
  • Gracefully handle missing __name__ attributes when determining lambdas
  • Add unit tests verifying consistent hashes for callable instances
src/datachain/hash_utils.py
tests/unit/test_hash_utils.py
Introduce DATACHAIN_UDF_RESET for forced restart
  • Import and use env2bool to read DATACHAIN_UDF_RESET in UDFStep logic
  • Bypass checkpoint reuse when reset flag is set, forcing full UDF recompute
  • Adjust save and query resolution to respect reset behavior
src/datachain/query/dataset.py
src/datachain/lib/dc/datachain.py
Update documentation and CLI for checkpoint features
  • Revise guide/checkpoints.md to describe UDF checkpointing mechanics, invalidation rules, and reset flag
  • Change CLI gc messaging to reference temporary tables and updated behavior
  • Adjust end-to-end tests to expect new GC output
docs/guide/checkpoints.md
src/datachain/cli/commands/misc.py
tests/test_cli_e2e.py
tests/test_query_e2e.py
tests/func/test_catalog.py
Expand test coverage for UDF checkpoint scenarios
  • Add functional tests for map/gen checkpoint creation, continuation, cross-job reuse, code-change invalidation, parallel execution, and reset behavior
  • Modify unit tests to reflect new table naming, checkpoint counts, and cleanup expectations
  • Enhance conftest.py with cleanup utilities for UDF tables and ensure fresh job IDs between runs
tests/func/test_checkpoints.py
tests/unit/lib/test_checkpoints.py
tests/conftest.py
tests/func/test_warehouse.py
tests/func/test_metastore.py

Assessment against linked issues

Objectives from #1392:

  • Implement UDF-level checkpointing to save and reuse partial UDF data between job runs if the UDF fails, allowing users to fix bugs and continue without losing progress.
  • Ensure that partial UDF checkpoints are not invalidated by changes to the UDF code until the UDF finishes successfully, so users can rerun with modified code and still reuse partial results.
  • Create two types of UDF checkpoints: (1) a partial checkpoint before UDF calculation (hash includes only UDF inputs, not UDF code), and (2) a final checkpoint after the UDF is done (hash includes UDF inputs and UDF code), with correct invalidation and promotion logic.

Possibly linked issues

  • #UDF checkpoints: The PR directly implements the two-tiered UDF checkpointing mechanism described in the issue, allowing partial results to be reused even after UDF code changes, and invalidating final checkpoints if UDF code changes.
  • #issue: The PR implements UDF-level checkpointing, directly fulfilling the issue's request to resume datachain query execution.
  • UDF checkpoints #1392: The PR implements the Checkpoint model, adding UDF-level checkpoints for recovery and progress tracking.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.



cloudflare-workers-and-pages bot commented Oct 22, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: f914b7f
Status: ✅  Deploy successful!
Preview URL: https://3ee78108.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1392-udf-checkpoints.datachain-documentation.pages.dev


@sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • You have the checkpoint table prefix (udf_{job_id}_{hash}) hard-coded in multiple places—extract it into a shared constant or helper to avoid duplication and address the TODO you left about centralizing this.
  • There are quite a few TODO placeholders (e.g. partial checkpoint handling, debug prints, renaming tables) still in the core logic—please either implement or remove them before merging to avoid shipping unfinished behavior.
  • The repeated addition of *args, **kwargs to every apply signature suggests a heavier refactor; consider making the base Step.apply accept a uniform context object or keyword-only params so subclasses don’t have so much boilerplate.

## Individual Comments

### Comment 1
<location> `src/datachain/query/dataset.py:720-722` </location>
<code_context>
-        temp_tables.append(udf_table.name)
-        self.populate_udf_table(udf_table, query)
-        q, cols = self.create_result_query(udf_table, query)
+        if self._checkpoint_exist(hash_after):
+            result = self._skip_udf(hash_before, query)
+        elif self._checkpoint_exist(hash_before) and udf_mode == "unsafe":
+            # TODO implement continuing with partial checkpoint
+            result = self._run_from_scratch(hash_before, query)
</code_context>

<issue_to_address>
**issue (bug_risk):** The checkpoint skipping and unsafe mode logic is not fully implemented.

Since partial checkpoint continuation in unsafe mode is not implemented, raise a NotImplementedError or emit a warning when this code path is reached to prevent silent failures.
</issue_to_address>

### Comment 2
<location> `src/datachain/query/dataset.py:728-730` </location>
<code_context>
+        else:
+            result = self._run_from_scratch(hash_before, query)
+
+        # TODO rename tables to have new job_id in table names since maybe we are
+        # just skipping this as we found checkpoint but they have old job_id in name
+        self.session.catalog.metastore.create_checkpoint(self.job.id, hash_after)
+
+        return result
</code_context>

<issue_to_address>
**issue (bug_risk):** Table renaming for checkpoint reuse is not implemented.

Leaving table renaming as a TODO may cause name collisions or incorrect data linkage. Please address this or implement interim safeguards.
</issue_to_address>

### Comment 3
<location> `src/datachain/query/dataset.py:453-454` </location>
<code_context>
+        If query cache is enabled, use the cached table; otherwise use the original
+        query.
+        """
+        if os.getenv("DATACHAIN_DISABLE_QUERY_CACHE", "") not in ("", "0"):
+            return original_query
+        table = self.session.catalog.warehouse.db.get_table(input_table_name)
</code_context>

<issue_to_address>
**suggestion:** Environment variable checks for query cache may be error-prone.

Using direct string comparison for environment variables can lead to inconsistent behavior. A utility function for parsing boolean values would improve reliability.

```suggestion
        def env_var_is_true(env_value: str) -> bool:
            """Parse environment variable string to boolean."""
            return env_value.strip().lower() in ("1", "true", "yes", "on")

        if env_var_is_true(os.getenv("DATACHAIN_DISABLE_QUERY_CACHE", "")):
            return original_query
```
</issue_to_address>

### Comment 4
<location> `src/datachain/catalog/catalog.py:2057` </location>
<code_context>
+        # UDF table prefix pattern: udf_{job_id}_{hash}
+        # TODO move this table prefix pattern to some common place as we
+        # repeat this in multiple places (e.g in UDFStep and here)
+        table_prefix = f"udf_{checkpoint.job_id}_{checkpoint.hash}"
+        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
+        if matching_tables:
</code_context>

<issue_to_address>
**suggestion:** Table prefix pattern is duplicated across codebase.

Centralize the UDF table prefix pattern in a shared utility or constant to prevent duplication and simplify future updates.

Suggested implementation:

```python
        from datachain.catalog.utils import get_udf_table_prefix

        # Get UDF table prefix for this checkpoint
        table_prefix = get_udf_table_prefix(checkpoint.job_id, checkpoint.hash)
        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
        if matching_tables:

```

You need to create a new file `src/datachain/catalog/utils.py` (or add to an existing shared utils module) with the following function:

```python
def get_udf_table_prefix(job_id: str, hash: str) -> str:
    return f"udf_{job_id}_{hash}"
```

Also, update any other places in the codebase (e.g. UDFStep) that construct the UDF table prefix to use this utility function for full deduplication.
</issue_to_address>

### Comment 5
<location> `tests/func/test_checkpoints.py:80-83` </location>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 6
<location> `tests/func/test_checkpoints.py:91-96` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 7
<location> `tests/func/test_checkpoints.py:106-107` </location>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

</issue_to_address>

### Comment 8
<location> `tests/func/test_checkpoints.py:132-137` </location>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

</issue_to_address>

### Comment 9
<location> `tests/func/test_checkpoints.py:172-177` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

</issue_to_address>

### Comment 10
<location> `src/datachain/catalog/catalog.py:2058-2059` </location>
<code_context>
    def _remove_checkpoint(self, checkpoint: Checkpoint) -> None:
        """
        Remove a checkpoint and its associated UDF tables.
        Internal helper method for checkpoint cleanup operations.

        Args:
            checkpoint: The checkpoint object to remove.
        """
        # Find and drop UDF tables for this checkpoint
        # UDF table prefix pattern: udf_{job_id}_{hash}
        # TODO move this table prefix pattern to some common place as we
        # repeat this in multiple places (e.g in UDFStep and here)
        table_prefix = f"udf_{checkpoint.job_id}_{checkpoint.hash}"
        matching_tables = self.warehouse.db.list_tables(prefix=table_prefix)
        if matching_tables:
            self.warehouse.cleanup_tables(matching_tables)

        # Remove the checkpoint from metastore
        self.metastore.remove_checkpoint(checkpoint)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))

```suggestion
        if matching_tables := self.warehouse.db.list_tables(prefix=table_prefix):
```
</issue_to_address>

### Comment 11
<location> `src/datachain/catalog/catalog.py:2074-2077` </location>
<code_context>
    def remove_checkpoint_by_hash(self, job_id: str, checkpoint_hash: str) -> None:
        """
        Remove a specific checkpoint by job_id and hash, along with its UDF tables.

        Args:
            job_id: The job ID of the checkpoint to remove.
            checkpoint_hash: The hash of the checkpoint to remove.
        """
        # Find the checkpoint
        checkpoint = self.metastore.find_checkpoint(job_id, checkpoint_hash)
        if not checkpoint:
            # Checkpoint doesn't exist, nothing to do
            return

        self._remove_checkpoint(checkpoint)

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
</issue_to_address>

### Comment 12
<location> `src/datachain/data_storage/metastore.py:1890-1892` </location>
<code_context>
    def create_checkpoint(
        self,
        job_id: str,
        _hash: str,
        partial: bool = False,
        conn: Any | None = None,
    ) -> Checkpoint:
        """
        Creates a new checkpoint or returns existing one if already exists.
        This is idempotent - calling it multiple times with the same job_id and hash
        will not create duplicates.
        """
        # First check if checkpoint already exists
        existing = self.find_checkpoint(job_id, _hash, partial=partial, conn=conn)
        if existing:
            return existing

        checkpoint_id = str(uuid4())
        query = self._checkpoints_insert().values(
            id=checkpoint_id,
            job_id=job_id,
            hash=_hash,
            partial=partial,
            created_at=datetime.now(timezone.utc),
        )

        # Use on_conflict_do_nothing to handle race conditions
        if hasattr(query, "on_conflict_do_nothing"):
            query = query.on_conflict_do_nothing(index_elements=["job_id", "hash"])

        self.db.execute(query, conn=conn)

        return self.find_checkpoint(job_id, _hash, partial=partial, conn=conn)  # type: ignore[return-value]

</code_context>

<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))

```suggestion
        if existing := self.find_checkpoint(
            job_id, _hash, partial=partial, conn=conn
        ):
```
</issue_to_address>
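Beyond the walrus suggestion, the interesting part of this method is the race-safe idempotent insert. A standalone sketch of that pattern, with a hypothetical table and names (SQLAlchemy's SQLite dialect, not DataChain's actual schema):

```python
import sqlalchemy as sa
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
checkpoints = sa.Table(
    "checkpoints",
    meta,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column("job_id", sa.String, nullable=False),
    sa.Column("hash", sa.String, nullable=False),
    sa.UniqueConstraint("job_id", "hash"),
)
meta.create_all(engine)

def create_checkpoint(conn, ckpt_id: str, job_id: str, _hash: str) -> None:
    # Each caller generates its own id (uuid4 in the real code), so the only
    # possible conflict is on the (job_id, hash) unique constraint; DO NOTHING
    # turns a concurrent duplicate insert into a no-op instead of an error.
    stmt = sqlite_insert(checkpoints).values(id=ckpt_id, job_id=job_id, hash=_hash)
    stmt = stmt.on_conflict_do_nothing(index_elements=["job_id", "hash"])
    conn.execute(stmt)

with engine.begin() as conn:
    create_checkpoint(conn, "a", "job-1", "h1")
    create_checkpoint(conn, "b", "job-1", "h1")  # duplicate: silently skipped
    rows = conn.execute(sa.select(checkpoints)).fetchall()
```

The `hasattr(query, "on_conflict_do_nothing")` guard in the original exists because this method is dialect-specific: the SQLite and PostgreSQL insert constructs provide it, while a generic `sqlalchemy.insert` does not.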

### Comment 13
<location> `tests/func/test_checkpoints.py:86` </location>
<code_context>
def test_cleanup_checkpoints_with_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints removes old checkpoints and their UDF tables."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore
    warehouse = catalog.warehouse

    # Create some checkpoints by running a chain with map (which creates UDF tables)
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    job_id = test_session.get_or_create_job().id

    checkpoints_before = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints_before) == 6

    # Verify UDF tables exist
    udf_tables = []
    for checkpoint in checkpoints_before:
        table_prefix = f"udf_{checkpoint.job_id}_{checkpoint.hash}"
        matching_tables = warehouse.db.list_tables(prefix=table_prefix)
        udf_tables.extend(matching_tables)

    # At least some UDF tables should exist
    assert len(udf_tables) > 0

    # Modify checkpoint created_at to be older than TTL (4 hours by default)
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for checkpoint in checkpoints_before:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Run cleanup_checkpoints
    catalog.cleanup_checkpoints()

    # Verify checkpoints were removed
    checkpoints_after = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints_after) == 0

    # Verify UDF tables were removed
    for table_name in udf_tables:
        assert not warehouse.db.has_table(table_name)

</code_context>

<issue_to_address>
**issue (code-quality):** Simplify sequence length comparison [×2] ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))
</issue_to_address>
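Since this comment flags two occurrences without spelling them out, here is the rule applied to the two simplifiable assertions in this test (the `== 6` count check cannot be reduced this way); the values are stand-ins for illustration:

```python
udf_tables = ["udf_job1_h1_output"]  # hypothetical stand-in values
checkpoints_after: list = []

# assert len(udf_tables) > 0  ->
assert udf_tables
# assert len(checkpoints_after) == 0  ->
assert not checkpoints_after
```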

### Comment 14
<location> `tests/func/test_checkpoints.py:143` </location>
<code_context>
def test_cleanup_checkpoints_with_custom_ttl(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints respects custom TTL from environment variable."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Set custom TTL to 1 hour
    monkeypatch.setenv("CHECKPOINT_TTL", "3600")

    # Create some checkpoints
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    job_id = test_session.get_or_create_job().id

    checkpoints = list(metastore.list_checkpoints(job_id))
    assert len(checkpoints) == 3

    # Modify all checkpoints to be 2 hours old (older than custom TTL)
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=2)
    for checkpoint in checkpoints:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Run cleanup with custom TTL
    catalog.cleanup_checkpoints()

    # Verify checkpoints were removed
    assert len(list(metastore.list_checkpoints(job_id))) == 0

</code_context>

<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))

```suggestion
    assert not list(metastore.list_checkpoints(job_id))
```
</issue_to_address>

### Comment 15
<location> `tests/func/test_checkpoints.py:183` </location>
<code_context>
def test_cleanup_checkpoints_for_specific_job(test_session, monkeypatch, nums_dataset):
    """Test that cleanup_checkpoints can target a specific job."""
    from datetime import datetime, timedelta, timezone

    catalog = test_session.catalog
    metastore = catalog.metastore

    # Create checkpoints for two different jobs
    reset_session_job_state()
    chain = dc.read_dataset("nums", session=test_session)
    chain.map(doubled=lambda num: num * 2, output=int).save("nums_doubled")
    first_job_id = test_session.get_or_create_job().id

    reset_session_job_state()
    chain.map(tripled=lambda num: num * 3, output=int).save("nums_tripled")
    second_job_id = test_session.get_or_create_job().id

    # Verify both jobs have checkpoints
    first_checkpoints = list(metastore.list_checkpoints(first_job_id))
    second_checkpoints = list(metastore.list_checkpoints(second_job_id))
    assert len(first_checkpoints) == 3
    assert len(second_checkpoints) == 3

    # Make both checkpoints old
    ch = metastore._checkpoints
    old_time = datetime.now(timezone.utc) - timedelta(hours=5)
    for checkpoint in first_checkpoints + second_checkpoints:
        metastore.db.execute(
            metastore._checkpoints.update()
            .where(ch.c.id == checkpoint.id)
            .values(created_at=old_time)
        )

    # Clean up only first job's checkpoints
    catalog.cleanup_checkpoints(job_id=first_job_id)

    # Verify only first job's checkpoints were removed
    assert len(list(metastore.list_checkpoints(first_job_id))) == 0
    assert len(list(metastore.list_checkpoints(second_job_id))) == 3

</code_context>

<issue_to_address>
**suggestion (code-quality):** Simplify sequence length comparison ([`simplify-len-comparison`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-len-comparison/))

```suggestion
    assert not list(metastore.list_checkpoints(first_job_id))
```
</issue_to_address>



@codecov
Copy link

codecov bot commented Oct 22, 2025

Codecov Report

❌ Patch coverage is 92.36111% with 22 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/datachain/query/dataset.py 94.08% 7 Missing and 4 partials ⚠️
src/datachain/data_storage/db_engine.py 44.44% 4 Missing and 1 partial ⚠️
src/datachain/data_storage/sqlite.py 75.00% 3 Missing and 1 partial ⚠️
src/datachain/hash_utils.py 66.66% 2 Missing ⚠️


mapper = Mock(side_effect=lambda file_path: len(file_path))
call_count = {"count": 0}

def mapper(file_path):
Contributor:

what was the issue here? hash calculation for callable?

Contributor Author:

Yes, if I remember correctly.

Contributor:

would it be a problem with certain types of callables? e.g. a custom class that dynamically imitates `__call__`?

Contributor Author:

I've added comprehensive pydocs to the hash_callable() function used for this hashing, explaining what is and isn't supported, and how.
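As context for why some callables are problematic: a source-based hash only works when the source text is retrievable. A hypothetical sketch (not DataChain's actual hash_callable implementation) illustrating the limitation:

```python
import hashlib
import inspect

def hash_callable_sketch(fn) -> str:
    """Hash a callable by its source text, falling back to its qualified name.

    Works for plain functions and lambdas defined in a file; callables whose
    behavior is produced dynamically (e.g. a class imitating __call__ via
    __getattr__) have no meaningful source to hash, so any fallback is weaker.
    """
    try:
        source = inspect.getsource(fn)
    except (OSError, TypeError):
        # Builtins and dynamically created callables have no retrievable
        # source; fall back to a stable identifier.
        source = getattr(fn, "__qualname__", repr(fn))
    return hashlib.sha256(source.encode()).hexdigest()

def double(x):
    return x * 2

def triple(x):
    return x * 3
```

Under this scheme the same function always hashes identically, while editing its body changes the hash, which is what lets completed checkpoints be invalidated on code change.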

test_session, monkeypatch, nums_dataset
):
"""
Test that UDF execution creates checkpoints, but subsequent calls in the same
Contributor:

why are these tests "unit"? They are script-like; they use the high-level API (map, read, etc.). Probably a bit too heavy for unit tests...

Contributor Author:

I've put most of the tests in func/test_checkpoints.py. The ones left here are a little different, as they test a really specific thing, although it's true that the setup for that test required running multiple steps.

Contributor:

so they are not unit tests then? Let's move them... or I don't understand what the criteria is. I'm usually fine with having a simple read_values-based test in units, but not ones like these, tbh.

Contributor Author:

I decided to completely reorganize these checkpoint tests. All tests are now in the tests/func/checkpoints directory, since they all look like functional tests anyway; there's no point mixing them between unit and func, as it just creates the confusion you mentioned.

Here is the organization, based on what the tests actually cover:

  - test_checkpoint_workflows.py (6 tests) - Core checkpoint save/reuse behavior, dataset lifecycle
  - test_checkpoint_udf_tables.py (7 tests) - UDF table creation, naming, lifecycle
  - test_checkpoint_parallel.py (4 tests) - Parallel execution behavior
  - test_checkpoint_recovery.py (6 tests) - Resuming from partial results after failures
  - test_checkpoint_invalidation.py (7 tests) - Cache invalidation and forced reruns
  - test_checkpoint_job_linking.py (4 tests) - Job-dataset relationship database schema

@cloudflare-workers-and-pages

cloudflare-workers-and-pages bot commented Dec 10, 2025

Deploying datachain with Cloudflare Pages

Latest commit: 96f9de9
Status: ✅  Deploy successful!
Preview URL: https://c171190a.datachain-2g6.pages.dev
Branch Preview URL: https://ilongin-1392-udf-checkpoints.datachain-2g6.pages.dev


Development

Successfully merging this pull request may close these issues.

UDF checkpoints

4 participants