perf: split row groups by file range for parallel single-row-group scans (morsel splitting)#23285
Dandandan wants to merge 2 commits into
…ans (morsel splitting)

When a parquet scan is restricted to a byte range that only partially overlaps a row group, read the proportional slice of the row group's rows (via a RowSelection) instead of assigning the whole row group to the one range containing its first data page. Since FileGroupPartitioner already tiles files into byte ranges, this lets all partitions decode disjoint slices of the same row group in parallel, parallelizing scans of files with fewer row groups than partitions (e.g. a single large row group).

Row boundaries are computed with identical integer arithmetic on both sides of each range boundary, so every row is read exactly once. The offset index is now loaded whenever the access plan contains row selections, so each partition fetches and decodes only the pages covering its slice.

Controlled by `datafusion.execution.parquet.split_row_groups_by_range` (default: true).

TPC-DS SF=1 with one row group per file: 2.2x faster overall (16.8s -> 7.5s), 82/99 queries faster, up to 5.3x.
TPC-H SF=1 (multi-row-group files): 1.16x faster overall, no significant regressions.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
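The proportional split and the "every row is read exactly once" argument can be illustrated with a minimal, self-contained Rust sketch. It is not DataFusion's `split_by_range`: `RowGroupSpan`, `byte_to_row`, and `rows_for_range` are hypothetical names, and the row group's byte span is assumed to be its first dictionary/data page offset plus its compressed size. The key property is that a shared range boundary is converted to a row index by the same flooring function on both sides.

```rust
use std::ops::Range;

/// Hypothetical, simplified view of one row group: the byte span it occupies
/// in the file (assumed: first dictionary/data page offset + compressed size)
/// and its row count. This is not parquet's `RowGroupMetaData`.
#[derive(Debug, Clone, Copy)]
pub struct RowGroupSpan {
    pub byte_start: u64,
    pub byte_len: u64,
    pub num_rows: u64,
}

/// Map an absolute file byte offset to a row boundary inside `rg`, rounding
/// down. Offsets at or before the span map to row 0; offsets at or past its
/// end map to `num_rows`. The mapping is monotone in `offset`.
pub fn byte_to_row(rg: &RowGroupSpan, offset: u64) -> u64 {
    if rg.byte_len == 0 {
        // Degenerate span: all rows go to the range that contains `byte_start`.
        return if offset > rg.byte_start { rg.num_rows } else { 0 };
    }
    let rel = offset.saturating_sub(rg.byte_start).min(rg.byte_len);
    // Widen to u128 so `rel * num_rows` cannot overflow; the result is at
    // most `num_rows`, so narrowing back to u64 is lossless.
    (rel as u128 * rg.num_rows as u128 / rg.byte_len as u128) as u64
}

/// Rows of `rg` that a scan restricted to the byte range `range` should read.
/// Two adjacent ranges sharing a boundary `b` both evaluate
/// `byte_to_row(rg, b)`, so their row intervals meet exactly: no row is
/// read twice and none is skipped.
pub fn rows_for_range(rg: &RowGroupSpan, range: &Range<u64>) -> Range<u64> {
    byte_to_row(rg, range.start)..byte_to_row(rg, range.end)
}
```

For example, a row group spanning bytes 4..604 with 1001 rows gives rows 0..155 to the range 0..97 and rows 155..410 to the range 97..250. With the parquet crate, such an interval could then be expressed as a `RowSelection` built from `RowSelector::skip(start)`, `RowSelector::select(end - start)`, and `RowSelector::skip(num_rows - end)`.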
run benchmarks

🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using tpcds

Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
run benchmarks

🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing split-row-groups-by-range (6d1a25a) to 3bb9314 (merge-base) using clickbench_partitioned

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
… morsels

When a pop would leave the shared work queue empty, split the final byte range into small morsels (halving down to a floor) and push the excess back, so sibling streams that finish their pieces early steal a share of the last piece instead of idling behind one straggler. Work items are returned exactly as the planner sized them while the queue is deep.

The morsel floor is ~1MiB of data the scan actually reads: the file-range floor is divided by the fraction of file columns referenced by the projection and filter, so narrow projections produce proportionally larger byte ranges and the fixed per-piece open cost stays amortized.

TPC-DS SF=1 (single-row-group files): 4.4% faster overall (7.53s -> 7.21s), 28/99 queries faster, up to 1.14x, no meaningful regressions.
TPC-H SF=1: 6.1% faster overall (1084ms -> 1022ms), 9/22 queries faster, 0 slower.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
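The tail-splitting policy and the floor scaling in this commit message can be sketched as a single-threaded queue. All names here (`WorkItem`, `SharedQueue`, `morsel_floor`) are hypothetical and are not DataFusion's `SharedWorkSource` API. The sketch assumes bytes are spread evenly across columns and omits the synchronization a queue shared by several streams would need.

```rust
use std::collections::VecDeque;
use std::ops::Range;

/// Hypothetical work item: a byte range within one file (files by index).
#[derive(Debug, Clone, PartialEq)]
pub struct WorkItem {
    pub file: usize,
    pub range: Range<u64>,
}

/// File-range floor such that one morsel reads roughly `target_read_bytes`
/// when the scan references `referenced_cols` of the file's `total_cols`
/// columns. Assumes bytes are spread evenly across columns.
pub fn morsel_floor(target_read_bytes: u64, referenced_cols: u64, total_cols: u64) -> u64 {
    let total = total_cols.max(1);
    let referenced = referenced_cols.clamp(1, total);
    target_read_bytes.saturating_mul(total) / referenced
}

/// Hypothetical queue illustrating tail morsel splitting (single-threaded).
pub struct SharedQueue {
    items: VecDeque<WorkItem>,
    floor: u64,
}

impl SharedQueue {
    pub fn new(items: Vec<WorkItem>, floor: u64) -> Self {
        // A floor of 0 would never stop halving, so clamp it to 1 byte.
        Self { items: items.into(), floor: floor.max(1) }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// While other items remain queued, return the next item exactly as
    /// planned. If this pop would empty the queue, repeatedly halve the item:
    /// the upper half goes back on the queue for siblings to steal, until the
    /// kept piece is smaller than twice the floor. Every piece produced by a
    /// split is therefore at least `floor` bytes.
    pub fn pop(&mut self) -> Option<WorkItem> {
        let mut item = self.items.pop_front()?;
        if !self.items.is_empty() {
            return Some(item);
        }
        let threshold = self.floor.saturating_mul(2);
        while item.range.end.saturating_sub(item.range.start) >= threshold {
            let mid = item.range.start + (item.range.end - item.range.start) / 2;
            self.items.push_back(WorkItem { file: item.file, range: mid..item.range.end });
            item.range.end = mid;
        }
        Some(item)
    }
}
```

With 2 of 10 columns referenced, a 1 MiB read target becomes a 5 MiB file-range floor. With a 1 MiB floor, popping a lone 16 MiB range keeps 0..1 MiB and pushes back 8..16, 4..8, 2..4 and 1..2 MiB, so early finishers pick up the larger halves first while no piece drops below the floor.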
run benchmarks

🤖 Benchmark running (GKE): comparing split-row-groups-by-range (95185fa) to 3bb9314 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing split-row-groups-by-range (95185fa) to 3bb9314 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing split-row-groups-by-range (95185fa) to 3bb9314 (merge-base) using tpcds

🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
Which issue does this PR close?
Rationale for this change

DataFusion parallelizes parquet scans by splitting files into byte ranges (`FileGroupPartitioner`), but each row group is assigned entirely to the single range that contains the offset of its first data page. Files with fewer row groups than `target_partitions` therefore cannot use all cores: a file with one large row group is decoded by exactly one partition, no matter how many cores are available.

This PR implements "morsel"-style splitting within row groups: a byte range that partially overlaps a row group now reads the proportional slice of that row group's rows via a `RowSelection`, so all partitions decode disjoint slices of the same row group in parallel.

Benchmark results (M-series, 10 cores)
TPC-DS SF=1 rewritten with a single row group per file (e.g. `store_sales` = 2.88M rows / 100MB / 1 row group):

…inventory): 0.95 cores → 7.4 cores process-wide average; instruction overhead from splitting is only +7%

TPC-H SF=1 (standard multi-row-group files, checking that the new default doesn't regress the common case):
What changes are included in this PR?
- `RowGroupAccessPlanFilter::split_by_range`: maps a row group's byte span (first dictionary/data page offset + compressed size) linearly onto its rows and assigns each range the row interval proportional to its byte overlap. Boundaries are computed with identical flooring integer arithmetic on both sides of every range boundary, so ranges that tile a file assign every row to exactly one range: no duplicates, no gaps.
- `split_by_range` is used instead of `prune_by_range` when the new option is enabled; the previous behavior (each row group assigned whole to the range containing its first page) is preserved when disabled.
- `should_load_page_index` now also loads the page index when the access plan contains row selections, so the reader fetches and decodes only the pages covering each partition's slice (instead of whole column chunks).
- `SharedWorkSource` tail morsel splitting: work items are returned exactly as the planner sized them while the shared queue is deep; only when a pop would leave the queue empty is the final byte range split into small morsels (halving, pushed back for siblings to steal), so streams that finish early steal a share of the last piece instead of idling behind one straggler. The morsel floor is ~1MiB of data the scan actually reads: the file-range floor is scaled by the fraction of file columns referenced by projection + filter, keeping the fixed per-piece open cost amortized under narrow projections. This adds a further 4.4% on TPC-DS SF=1 (7.53s → 7.21s, 28/99 faster, up to 1.14x) and 6.1% on TPC-H SF=1 (1084ms → 1022ms, 9/22 faster, 0 slower).
- `datafusion.execution.parquet.split_row_groups_by_range`, default `true`, with proto serialization, `information_schema`, and config docs updates.

Existing machinery composes unchanged: page-index pruning intersects with the range selection (`scan_selection`), limit pruning counts selection rows, `reverse()` remaps selections, and sorted-scan range splitting produces contiguous in-order slices.

Are these changes tested?
- `split_by_range`: exact tiling across ranges, full coverage keeps `Scan`, no-overlap skips, splitting across multiple row groups, zero-row slices, and respecting already-skipped row groups.
- `store_sales` (2.88M rows, single row group): counts/sums/distinct aggregates are identical to the multi-row-group original, and each (item, ticket) primary key appears exactly once.

Are there any user-facing changes?
New config option `datafusion.execution.parquet.split_row_groups_by_range` (default `true`). With the default, partitions of a ranged parquet scan that previously returned no rows (their range contained no row group start) now return the rows proportional to their byte range; total scan output is unchanged.

🤖 Generated with Claude Code