feat!(datafusion): enable parallel file scanning with eager task bucketing#2298
toutane wants to merge 32 commits into
timsaucer left a comment
I'm no expert on Iceberg, but I've worked a lot on DataFusion, particularly table providers. I wrote a blog post on the DataFusion site recently, though only after you first put this PR up. In case it's in any way useful: https://datafusion.apache.org/blog/2026/03/31/writing-table-providers/
Overall I think the approach here is definitely reasonable. My comments are mostly around opportunities to squeeze out a little more performance based on having done something similar at my work.
    self: Arc<Self>,
    _children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
    Ok(self)
There was a problem hiding this comment.
Since this doesn't support children, I'd recommend an error if _children is not empty. Not a blocker for merge.
There was a problem hiding this comment.
Yes, you're right, thanks! Pushed a fix that returns a DataFusionError::Internal, matching the pattern used in IcebergCommitExec::with_new_children.
Side note: IcebergTableScan::with_new_children has the same issue. This could be the subject of another PR.
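The guard discussed in this thread can be sketched without pulling in DataFusion. The `Plan` trait and `LeafScan` type below are hypothetical stand-ins for `ExecutionPlan` and the scan node, and the error is a plain `String` rather than `DataFusionError::Internal`; only the shape of the check is the point.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for DataFusion's `ExecutionPlan`, reduced to the
/// single method under discussion.
trait Plan {
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn Plan>>,
    ) -> Result<Arc<dyn Plan>, String>;
}

/// Hypothetical leaf node: it reads files and never has input plans.
struct LeafScan;

impl Plan for LeafScan {
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn Plan>>,
    ) -> Result<Arc<dyn Plan>, String> {
        if children.is_empty() {
            // Nothing to replace, so the node is returned unchanged.
            Ok(self)
        } else {
            // Silently ignoring children would hide a planner bug.
            Err(format!(
                "LeafScan is a leaf node and accepts no children, got {}",
                children.len()
            ))
        }
    }
}
```

Calling `with_new_children(vec![])` returns the same node, while passing any child yields an error instead of a silently unchanged plan.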
    &self,
    filters: &[&Expr],
) -> DFResult<Vec<TableProviderFilterPushDown>> {
    Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
There was a problem hiding this comment.
Can we do better than this? If we have a partitioned scan and the filter is on the partition columns, I would expect to be able to get an exact pushdown. That would entirely remove a filter operation for cases where it matches, and I think that's a big win and a common use case I've seen in other work.
There was a problem hiding this comment.
Yes, you're right there's something to do here, I agree.
I'd prefer to tackle this in a follow-up PR. Doing it correctly requires two things: a per-filter conversion API (currently convert_filters_to_predicate collapses everything into a single combined predicate and silently drops non-convertible filters), and a partition-spec-aware check, because only Identity-transformed partition columns can be safely marked Exact; bucket, truncate, year/month/etc. are lossy and must stay Inexact to avoid incorrect results.
Happy to open a tracking issue. However, if you think it's simple enough, I can go ahead and make the changes directly in the PR.
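A minimal sketch of the per-filter rule described in this reply, under simplifying assumptions: each filter references a single column, and the `Transform`, `PushDown`, and `classify` names are hypothetical stand-ins rather than the iceberg-rust or DataFusion types.

```rust
/// Hypothetical, reduced set of Iceberg partition transforms.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
    Year,
}

/// Hypothetical stand-in for `TableProviderFilterPushDown`.
#[derive(Debug, PartialEq)]
enum PushDown {
    Exact,
    Inexact,
}

/// Classifies one filter. `convertible` says whether the filter can be
/// expressed as an Iceberg predicate at all; `partition_fields` lists
/// (source column, transform) pairs from the partition spec.
fn classify(
    filter_column: &str,
    convertible: bool,
    partition_fields: &[(&str, Transform)],
) -> PushDown {
    // Only an identity transform keeps the column value intact, so only
    // then does partition pruning remove every non-matching row.
    let identity_partitioned = partition_fields
        .iter()
        .any(|(col, t)| *col == filter_column && *t == Transform::Identity);
    if convertible && identity_partitioned {
        PushDown::Exact
    } else {
        PushDown::Inexact
    }
}
```

Lossy transforms such as bucket or truncate map several column values to one partition value, which is why they stay `Inexact` in this sketch.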
    .map_err(to_datafusion_error)?
    .try_collect::<Vec<_>>()
    .await
    .map_err(to_datafusion_error)?;
There was a problem hiding this comment.
It looks like the number of output partitions will be the number of files, right? I'm wondering if there's an opportunity to do better than that. We're specifying that the output partitioning in the exec is unknown, but don't we have information about the partitioning we could utilize?
There was a problem hiding this comment.
By better I mean: could we be more performant if we got the target partitions from the session and produced output in that number of partitions, already hashed?
There was a problem hiding this comment.
Thanks for raising this, please push back if any of the below is off.
For context, the long-term direction for this is tracked in the EPIC #1604 (row-group-based parallel scan with a GroupPruner that can split/merge FileScanTask below the file grain). What I was hoping to land with this PR is a more immediate, scoped optimization that stays within the current file-grain contract, so we don't preempt the design choices in #1604. The file-grouping step you're pointing at is essentially what #2220 describes as the intermediate improvement on the path toward #1604.
If you think it's appropriate, I'd be happy to pick up a short-term follow-up along these lines:
- Switch IcebergPartitionedScan from tasks: Vec<FileScanTask> to file_groups: Vec<Vec<FileScanTask>>, to follow the convention used by DataFusion's own FileScanConfig; each group = one DataFusion partition that streams its files sequentially through ArrowReaderBuilder::read.
- In IcebergPartitionedTableProvider::scan, read state.config().target_partitions() and group tasks into min(n_files, target_partitions) buckets.
- When n_files < target_partitions, parallelism is still capped at n_files. I think that's inherent to the file grain, but let me know if I'm missing something.
I'm happy to open the follow-up issue/PR myself, or defer to you if you'd rather frame it, whatever works best.
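The grouping step proposed in the list above can be sketched as follows. This is an illustration only: `group_tasks` is a hypothetical helper, tasks are generic placeholders for `FileScanTask`, and round-robin assignment is an assumption chosen for brevity rather than what the PR ends up doing.

```rust
/// Distributes tasks into `min(n_files, target_partitions)` groups.
/// Each group would become one DataFusion partition.
fn group_tasks<T>(tasks: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    let n_groups = tasks.len().min(target_partitions);
    if n_groups == 0 {
        // Empty table (or zero target partitions): no partitions at all.
        return Vec::new();
    }
    let mut groups: Vec<Vec<T>> = (0..n_groups).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        // Round-robin keeps the per-group file counts within one of each other.
        groups[i % n_groups].push(task);
    }
    groups
}
```

With five files and `target_partitions = 2` this yields two groups; with two files and `target_partitions = 8` it yields two single-file groups, which is the file-grain cap mentioned in the last bullet.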
There was a problem hiding this comment.
I suppose I'd need to understand those conversations. I think I mentioned this in one of the other comments on this PR, but I found the whole discussion difficult to track. Maybe I can find some time this weekend to look through the size-based partitioning they mention.
There was a problem hiding this comment.
I wrote this PR targeting your branch. Let me know what you think!
The one issue I have is that I do not personally have access to any iceberg catalogs that I could use for benchmarking. My ability to test it is very limited right now.
There was a problem hiding this comment.
Hey Tim, thanks a lot for the proposal. It is really clean and smart.
I created an issue for the redundant FilterExec you were mentioning (#2363), so it's nice that you've addressed it here.
For the benchmark, we can do it in our infra by shadowing real traffic (our ultimate goal would be to distribute execution on multiple workers, based on the output partitioning). It will not be a standard benchmark, but at least it will show whether things are improving on real-world queries.
What do you think of merging this new provider/scan with the current one so that we only maintain one path, as you suggested? If I understand correctly, the current path is reachable by setting target_partitions to 1.
Last thing, I'll try to support partitioning based on the Iceberg bucket transform, the tricky thing being that DataFusion and Iceberg aren't using the same hash function, making the bucket hash incompatible with RepartitionExec.
There was a problem hiding this comment.
Personally I strongly believe you should be updating the existing table provider instead of creating a new one. I think it's just more work in the long run to keep two near-identical bits of code.
I don't think you'll be able to use iceberg bucket transforms for the datafusion hashing output.
Thanks for the PR, @toutane! One thing I noticed:
0a7af45 to fde61f6
More broadly, is adding a second path really the best answer? It seems like now you're going to increase your maintenance load. Is there any reason not to have a single path and the fallback be that it's a partitioned scan of N=1? I am going to spend a little more time trying to understand the issues. It's difficult because some of them are marked as unplanned or stale and some of the links do not have good descriptions. I suppose I'll need to look at the Java source to get a better idea of what the long-term goal is.
Hey Tim, I think you're absolutely right about consolidating everything into a single The only reason I kept separate paths was to avoid introducing breaking changes. I am going to explore a design where partitioned file scan becomes the default behavior, with the current provider's logic as a fallback as you suggested. On a related note, it could be worth thinking about the next step: exposing
I understand a desire to not introduce breaking changes. Is the concern that the API is changing or do you have implementation concerns? If it's just the API change, then I think good upgrade documentation is often sufficient, especially since it looks like the change would be fairly straightforward for a downstream consumer. Please correct me if that's not correct. If it's a concern about the implementation, then I think the real solution is to make sure there's robust testing both in the repo and against some real-life workloads to verify performance at different scales and partitioning structures. With respect to the question about output partitioning, I think any time you can do that you should. Any time we can give more information about these kinds of things we're going to see performance gains, and sometimes significant gains.
0a2ba62 to 70bc487
1f87e13 to 7ff1f6d
…ning with eager task bucketing (apache#2298) (#18)

* feat(datafusion): add IcebergPartitionedTableProvider and IcebergPartitionedScan for parallel file scanning
  (cherry picked from commit b9819b4)

* docs(datafusion): update comment in IcebergPartitionedScan
  (cherry picked from commit c362b0f)

* Update crates/integrations/datafusion/src/table/mod.rs
  Co-authored-by: Tim Saucer <timsaucer@gmail.com>
  (cherry picked from commit ec1bd37)

* fix(datafusion): reject non-empty children in IcebergPartitionedScan::with_new_children
  (cherry picked from commit 5e53cb8)

* fix(datafusion): use ArrowReaderBuilder existing configuration path
  (cherry picked from commit cc6a833)

* format
  (cherry picked from commit 75e521d)

* feat(datafusion): bucket FileScanTasks across target_partitions with identity-hash partitioning

  Replace the one-task-per-partition layout in IcebergPartitionedScan with N buckets sized from the session's target_partitions. When the table's default spec exposes identity-transform columns and every task carries the corresponding partition values, tasks are bucketed by hashing those values via DataFusion's REPARTITION_RANDOM_STATE so the resulting partitioning matches what RepartitionExec would produce. The scan then declares Partitioning::Hash(exprs, N), letting downstream joins and aggregates skip an extra repartition.

  Hash declaration is conservative and only stands when:
  - the table has a single partition spec (no spec evolution)
  - every identity source column is present in the output projection
  - every column type is supported by literal_to_array
  - every task supplied a full identity key

  Any miss collapses to UnknownPartitioning(N) while bucketing falls back to a hash of data_file_path so partitions still distribute.

  IcebergPartitionedScan now stores Vec<Vec<FileScanTask>> and execute(i) streams every task in buckets[i] through to_arrow_with_tasks. Bucket count is capped at min(target_partitions, num_files), and an empty table still yields zero partitions to avoid out-of-bounds execute calls.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
  (cherry picked from commit 9edd54a)

* feat(datafusion): mark identity-partition filters as Exact pushdown

  `IcebergPartitionedTableProvider::supports_filters_pushdown` previously returned `Inexact` for every filter, forcing DataFusion to re-evaluate even filters that Iceberg's manifest-level pruning has fully resolved.

  Per-filter the provider now returns `Exact` when both:
  - the iceberg conversion can represent the filter, so manifest pruning will remove every row that fails it, and
  - every leaf is a comparison or null check against an identity-partition column with a literal RHS.

  Identity-partitioned column names are cached at `try_new` from the table's default spec; tables with spec evolution (>1 historical specs) fall back to an empty set so all filters stay `Inexact`.

  Supported shapes: =, !=, <, <=, >, >=, IS NULL, IS NOT NULL, IN/NOT IN, plus AND/OR/NOT compositions of the above. Every other shape is `Inexact`.

  `convert_filter_to_predicate` is promoted to `pub(crate)` so the provider can probe convertibility per filter without rebuilding the whole AND-collapsed predicate.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
  (cherry picked from commit add5e35)

* feat(datafusion): allow Exact pushdown across spec evolution via per-column intersection

  Previously identity_partition_col_names returned an empty set whenever the table had more than one historical partition spec, forcing every filter back to Inexact under spec evolution. This was overly conservative: Iceberg evaluates partition predicates against each manifest's own spec, so a column that is identity-partitioned in every spec is fully prunable across the entire table regardless of which spec a given file was written under.

  Replace the multi-spec gate with an intersection across every spec's identity-source set. A column survives only if every spec includes it with Transform::Identity; columns that appear with non-identity transforms in some spec, or are missing from a spec entirely, are dropped. The result remains an honest set of columns for which Exact pushdown is provably safe across all surviving files.

  Hash bucketing (compute_identity_cols) keeps its single-spec gate because slot-order alignment with the table's default spec depends on each task carrying its own spec id, which the native plan flow does not yet do.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
  (cherry picked from commit e8771e4)

* Revert "feat(datafusion): allow Exact pushdown across spec evolution via per-column intersection"

  This reverts commit b2613e3.
  (cherry picked from commit 826f054)
  (cherry picked from commit aba4523)

* Revert "feat(datafusion): mark identity-partition filters as Exact pushdown"

  This reverts commit 6d0ed4c.
  (cherry picked from commit 4381f00)
  (cherry picked from commit 598c5de)

* refactor(datafusion): merge IcebergPartitionedTableProvider into IcebergTableProvider

  IcebergPartitionedTableProvider and IcebergPartitionedScan were introduced to enable parallel file scanning by bucketing FileScanTasks across DataFusion partitions. However, maintaining two TableProvider implementations is redundant: the new provider is strictly more capable, and its degenerate case (target_partitions=1) reproduces the old single-partition behavior exactly. This commit folds the partitioned provider into IcebergTableProvider and the partitioned scan into IcebergTableScan, eliminating the parallel types.

  Changes:
  - IcebergTableProvider::scan() now eagerly calls plan_files() and distributes FileScanTasks into buckets using the same identity-hash strategy (REPARTITION_RANDOM_STATE + create_hashes) that was in IcebergPartitionedTableProvider, enabling Partitioning::Hash declarations that align with DataFusion's RepartitionExec.
  - IcebergTableScan gains a new_with_tasks() constructor that accepts pre-planned buckets and a caller-supplied Partitioning. execute(i) streams the tasks in buckets[i] via TableScan::to_arrow_with_tasks, rebuilding the TableScan per-partition to avoid serializing PlanContext Arc-shared caches across workers.
  - The original new() constructor and the to_arrow() lazy path are kept unchanged for IcebergStaticTableProvider, which does not pre-plan tasks.
  - Limit slicing (try_filter_map truncation) from the old IcebergTableScan is preserved in both execution paths.
  - Bucketing helpers (IdentityCol, compute_identity_cols, bucket_tasks, identity_hash, fallback_hash, literal_to_array, is_supported_dtype) are moved verbatim into a new private table/bucketing.rs module.
  - Unit tests from partitioned.rs are migrated to table/mod.rs and updated to use IcebergTableProvider and IcebergTableScan.
  - integration_datafusion_test.rs: fix test_provider_plan_stream_schema to call execute(0) instead of execute(1). The old call worked only because the previous IcebergTableScan silently ignored the partition index.

  (cherry picked from commit d2e5e04)
  (cherry picked from commit 23f3d8f)

* refactor(datafusion): polish scan API and add bucketing tests

  Review pass over the partitioned-scan branch ahead of upstream contribution.

  - Rename `TableScan::to_arrow_with_tasks` to `to_arrow_from_tasks`; `from` better signals that the tasks are the input source rather than a builder-style modifier.
  - Restructure the doc with a `# Correctness` section that calls out the projection/filter contract while clarifying that reader-side configuration (concurrency, batch size, row-group filtering, row selection) is taken from `self`.
  - Make `IcebergTableScan::new` and `new_with_tasks` `pub` (were `pub(crate)`) so external users can construct the node directly, matching the public visibility of the struct itself.
  - Drop the `convert_filters_to_predicate` re-export from `physical_plan/mod.rs`: it was unused outside the module.
  - Extract a private `new_inner` constructor on `IcebergTableScan` so `new` and `new_with_tasks` share a single source of truth for the `PlanProperties` / projection / predicate setup.
  - Split `IcebergTableScan::execute` into a linear pipeline backed by three helpers: `build_table_scan` (synchronous scan-builder plumbing), `build_record_batch_stream` (async stream construction for the lazy/eager modes), and `apply_limit`.
  - Trim the `IcebergTableScan` struct doc and field comments to match the rest of the file's style; drop the verbose `to_arrow_with_tasks` rationale (the `# Correctness` doc carries the load-bearing info).
  - Tighten `DisplayAs::fmt_as`: remove the file-path enumeration (file count alone is enough for `EXPLAIN`) and factor the common prefix.
  - Trim several narrating comments in `table/mod.rs` and the module doc that duplicated information already evident from the code.
  - Add `test_identity_partitioned_declares_hash`: verifies the happy path where an identity-partitioned table with the partition column in the projection produces `Partitioning::Hash` referencing that column. This was the main missing coverage for the bucketing logic.
  - Add `test_projection_without_partition_col_falls_back_to_unknown`: verifies the `compute_identity_cols → None` branch when the projection omits the partition source column.
  - Add helpers (`make_partitioned_catalog_and_table_for_bucketing`, `append_partitioned_fake_data_files`) to build identity-partitioned fixtures without writing real Parquet files.

  (cherry picked from commit b1f2d66)
  (cherry picked from commit 616dcdc)

* test(sqllogictest): update EXPLAIN snapshots for eager bucketing output

  IcebergTableProvider::scan now plans files eagerly and buckets them across DataFusion partitions before returning the ExecutionPlan. As a result, IcebergTableScan's DisplayAs output always includes `buckets:[N] file_count:[M]` - even for unpartitioned tables where N = 1.

  Update the four .slt files whose EXPLAIN snapshots were missing this suffix, and fix the like_predicate_pushdown snapshots that also had a stale input_partitions count on RepartitionExec (the table now has multiple files across multiple buckets).

  (cherry picked from commit 6ae4a71)
  (cherry picked from commit 581dde7)

* fix(datafusion): resolve conflicts
  (cherry picked from commit 7ad9dcc)

* fix(datafusion): format
  (cherry picked from commit 7ff1f6d)

---------

Co-authored-by: Tim Saucer <timsaucer@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
timsaucer left a comment
From my perspective on the datafusion side this looks great! I appreciate the responsiveness to the earlier feedback
mbutrovich left a comment
Thanks for tackling this @toutane! First round of feedback.
/// time and are not re-applied here. Reader-side configuration
/// (concurrency, batch size, row-group filtering, row selection) is
/// taken from `self` and may differ from the planning scan.
pub fn to_arrow_from_tasks(&self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
There was a problem hiding this comment.
This is now public on the core TableScan, and the only thing keeping it correct is the doc-comment contract that the caller's tasks were planned with the same projection/filter as self. Since the predicate is baked into the tasks and isn't re-derived here, passing tasks from a differently-filtered scan would silently return wrong rows with no error. Could we keep this pub(crate) (or behind the integration) until there's a concrete external need? If it does need to be public, is there a way to make the type system carry the invariant (e.g., consuming a wrapper that proves the tasks came from this scan) rather than relying on the doc?
Separately: as written, the body just copies five reader-config fields off self (file_io, concurrency_limit_data_files, row_group_filtering_enabled, row_selection_enabled, batch_size) onto an ArrowReaderBuilder and calls read(tasks); nothing in it needs the Table. And ArrowReaderBuilder is already public and Table-free, so the "execute pre-planned tasks" capability already exists for external callers: datafusion-comet's IcebergScanExec, which holds only a metadata_location and JVM-planned tasks (no Table), builds its reader that way today. So this method doesn't unlock anything new for downstream engines; its only value is reusing a TableScan's already-configured reader settings. Given that, does it earn pub on the core TableScan, or is pub(crate) enough? Callers without a TableScan are better served by ArrowReaderBuilder directly.
There was a problem hiding this comment.
I think your comment makes it clear that we don't need this function at all. As you pointed out, we can read the tasks directly from ArrowReaderBuilder without going through a TableScan. That means the only reason for IcebergTableScan to carry a Table (other than to obtain a FileIO instance) would be the "planning at execution" path.
You mentioned that the datafusion-comet implementation has already chosen to drop the Table from the scanning node, since the tasks are pre-planned. With that in mind, I'm wondering whether the same approach might be desirable for iceberg-datafusion, which would let us remove its "planning at execution" path. What do you think?
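For reference, the "make the type system carry the invariant" idea raised earlier in this thread could look roughly like the sketch below. All names (`ScanConfig`, `Scan`, `PlannedTasks`) are hypothetical and nothing here is the iceberg-rust API; the assumption is that the planned tasks travel together with the configuration that produced them, so they cannot be replayed against a differently-filtered scan.

```rust
use std::sync::Arc;

/// Hypothetical scan configuration; the predicate stands in for the
/// projection/filter baked into the tasks at planning time.
struct ScanConfig {
    predicate: String,
}

struct Scan {
    config: Arc<ScanConfig>,
}

/// Tasks bundled with the config that planned them. In a real crate the
/// fields would be private to the defining module, so `Scan::plan` would be
/// the only way to construct a value.
struct PlannedTasks {
    config: Arc<ScanConfig>,
    files: Vec<String>,
}

impl Scan {
    fn new(predicate: &str) -> Self {
        Scan { config: Arc::new(ScanConfig { predicate: predicate.to_string() }) }
    }

    /// Planning hands back tasks that remember their originating config.
    fn plan(&self, files: Vec<String>) -> PlannedTasks {
        PlannedTasks { config: Arc::clone(&self.config), files }
    }
}

impl PlannedTasks {
    /// Reading consumes the wrapper and uses its own config, so there is no
    /// second scan whose settings could disagree with the tasks.
    fn read(self) -> Vec<String> {
        self.files
            .iter()
            .map(|file| format!("{} where {}", file, self.config.predicate))
            .collect()
    }
}
```

The trade-off is that the wrapper becomes one more public type to maintain, which is part of why dropping the method entirely, as discussed above, may be the simpler outcome.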
/// `lit`. The Arrow type must match what DataFusion will see for this column
/// at scan time, otherwise `create_hashes` would dispatch on a different type
/// and produce a hash that disagrees with DataFusion's row-wise hashing.
fn literal_to_array(lit: &Literal, dt: &DataType) -> Option<ArrayRef> {
There was a problem hiding this comment.
I think this duplicates create_primitive_array_single_element in crates/iceberg/src/arrow/value.rs:627 (currently pub(crate)), which already maps a Literal to a single-element array for ~20 types including Timestamp, Decimal128, and Binary, and handles nulls. Could we promote that to pub (or re-export it) and call it here? That would drop ~60 lines and, combined with is_supported_dtype, widen the set of partition types that can declare Hash instead of falling back. The deliberate type gate can stay where it is.
There was a problem hiding this comment.
Addressed in b6369e2. Rather than call create_primitive_array_single_element per task (which would keep the per-file allocations flagged in your other comment), I extracted the shared literal to Arrow dispatch into a new pub PrimitiveLiteralArrayBuilder (arrow/value.rs), re-exported as iceberg::arrow::PrimitiveLiteralArrayBuilder:
- create_primitive_array_single_element now delegates to it, so the type mapping lives in one place.
- The duplicated literal_to_array in bucketing.rs is deleted; the hashing path builds through the same builder.
- is_supported_dtype is widened so identity partition columns now declare Hash instead of falling back. New co-location test bucket_tasks_hashes_decimal_and_timestamp_identity_columns exercises the widened set.
- The deliberate type gate stays in compute_identity_cols, exactly as you suggested.
Net lines went up rather than down (the shared builder + wider type coverage + tests), but the duplication itself is gone and the dispatch is now a single source of truth shared by both the single-element and batched-hashing paths.
/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
/// hash-repartition convention. Returns `None` if the task lacks partition
/// data or any required slot is null/unsupported.
fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {
There was a problem hiding this comment.
The Hash declaration is only sound if the array we build here hashes identically to the column DataFusion sees at scan time (same create_hashes + REPARTITION_RANDOM_STATE). That holds today because the reader emits Utf8/Int32/Date32 and we rebuild those exact types, but it's a quiet trap if it ever drifts. For example DataFusion hashes Utf8 via <&str>::hash_one but Utf8View via the view word (hash_array vs hash_generic_byte_view_array in datafusion/common/src/hash_utils.rs), so the same string hashes differently. If the reader (or a downstream coercion) ever produces Utf8View without repartitioning this side, equal keys would split across partitions and a join/aggregate that trusts the declared partitioning would return wrong results silently.
Two requests: (1) a comment here making the "array dtype must equal the reader's output dtype" invariant explicit, and (2) a test that actually exercises co-location: run create_hashes(REPARTITION_RANDOM_STATE) over a real array of the partition values and assert each row lands in the same bucket this code assigned. Right now the tests assert the shape of the partitioning but never that the bucketing agrees with RepartitionExec.
Separately, the hashing is structured the slow way: for every task it allocates a Vec<ArrayRef>, builds one single-element array per identity column, allocates a one-element hashes buffer, and makes its own create_hashes call. That is O(tasks * cols) tiny allocations plus one hash-kernel invocation per file, all on the planning path that gates every query. This should be a single pass: build one array of length n_tasks per identity column (one element per task), call create_hashes once to fill an n_tasks buffer, then take hash % n_partitions per task. Same result, but allocations drop from per-task to per-column and the kernel runs once instead of N times. Tables with tens of thousands of files are routine, so I would rather fix this now than ship a per-file hot loop in planning.
There was a problem hiding this comment.
> Two requests: (1) a comment here making the "array dtype must equal the reader's output dtype" invariant explicit, and (2) a test that actually exercises co-location: run create_hashes(REPARTITION_RANDOM_STATE) over a real array of the partition values and assert each row lands in the same bucket this code assigned.

Both done in ace9b7e:
- (1) The invariant is now stated where we declare Partitioning::Hash (table/mod.rs): the declaration is only sound if the arrays we build from partition literals hash identically to what the reader emits at scan time. Since DataFusion's hash dispatch is dtype-specific, any drift in the reader output dtype (e.g. Utf8 to Utf8View) must either materialize that exact dtype on the bucketing path or fall back to UnknownPartitioning. (Wording later refined in 971aa0f.)
- (2) test_identity_partitioned_hash_buckets_match_datafusion_repartition runs create_hashes + REPARTITION_RANDOM_STATE over a real array of the partition values and asserts every row lands in the bucket bucket_tasks assigned, i.e. agreement with RepartitionExec, not just the partitioning shape.

> If the reader (or a downstream coercion) ever produces Utf8View without repartitioning this side, equal keys would split across partitions and a join/aggregate that trusts the declared partitioning would return wrong results silently.

This exact trap is locked by test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning (2802af4): it wires an output schema with Utf8View as a deliberately unsupported dtype and asserts the scan declares UnknownPartitioning instead of Hash.

> Separately, the hashing is structured the slow way: for every task it allocates a Vec<ArrayRef>, builds one single-element array per identity column, allocates a one-element hashes buffer, and makes its own create_hashes call. That is O(tasks * cols) tiny allocations plus one hash-kernel invocation per file [...]. This should be a single pass: build one array of length n_tasks per identity column [...] call create_hashes once [...] then take hash % n_partitions per task.
Done in b6369e2. identity_hash (per-task) is replaced by identity_hashes_for_tasks: one PrimitiveLiteralArrayBuilder per identity column with capacity = n_tasks, a single pass appending each task's partition literal, then one create_hashes call filling an n_tasks buffer; the bucket is hash % n_partitions per task. Allocations drop from per-task to per-column and the hash kernel runs once instead of N times. The per-file fallback (fallback_hash) is unchanged and still only applies to tasks missing a full identity key.
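The single-pass structure described here can be illustrated with a small sketch. It is not the PR's code: `hash_column_into` is a hypothetical stand-in for a column-at-a-time kernel such as `create_hashes`, it uses the standard library's `DefaultHasher` instead of `REPARTITION_RANDOM_STATE`, and partition keys are simplified to `i64` values with every task assumed to carry a full key.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical column-at-a-time hash kernel: folds one column into the
/// running per-row hash buffer. A stand-in only, NOT DataFusion's hashing.
fn hash_column_into(column: &[i64], hashes: &mut [u64]) {
    for (value, slot) in column.iter().zip(hashes.iter_mut()) {
        let mut hasher = DefaultHasher::new();
        (*slot).hash(&mut hasher); // combine with hashes of earlier columns
        value.hash(&mut hasher);
        *slot = hasher.finish();
    }
}

/// Assigns a bucket to every task in one pass: one array per identity
/// column (length n_tasks), one kernel call per column, then a modulo.
/// Assumes every inner Vec has the same length (a full identity key).
fn buckets_for_tasks(task_keys: &[Vec<i64>], n_partitions: usize) -> Vec<usize> {
    if task_keys.is_empty() || n_partitions == 0 {
        return Vec::new();
    }
    let n_cols = task_keys[0].len();
    let mut hashes = vec![0u64; task_keys.len()];
    for col in 0..n_cols {
        // Per-column allocation instead of per-task allocation.
        let column: Vec<i64> = task_keys.iter().map(|key| key[col]).collect();
        hash_column_into(&column, &mut hashes);
    }
    hashes
        .iter()
        .map(|h| (*h % n_partitions as u64) as usize)
        .collect()
}
```

The property a co-location test would check carries over to the sketch: two tasks with equal identity keys always receive the same bucket, because both go through the same kernel with the same seed.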
/// missing partition data fall back to hashing `data_file_path`, which still
/// distributes evenly but breaks the `Hash` contract; the second tuple
/// element flags whether every task supplied a full identity key.
pub(super) fn bucket_tasks(
There was a problem hiding this comment.
Tasks are distributed purely by hash % n_partitions, with no awareness of file_size_in_bytes. A table with one large file plus many small ones would put very uneven work in one bucket and serialize the query on it. iceberg-java does size-based bin-packing here (TableScanUtil.planTaskGroups + BinPacking, weighted by file/delete size with a target split size), which is also what #128 is asking for. I'm fine with count-based as a first cut, but could we at least note the limitation in the module doc and file/track the size-based follow-up? The size field is already on FileScanTask, so first-fit-decreasing on bytes would be a fairly contained extension.
There was a problem hiding this comment.
Agreed, documented the limitation in the bucketing module doc, and tracked the size-based bin-packing follow-up in #128.
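As a rough illustration of what a size-aware follow-up might look like, here is a greedy sketch: files are sorted by size, largest first, and each goes to the currently least-loaded bucket. This is a simple greedy balancing heuristic, not iceberg-java's `BinPacking` and not first-fit with a target split size; `pack_by_size` and its inputs are hypothetical.

```rust
/// Returns, for each bucket, the indices of the files assigned to it.
/// `file_sizes[i]` plays the role of `file_size_in_bytes` for task `i`.
fn pack_by_size(file_sizes: &[u64], n_buckets: usize) -> Vec<Vec<usize>> {
    let mut buckets: Vec<Vec<usize>> = vec![Vec::new(); n_buckets];
    if n_buckets == 0 {
        return buckets;
    }
    let mut loads = vec![0u64; n_buckets];

    // Largest files first; ties broken by index to keep the result stable.
    let mut order: Vec<usize> = (0..file_sizes.len()).collect();
    order.sort_by(|a, b| file_sizes[*b].cmp(&file_sizes[*a]).then(a.cmp(b)));

    for idx in order {
        // Pick the bucket with the fewest bytes so far (first one on ties).
        let target = (0..n_buckets)
            .min_by_key(|b| loads[*b])
            .expect("n_buckets is non-zero");
        loads[target] += file_sizes[idx];
        buckets[target].push(idx);
    }
    buckets
}
```

For one 100-byte file and five 10-byte files split over two buckets, the large file ends up alone and the five small ones share the other bucket, instead of the count-based split that would pair the large file with two small ones. Note that size-based placement ignores partition keys, so it would not by itself preserve a declared hash partitioning.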
    output_schema: &ArrowSchema,
) -> Option<Vec<IdentityCol>> {
    let metadata = table.metadata();
    if metadata.partition_specs_iter().len() > 1 {
There was a problem hiding this comment.
Returning None whenever there's more than one historical spec is safe but stricter than iceberg-java, which intersects the identity fields present across all specs (Partitioning.groupingKeyType / commonActiveFieldIds) and still reports a grouping key on the common columns. I see the history tried the per-column intersection and reverted it as out of scope, which is totally reasonable for this PR. Could we add a one-line comment pointing at that behavior so the conservatism reads as intentional, and link a follow-up issue?
There was a problem hiding this comment.
Thanks. iceberg-java intersects the identity fields common to all specs and still emits a grouping key on them; we bail out instead because the bucketing path aligns slots to the default spec and FileScanTask doesn't yet carry its own spec id (related to e0d6add). Added a comment at the guard and filed #2658 to track it.
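The intersection behavior attributed to iceberg-java in this thread can be sketched as a set intersection over specs. The `PartitionField` struct and `common_identity_source_ids` function are hypothetical simplifications, not the iceberg-rust or iceberg-java types.

```rust
use std::collections::BTreeSet;

/// Hypothetical, reduced partition field: which source column it reads and
/// whether its transform is the identity transform.
struct PartitionField {
    source_id: i32,
    is_identity: bool,
}

/// Source column ids that are identity-partitioned in every spec.
/// A column that is missing from a spec, or appears there with a
/// non-identity transform, is dropped.
fn common_identity_source_ids(specs: &[Vec<PartitionField>]) -> BTreeSet<i32> {
    let mut per_spec = specs.iter().map(|spec| {
        spec.iter()
            .filter(|field| field.is_identity)
            .map(|field| field.source_id)
            .collect::<BTreeSet<i32>>()
    });
    let first = match per_spec.next() {
        Some(set) => set,
        None => return BTreeSet::new(), // no specs, no grouping key
    };
    per_spec.fold(first, |acc, set| acc.intersection(&set).copied().collect())
}
```

With a single spec this returns exactly that spec's identity columns, so the current single-spec behavior is the degenerate case of the intersection.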
/// Partition `i` streams `buckets[i]`. The caller is responsible for
/// ensuring `partitioning` matches the bucketing. Used by
/// [`IcebergTableProvider`][crate::table::IcebergTableProvider].
#[allow(clippy::too_many_arguments)]
There was a problem hiding this comment.
new_with_tasks takes raw buckets and a Partitioning with nothing enforcing buckets.len() == partitioning.partition_count(), and both this and new need #[allow(clippy::too_many_arguments)]. Could we collapse these into a small params struct (or builder) that validates the bucket/partitioning consistency and returns Result? That also removes the new/new_with_tasks/new_inner parameter repetition.
Related: new and new_with_tasks are now pub. Is that needed for the two in-crate providers, or could they stay pub(crate)? Public constructors that can build an inconsistent node are easy to misuse.
There was a problem hiding this comment.
You're right, this could let a user build an inconsistent node.
I added a pub struct IcebergTableScanBuilder whose build returns an error if buckets.len() != partition_count. This removes the duplication between new, new_with_tasks, and new_inner.
I'm interested in keeping the ability to build an IcebergTableScan from outside, which is why new and new_with_tasks were previously pub. We currently rely on a catalog and schema provider other than the ones in iceberg-datafusion. This is also something other users have asked for, as mentioned in this proposal: #2123. For that reason, I've kept build as pub.
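A minimal sketch of a validating builder in the spirit of the one described above. The names `ScanNode` and `ScanNodeBuilder` are hypothetical, tasks are reduced to file-path strings, and the error type is a plain `String`; the actual `IcebergTableScanBuilder` may differ in every detail.

```rust
/// Hypothetical scan node; `None` buckets means lazy mode (not yet planned).
#[derive(Debug)]
struct ScanNode {
    buckets: Option<Vec<Vec<String>>>,
    partition_count: usize,
}

#[derive(Default)]
struct ScanNodeBuilder {
    buckets: Option<Vec<Vec<String>>>,
    partition_count: usize,
}

impl ScanNodeBuilder {
    fn buckets(mut self, buckets: Vec<Vec<String>>) -> Self {
        self.buckets = Some(buckets);
        self
    }

    fn partition_count(mut self, partition_count: usize) -> Self {
        self.partition_count = partition_count;
        self
    }

    /// The only way to obtain a `ScanNode`, so the consistency check
    /// cannot be skipped by callers.
    fn build(self) -> Result<ScanNode, String> {
        if let Some(buckets) = &self.buckets {
            if buckets.len() != self.partition_count {
                return Err(format!(
                    "{} buckets do not match {} declared partitions",
                    buckets.len(),
                    self.partition_count
                ));
            }
        }
        Ok(ScanNode {
            buckets: self.buckets,
            partition_count: self.partition_count,
        })
    }
}
```

Because `build` returns a `Result`, a mismatch surfaces when the plan is constructed rather than as an out-of-bounds lookup when a partition is executed.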
| predicates: Option<Predicate>, | ||
| /// Optional limit on the number of rows to return | ||
| /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode). | ||
| buckets: Option<Vec<Vec<FileScanTask>>>, |
There was a problem hiding this comment.
The Option plus the buckets() accessor returning &[] via unwrap_or(&[]) could just be a Vec that's empty in lazy mode, which would drop a layer. That said, the Option does signal "lazy vs eager" intent, so I could go either way; your call.
There was a problem hiding this comment.
That is indeed misleading.
I think keeping Option is better, as it lets us distinguish the case where we've already planned but got 0 tasks from the case where we haven't planned yet. I changed the accessor accordingly, so it now returns an Option.
I also added a comment above the struct field to make explicit that None means lazy mode and Some means eager mode.
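The distinction the `Option` preserves can be sketched as follows. This is an illustration with hypothetical `Task` and `Scan` types, not the PR's code; it only shows why an accessor returning `Option<&[...]>` keeps "not planned yet" apart from "planned, zero tasks".

```rust
/// Hypothetical stand-in for `FileScanTask`.
#[derive(Debug, Clone, PartialEq)]
pub struct Task(pub String);

pub struct Scan {
    /// `None` = lazy mode (not planned yet); `Some` = eager mode (already planned).
    buckets: Option<Vec<Vec<Task>>>,
}

impl Scan {
    pub fn lazy() -> Self {
        Scan { buckets: None }
    }

    pub fn eager(buckets: Vec<Vec<Task>>) -> Self {
        Scan { buckets: Some(buckets) }
    }

    /// Returns the planned buckets without collapsing `None` into an empty slice.
    pub fn buckets(&self) -> Option<&[Vec<Task>]> {
        self.buckets.as_deref()
    }
}

pub fn describe(scan: &Scan) -> &'static str {
    match scan.buckets() {
        None => "lazy: not planned yet",
        Some(b) if b.iter().all(|bucket| bucket.is_empty()) => "eager: planned, zero tasks",
        Some(_) => "eager: planned with tasks",
    }
}
```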
| ) -> DFResult<SendableRecordBatchStream> { | ||
| let fut = get_batch_stream( | ||
| let bucket = match &self.buckets { | ||
| Some(buckets) => Some(buckets.get(partition).cloned().ok_or_else(|| { |
There was a problem hiding this comment.
buckets.get(partition).cloned() deep-copies a whole Vec<FileScanTask> on every execute(partition), and each FileScanTask carries a predicate tree plus schema, so this is a real per-partition copy of all the planned task state, not a cheap clone. Storing the buckets as Arc<[Vec<FileScanTask>]> (or wrapping each bucket in an Arc) lets execute hand out a pointer and drop the copy entirely. Since execute runs per partition on the query hot path, I would make this an Arc rather than clone.
There was a problem hiding this comment.
Done: buckets is now Option<Arc<[Arc<[FileScanTask]>]>>
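To illustrate why the nested `Arc` layout removes the per-partition copy, here is a small sketch with a hypothetical `Task` type standing in for `FileScanTask`. The helper names `share` and `bucket_for` are invented for this example; only the `Arc<[Arc<[...]>]>` shape comes from the reply above.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `FileScanTask`.
#[derive(Debug, Clone)]
pub struct Task {
    pub path: String,
}

/// Same shape as the field described above: shared outer slice, shared inner buckets.
pub type Buckets = Arc<[Arc<[Task]>]>;

/// Converts owned buckets into the shared representation once, at planning time.
pub fn share(buckets: Vec<Vec<Task>>) -> Buckets {
    buckets
        .into_iter()
        .map(|bucket| Arc::<[Task]>::from(bucket))
        .collect()
}

/// Per-partition lookup: cloning the inner `Arc` bumps a reference count
/// instead of deep-copying every task in the bucket.
pub fn bucket_for(buckets: &Buckets, partition: usize) -> Result<Arc<[Task]>, String> {
    buckets.get(partition).cloned().ok_or_else(|| {
        format!("partition {partition} out of range ({} buckets)", buckets.len())
    })
}
```

Two lookups of the same partition return pointers to the same allocation, which `Arc::ptr_eq` can confirm.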
| /// Builds the `RecordBatch` stream for a single partition. When `bucket` is | ||
| /// `Some`, streams the pre-planned tasks via `to_arrow_from_tasks`; when | ||
| /// `None`, plans and reads the full scan via `to_arrow`. | ||
| async fn build_record_batch_stream( |
There was a problem hiding this comment.
Both arms repeat .map_err(to_datafusion_error)? ... .map_err(to_datafusion_error). Could we produce the inner stream in the match and apply the error mapping once afterward to cut the duplication? (No behavior change intended.)
There was a problem hiding this comment.
Done, the stream is produced by the match and the error mapping is applied in only one location.
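The shape of that refactor can be sketched with plain iterators in place of async streams (an assumption made so the example runs with the standard library alone). `IcebergError`, `DfError`, `to_df_error`, `read_tasks`, and `read_full_scan` are hypothetical stand-ins, not the crate's API.

```rust
#[derive(Debug, PartialEq)]
pub struct IcebergError(pub String);

#[derive(Debug, PartialEq)]
pub struct DfError(pub String);

/// Stand-in for `to_datafusion_error`.
pub fn to_df_error(e: IcebergError) -> DfError {
    DfError(format!("external: {}", e.0))
}

pub type Batch = u32;
pub type BatchIter = Box<dyn Iterator<Item = Result<Batch, IcebergError>>>;

/// Eager path: read pre-planned tasks. Task id 0 simulates a failing read.
pub fn read_tasks(tasks: Vec<u32>) -> Result<BatchIter, IcebergError> {
    let iter: BatchIter = Box::new(tasks.into_iter().map(|id| {
        if id == 0 {
            Err(IcebergError(format!("bad task {id}")))
        } else {
            Ok(id)
        }
    }));
    Ok(iter)
}

/// Lazy path: plan and read the whole scan.
pub fn read_full_scan() -> Result<BatchIter, IcebergError> {
    let iter: BatchIter = Box::new(vec![10, 20, 30].into_iter().map(Ok::<Batch, IcebergError>));
    Ok(iter)
}

pub fn build_batches(bucket: Option<Vec<u32>>) -> Result<Vec<Batch>, DfError> {
    // Each arm only selects the source; neither arm maps errors.
    let batches = match bucket {
        Some(tasks) => read_tasks(tasks),
        None => read_full_scan(),
    };
    // Error conversion happens once, covering both setup and per-item errors.
    batches
        .map_err(to_df_error)?
        .map(|item| item.map_err(to_df_error))
        .collect()
}
```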
| } | ||
| /// Identity-partitioned table whose source column is in the projection | ||
| /// must produce `Partitioning::Hash` referencing that column. |
There was a problem hiding this comment.
Nice fixtures (MemoryCatalog + synthesized data files, no Parquet needed). A few branches I don't see covered that seem worth locking down since they fail silently if they regress:
- co-location correctness vs RepartitionExec (see the bucketing.rs:135 comment), the most important one;
- spec evolution -> UnknownPartitioning;
- an unsupported partition dtype (e.g., timestamp) -> UnknownPartitioning;
- a null partition value -> fallback hashing / Unknown.
There was a problem hiding this comment.
Added the missing provider-level coverage in crates/integrations/datafusion/src/table/mod.rs:
- Co-location vs DataFusion repartition hash was already covered by test_identity_partitioned_hash_buckets_match_datafusion_repartition
- Added test_spec_evolution_falls_back_to_unknown_partitioning
- Added test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning (timestamp is currently supported by this bucketing path, so I used Utf8View as the unsupported dtype.)
- Added test_null_partition_value_falls_back_to_unknown_partitioning
Also kept the existing lower-level null fallback behavior covered in bucketing.rs
…column intersection

Previously identity_partition_col_names returned an empty set whenever the table had more than one historical partition spec, forcing every filter back to Inexact under spec evolution. This was overly conservative: Iceberg evaluates partition predicates against each manifest's own spec, so a column that is identity-partitioned in every spec is fully prunable across the entire table regardless of which spec a given file was written under.

Replace the multi-spec gate with an intersection across every spec's identity-source set. A column survives only if every spec includes it with Transform::Identity; columns that appear with non-identity transforms in some spec, or are missing from a spec entirely, are dropped. The result remains an honest set of columns for which Exact pushdown is provably safe across all surviving files.

Hash bucketing (compute_identity_cols) keeps its single-spec gate because slot-order alignment with the table's default spec depends on each task carrying its own spec id, which the native plan flow does not yet do.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
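The intersection rule in this commit message could look roughly like the sketch below. `Transform`, `PartitionField`, `PartitionSpec`, and `common_identity_columns` are simplified, hypothetical types and names written for this illustration; they are not the `iceberg` crate's definitions, and the actual `identity_partition_col_names` may be structured differently.

```rust
use std::collections::BTreeSet;

/// Simplified, hypothetical partition transform.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
    Day,
}

pub struct PartitionField {
    pub source_column: &'static str,
    pub transform: Transform,
}

pub struct PartitionSpec {
    pub fields: Vec<PartitionField>,
}

/// Columns that are identity-partitioned in every spec.
/// A column missing from any spec, or present only with a lossy transform, is dropped.
pub fn common_identity_columns(specs: &[PartitionSpec]) -> BTreeSet<&'static str> {
    let mut per_spec = specs.iter().map(|spec| {
        spec.fields
            .iter()
            .filter(|f| f.transform == Transform::Identity)
            .map(|f| f.source_column)
            .collect::<BTreeSet<_>>()
    });
    let first = match per_spec.next() {
        Some(set) => set,
        None => return BTreeSet::new(),
    };
    per_spec.fold(first, |acc, next| acc.intersection(&next).copied().collect())
}
```

With specs `{region: identity, ts: day}` and `{region: identity, id: bucket}`, only `region` survives; adding a third spec that partitions on `ts` alone leaves the set empty.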
…ergTableProvider

IcebergPartitionedTableProvider and IcebergPartitionedScan were introduced to enable parallel file scanning by bucketing FileScanTasks across DataFusion partitions. However, maintaining two TableProvider implementations is redundant: the new provider is strictly more capable, and its degenerate case (target_partitions=1) reproduces the old single-partition behavior exactly. This commit folds the partitioned provider into IcebergTableProvider and the partitioned scan into IcebergTableScan, eliminating the parallel types.

Changes:
- IcebergTableProvider::scan() now eagerly calls plan_files() and distributes FileScanTasks into buckets using the same identity-hash strategy (REPARTITION_RANDOM_STATE + create_hashes) that was in IcebergPartitionedTableProvider, enabling Partitioning::Hash declarations that align with DataFusion's RepartitionExec.
- IcebergTableScan gains a new_with_tasks() constructor that accepts pre-planned buckets and a caller-supplied Partitioning. execute(i) streams the tasks in buckets[i] via TableScan::to_arrow_with_tasks, rebuilding the TableScan per-partition to avoid serializing PlanContext Arc-shared caches across workers.
- The original new() constructor and the to_arrow() lazy path are kept unchanged for IcebergStaticTableProvider, which does not pre-plan tasks.
- Limit slicing (try_filter_map truncation) from the old IcebergTableScan is preserved in both execution paths.
- Bucketing helpers (IdentityCol, compute_identity_cols, bucket_tasks, identity_hash, fallback_hash, literal_to_array, is_supported_dtype) are moved verbatim into a new private table/bucketing.rs module.
- Unit tests from partitioned.rs are migrated to table/mod.rs and updated to use IcebergTableProvider and IcebergTableScan.
- integration_datafusion_test.rs: fix test_provider_plan_stream_schema to call execute(0) instead of execute(1). The old call worked only because the previous IcebergTableScan silently ignored the partition index.
(cherry picked from commit d2e5e04)
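A rough sketch of the bucketing step this commit describes, under loud assumptions: `Task` is a hypothetical stand-in for `FileScanTask` with a single optional identity partition value, and `std::collections::hash_map::DefaultHasher` replaces DataFusion's `create_hashes` with `REPARTITION_RANDOM_STATE`. The stand-in hash does not match `RepartitionExec`, so this only shows the co-location property, not the hash alignment. The `min(target_partitions, n_files)` bucket count follows the PR description; the one-empty-bucket floor and the path-based fallback for null values are choices made for this example.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for `FileScanTask` with one identity partition value.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub path: String,
    pub partition_value: Option<i64>,
}

// Assumption: a std hasher in place of `create_hashes` + `REPARTITION_RANDOM_STATE`.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Distributes tasks into `min(target_partitions, n_files)` buckets (at least one).
/// Tasks sharing a partition value always land in the same bucket.
pub fn bucket_tasks(tasks: Vec<Task>, target_partitions: usize) -> Vec<Vec<Task>> {
    let n = target_partitions.min(tasks.len()).max(1);
    let mut buckets = vec![Vec::new(); n];
    for task in tasks {
        let hash = match task.partition_value {
            Some(value) => hash_of(&value),
            // Fallback for a null partition value: spread by file path instead.
            None => hash_of(&task.path),
        };
        buckets[(hash % n as u64) as usize].push(task);
    }
    buckets
}
```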
Review pass over the partitioned-scan branch ahead of upstream contribution.

- Rename `TableScan::to_arrow_with_tasks` to `to_arrow_from_tasks`: `from` better signals that the tasks are the input source rather than a builder-style modifier.
- Restructure the doc with a `# Correctness` section that calls out the projection/filter contract while clarifying that reader-side configuration (concurrency, batch size, row-group filtering, row selection) is taken from `self`.
- Make `IcebergTableScan::new` and `new_with_tasks` `pub` (were `pub(crate)`) so external users can construct the node directly, matching the public visibility of the struct itself.
- Drop the `convert_filters_to_predicate` re-export from `physical_plan/mod.rs`: it was unused outside the module.
- Extract a private `new_inner` constructor on `IcebergTableScan` so `new` and `new_with_tasks` share a single source of truth for the `PlanProperties` / projection / predicate setup.
- Split `IcebergTableScan::execute` into a linear pipeline backed by three helpers: `build_table_scan` (synchronous scan-builder plumbing), `build_record_batch_stream` (async stream construction for the lazy/eager modes), and `apply_limit`.
- Trim the `IcebergTableScan` struct doc and field comments to match the rest of the file's style; drop the verbose `to_arrow_with_tasks` rationale (the `# Correctness` doc carries the load-bearing info).
- Tighten `DisplayAs::fmt_as`: remove the file-path enumeration (file count alone is enough for `EXPLAIN`) and factor the common prefix.
- Trim several narrating comments in `table/mod.rs` and the module doc that duplicated information already evident from the code.
- Add `test_identity_partitioned_declares_hash`: verifies the happy path where an identity-partitioned table with the partition column in the projection produces `Partitioning::Hash` referencing that column. This was the main missing coverage for the bucketing logic.
- Add `test_projection_without_partition_col_falls_back_to_unknown`: verifies the `compute_identity_cols → None` branch when the projection omits the partition source column.
- Add helpers (`make_partitioned_catalog_and_table_for_bucketing`, `append_partitioned_fake_data_files`) to build identity-partitioned fixtures without writing real Parquet files.

(cherry picked from commit b1f2d66)
IcebergTableProvider::scan now plans files eagerly and buckets them across DataFusion partitions before returning the ExecutionPlan. As a result, IcebergTableScan's DisplayAs output always includes `buckets:[N] file_count:[M]` - even for unpartitioned tables where N = 1. Update the four .slt files whose EXPLAIN snapshots were missing this suffix, and fix the like_predicate_pushdown snapshots that also had a stale input_partitions count on RepartitionExec (the table now has multiple files across multiple buckets). (cherry picked from commit 6ae4a71)
803912c to
277ee9c
Document that task bucketing distributes by file count, not file_size_in_bytes, and track the size-based bin-packing follow-up in apache#128.
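For contrast with the count-based distribution this commit documents, here is a hedged sketch of what size-aware packing might look like. It uses a greedy "largest file first, into the least-loaded bucket" heuristic, which is one simple option and not necessarily what the follow-up will implement; `Task` and `pack_by_size` are hypothetical names, and only `file_size_in_bytes` is taken from the text.

```rust
/// Hypothetical stand-in for `FileScanTask`; only the size matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub path: String,
    pub file_size_in_bytes: u64,
}

/// Greedy size-aware packing: sort by size descending, then place each task
/// in the bucket with the smallest total so far.
pub fn pack_by_size(mut tasks: Vec<Task>, n_buckets: usize) -> Vec<Vec<Task>> {
    let n = n_buckets.max(1);
    let mut buckets: Vec<Vec<Task>> = vec![Vec::new(); n];
    let mut loads = vec![0u64; n];
    tasks.sort_by(|a, b| b.file_size_in_bytes.cmp(&a.file_size_in_bytes));
    for task in tasks {
        // `n >= 1`, so `loads` is never empty and a minimum always exists.
        let (idx, _) = loads
            .iter()
            .enumerate()
            .min_by_key(|(_, load)| **load)
            .expect("at least one bucket");
        loads[idx] += task.file_size_in_bytes;
        buckets[idx].push(task);
    }
    buckets
}
```

With one 100-byte file and three 10-byte files packed into two buckets, the large file gets a bucket to itself and the three small ones share the other.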
bc91fff to
39da3c0
Note that compute_identity_cols intentionally bails out on any partition spec evolution rather than intersecting identity fields across specs like iceberg-java does. Link the follow-up issue tracking that relaxation. Closes review comment on apache#2298. Tracked in apache#2658.
@mbutrovich thank you very much for this initial review and for your clear and insightful comments, which are essential for determining the direction of the implementation. I’ve tried to address each of your comments. Unfortunately, the PR has doubled in size due to the addition of tests, the creation of a builder for the scan node, the refactoring of the bucketing.rs module, etc. Is this a blocker? If that's the case, one option might be to split the PR into two parts: the first part on task pre-planning and the second on leveraging the physical partitioning of the Iceberg table (e.g setting DataFusion's output-partitioning to Also, regarding your comment about adding a cache for tasks to avoid re-reading the metadata files if |
This PR has been split up, first part is #2671
Which issue does this PR close?
Motivation
Today the DataFusion scan node (`IcebergTableScan`) streams every file through a single partition and reports `UnknownPartitioning`. Two costs follow:
- No parallelism. File reads aren't spread across DataFusion partitions, so we don't use the parallel execution it's built for.
- Redundant shuffles. Because the node hides its partitioning, the optimizer can't tell the data is already physically partitioned by Iceberg, and inserts a `RepartitionExec` for joins/aggregations on partition columns -- re-shuffling already co-located rows.

This PR is a scoped fix at the file grain (one `FileScanTask` = smallest unit). It doesn't preempt the sub-file / row-group direction tracked in EPIC: Support parallel scan in iceberg-datafusion #1604; it's the intermediate step toward it.
What changes are included in this PR?
Breaking change to `IcebergTableProvider` and `IcebergTableScan`. The scan node now has two modes:
- Eager (new default). `IcebergTableProvider::scan` plans the tasks at planning time and buckets them into `min(target_partitions, n_files)` groups; each bucket is one output partition the scan streams in parallel. Tasks are computed once and shared across all partitions; the plan is reproducible; `execute(partition)` is pure I/O with no catalog round-trips. `IcebergTableProvider::scan` now does network I/O (catalog + metadata reads), which is unusual for a planning-phase method. An alternative design - planning lazily at execute time - would keep `scan()` cheap but requires one `plan_files()` call per partition (redundant). A future extension could expose this as an option for use cases where snapshot staleness matters more than plan reproducibility.
- Lazy (legacy). Tasks are planned at execution time by `IcebergStaticTableProvider`, as before -- the fallback. Static snapshots do not benefit from eager planning because the task list is fixed by construction.

A single scan node carries both modes (as opposed to introducing new types as originally proposed), so there's one provider and one scan node to maintain as suggested by @timsaucer.
Output partitioning. When the table is identity-partitioned on the scanned columns, the node declares `Partitioning::Hash(...)` instead of `UnknownPartitioning`, bucketing tasks so the result matches DataFusion's repartition hashing (`create_hashes` + `REPARTITION_RANDOM_STATE`); the optimizer can then drop the redundant `RepartitionExec`. It falls back to `UnknownPartitioning` whenever that match isn't guaranteed: spec evolution, unsupported `dtype`s, null partition values, or non-identity transforms (bucket/truncate/temporal).
Key changes
- Eager planning moves into `IcebergTableProvider::scan`. In eager mode `plan_files()` runs at planning time and the tasks are bucketed (`bucketing::bucket_tasks`) into `min(target_partitions, n_files)` groups stored on the scan node. In lazy mode (`IcebergStaticTableProvider`), `plan_files()` stays in `execute()`.
- Eager `execute(partition)` builds the Arrow reader directly. It streams its pre-assigned bucket through an `ArrowReaderBuilder`, reusing the scan's reader settings (concurrency limit, row-group filtering, row selection, batch size) -- no per-partition `plan_files()` or catalog round-trip. Buckets are held as `Arc<[Arc<[FileScanTask]>]>`, so `execute` hands out a pointer instead of cloning the task list.
- New `bucketing` module (proposed by @timsaucer). Assigns tasks to buckets and decides the declared `Partitioning`: for a single-spec identity-partitioned table it hashes tasks on their partition values (`create_hashes` + `REPARTITION_RANDOM_STATE`) and declares `Partitioning::Hash`, otherwise `UnknownPartitioning`.
- New `IcebergTableScanBuilder`. Replaces the `new` / `new_with_tasks` / `new_inner` constructors (and their `too_many_arguments`), validating `buckets.len() == partition_count` and returning `Result` so an inconsistent node can't be built.
- `with_new_children` now errors. `IcebergTableScan` is a leaf node; non-empty children return `DataFusionError::Internal` instead of being silently dropped (matching `IcebergCommitExec`).
- The builder's `build` is now `pub`. Kept public so downstream integrations with their own catalog / schema provider can construct an `IcebergTableScan` directly, as requested in feat(datafusion): export table provider constructor #2123.
Known limitations
- Limited type support for `Partitioning::Hash`. Each identity column is hashed via a single-element Arrow array, so only handled dtypes can declare `Hash`. The set has been broadened (timestamps now work); a few types such as `Utf8View` still fall back to `UnknownPartitioning`.
- Spec evolution disables `Partitioning::Hash`. With more than one historical spec, the module returns `UnknownPartitioning` rather than risk a partition-tuple mismatch -- stricter than iceberg-java (see follow-ups). Covered by `test_spec_evolution_falls_back_to_unknown_partitioning`.
- Bucketing is count-based, not size-aware. Distribution is `hash % n_partitions`, ignoring `file_size_in_bytes`; one large file among small ones can serialize the query on a single bucket. iceberg-java bin-packs by size (Plan file scan task according scan file size. #128).
- Planning runs in `scan()` and may re-run. `scan()` does catalog + manifest I/O; if DataFusion calls it multiple times during optimization, planning repeats -- no task caching yet, whereas iceberg-java caches behind `tasks()` / `taskGroups()`.
Follow-up work
- Redundant `FilterExec` (IcebergTableProvider::supports_filters_pushdown marks every filter as Inexact, causing a redundant FilterExec above IcebergTableScan #2363) -- @timsaucer reports that `supports_filters_pushdown` returns `Inexact` for all filters, causing DataFusion to insert a `FilterExec` above `IcebergTableScan` even though the Arrow reader already applies the predicate via `ArrowPredicateFn`. Returning `Exact` for losslessly-converted filters would eliminate this redundant re-evaluation (only identity transforms are safe to mark `Exact`). He proposed a solution in earlier commits, but those have been reverted as out of scope for this PR.
- Size-based bin-packing (Plan file scan task according scan file size. #128) -- distribute tasks by `file_size_in_bytes` (e.g. first-fit-decreasing) rather than by count; the size is already on `FileScanTask`.
- Cache planned tasks -- key the bucketed tasks by (snapshot, projection, filter) so repeated `scan()` calls during optimization don't re-read manifests, mirroring iceberg-java's lazy caching.
- Grouping key across specs (datafusion: relax identity grouping-key gate across partition-spec evolution #2658) -- instead of disabling `Partitioning::Hash` when multiple specs exist, intersect the identity fields common to all specs and declare `Hash` on those columns (iceberg-java's `groupingKeyType` / `commonActiveFieldIds`). The per-column intersection was tried and reverted as out of scope here.
- Widen dtype coverage -- reuse `create_primitive_array_single_element` (`arrow/value.rs`, `pub(crate)`) for the bucketing arrays; it already maps ~20 literal types and handles nulls, so more partition types could declare `Hash`.
Are these changes tested?
Yes. Unit tests in `table/mod.rs` cover the new bucketed scan path. Additional tests are added for `IcebergTableProvider` to cover limit pushdown, insert behavior, and schema consistency, ensuring the refactor introduces no regressions on existing functionality. `EXPLAIN` SQL logic test snapshots are updated to reflect the new `buckets:[N] file_count:[M]` display format and the correct `input_partitions` counts.
We plan to test these changes in our infrastructure by shadowing real-world queries.