Skip to content

feat(datafusion): parallel file scanning with eager task bucketing#44

Merged
phillipleblanc merged 1 commit into
spiceai-0.9.1-df-52from
phillip/parallel-file-scanning-df52
Jun 15, 2026
Merged

feat(datafusion): parallel file scanning with eager task bucketing#44
phillipleblanc merged 1 commit into
spiceai-0.9.1-df-52from
phillip/parallel-file-scanning-df52

Conversation

@phillipleblanc

Copy link
Copy Markdown

Ports apache/iceberg-rust#2298 onto the DF52 fork line (spiceai-0.9.1-df-52, renamed from spiceai-52). DF53 counterpart: #43.

IcebergTableProvider::scan() plans files eagerly and distributes FileScanTasks into min(target_partitions, n_files) buckets — one bucket per DataFusion partition — so file reads are scheduled concurrently instead of through a single UnknownPartitioning(1) partition. Identity-partitioned tables declare Partitioning::Hash so downstream joins/aggregates can skip a RepartitionExec; everything else falls back to UnknownPartitioning(N) while still bucketing.

Adaptations for the DF52 line (vs. the DF53 port #43):

  • to_arrow_from_tasks uses this line's 1-arg ArrowReaderBuilder::new(self.file_io.clone()) and the direct .read(tasks) return (no .map(|r| r.stream())).
  • IcebergTableScan stores a bare PlanProperties (DF52's ExecutionPlan::properties() returns &PlanProperties, not &Arc<PlanProperties>).
  • EXPLAIN snapshots retain DF52's CooperativeExec node and gain the buckets:[N] file_count:[M] suffix (+ updated input_partitions).
  • The fork's limit-pushdown (with_limit) is preserved.

Validation: iceberg-datafusion 86 lib tests (incl. 6 new bucketing tests + existing limit tests), all 9 df_* sqllogictest schedules, integration_datafusion_test::test_provider_plan_stream_schema; clippy + fmt clean (against DataFusion 52.2).

Port of apache#2298 onto the spiceai-0.9.0 fork.

IcebergTableProvider::scan() now plans files eagerly and distributes
FileScanTasks into min(target_partitions, n_files) buckets, one bucket
per DataFusion partition, so file reads are scheduled concurrently
instead of streaming through a single UnknownPartitioning(1) partition.
When the table is identity-partitioned (single spec, supported column
types, partition columns projected) the scan declares Partitioning::Hash
so downstream joins/aggregates can skip a RepartitionExec.

- TableScan::to_arrow_from_tasks: replay pre-collected FileScanTasks
  through the Arrow reader; preserves the spice fork's ArrowReaderBuilder
  (file_io, runtime) signature and row-selection config.
- IcebergTableScan gains new_with_tasks (eager) alongside new (lazy,
  used by IcebergStaticTableProvider); execute(i) streams buckets[i].
  Constructors made pub; with_new_children now errors on children.
- New table/bucketing.rs: identity-hash bucketing via
  REPARTITION_RANDOM_STATE + create_hashes, fallback to data_file_path.
- Spice limit pushdown preserved: with_limit threaded into the planning
  builder and build_table_scan.
- Drop the unused convert_filters_to_predicate re-export.
Copilot AI review requested due to automatic review settings June 15, 2026 02:50

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR ports eager task planning + parallel file scanning to the DataFusion 52 fork line by planning Iceberg FileScanTasks up-front, bucketing them into multiple DataFusion partitions, and (when safe) declaring Partitioning::Hash for identity-partitioned tables to avoid downstream repartitioning.

Changes:

  • Eagerly plans FileScanTasks in IcebergTableProvider::scan() and buckets them into min(target_partitions, n_files) partitions, computing Partitioning::Hash when identity partition columns are projected and fully present.
  • Adds a bucketing module that reproduces DataFusion’s repartition hashing for identity partition keys, with a deterministic per-file fallback.
  • Extends Iceberg scan APIs to support to_arrow_from_tasks, updates IcebergTableScan to execute per-bucket tasks, and refreshes integration/sqllogictest plan snapshots accordingly.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
crates/integrations/datafusion/src/table/mod.rs Plans tasks eagerly, buckets them, and selects Hash vs UnknownPartitioning; adds bucketing-focused tests.
crates/integrations/datafusion/src/table/bucketing.rs Implements task bucketing + identity-hash computation aligned with DataFusion hashing.
crates/integrations/datafusion/src/physical_plan/scan.rs Adds eager task-bucket execution mode, new display suffix, and shared scan/stream helpers.
crates/iceberg/src/scan/mod.rs Introduces to_arrow_from_tasks and routes to_arrow() through it.
crates/integrations/datafusion/tests/integration_datafusion_test.rs Updates execution to use partition 0 in tests.
crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt Updates expected physical plan text to include bucketing suffix.
crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt Updates expected partition counts / bucketing suffix in physical plan output.
crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt Updates expected physical plan text to include bucketing suffix.
crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt Updates expected physical plan text to include bucketing suffix (including empty-table case).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread crates/integrations/datafusion/src/table/mod.rs
Comment thread crates/integrations/datafusion/src/physical_plan/scan.rs
Comment thread crates/integrations/datafusion/src/table/bucketing.rs
@phillipleblanc phillipleblanc self-assigned this Jun 15, 2026
@phillipleblanc phillipleblanc merged commit ff09ec5 into spiceai-0.9.1-df-52 Jun 15, 2026
17 of 18 checks passed
phillipleblanc added a commit to spiceai/spiceai that referenced this pull request Jun 15, 2026
Fork PR spiceai/iceberg-rust#44 merged; re-pin from the branch head
(65463fe6) to the merge commit ff09ec55 on spiceai-0.9.1-df-52.
phillipleblanc added a commit to spiceai/spiceai that referenced this pull request Jun 15, 2026
Fork PR spiceai/iceberg-rust#44 merged; re-pin from the branch head
(65463fe6) to the merge commit ff09ec55 on spiceai-0.9.1-df-52. Lock diff
kept iceberg-only.
phillipleblanc added a commit to spiceai/spiceai that referenced this pull request Jun 15, 2026
…e/2.0) (#11331)

* chore(deps): bump iceberg-rust to parallel file scanning fork (release/2.0)

Re-pins iceberg-rust to spiceai-0.9.1-df-52 (rev 65463fe6), picking up
the eager task-bucketing parallel-scan change (fork PR
spiceai/iceberg-rust#44, port of apache/iceberg-rust#2298).

release/2.0 counterpart of the trunk change in #11328. The DF52 fork line
spiceai-52 is renamed spiceai-0.9.1-df-52 per the spiceai-<iceberg>-df-<df>
convention (alias retained; version req stays caret 0.9.0, satisfied by
0.9.1). No Spice code changes — only IcebergTableProvider::try_new is used.

* chore(deps): re-pin iceberg-rust to merged spiceai-0.9.1-df-52 commit

Fork PR spiceai/iceberg-rust#44 merged; re-pin from the branch head
(65463fe6) to the merge commit ff09ec55 on spiceai-0.9.1-df-52.

* chore(deps): re-pin iceberg-rust to merged spiceai-0.9.1-df-52 commit

Fork PR spiceai/iceberg-rust#44 merged; re-pin from the branch head
(65463fe6) to the merge commit ff09ec55 on spiceai-0.9.1-df-52. Lock diff
kept iceberg-only.

* chore(deps): point iceberg pin comments at spiceai-0.9.1-df-52

The release/2.0 iceberg pins were annotated `# spiceai-52` (the
pre-rename alias). Update to `# branch: spiceai-0.9.1-df-52` to name the
renamed fork branch, matching the convention on #11328.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants