@gene-bordegaray commented Nov 24, 2025

Full Report

Issue 18777 Parallelize Key Partitioned Data.pdf

Which issue does this PR close?

Closes #18777.

Rationale for this change

Optimize aggregations on Hive-partitioned tables by eliminating unnecessary repartitioning/coalescing when grouping by partition columns. This enables parallel computation of complete results without a merge bottleneck.

What changes are included in this PR?

  • Introduce new partitioning type KeyPartitioned
  • Save and propagate file partition metadata through query plan
  • Change aggregation mode selection in physical planner
  • Update enforce distribution rules to eliminate unnecessary repartitioning
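The core idea can be illustrated with a small, self-contained sketch (plain Rust with toy data, not DataFusion code; names are illustrative): when data is key-partitioned, no group key spans two partitions, so each partition can compute *final* aggregate results independently, with no repartition or merge step.

```rust
use std::collections::HashMap;

// Toy stand-in for a per-partition final aggregation (here: SUM grouped by key).
fn aggregate_partition(rows: &[(&str, i64)]) -> HashMap<String, i64> {
    let mut sums: HashMap<String, i64> = HashMap::new();
    for (key, v) in rows {
        *sums.entry((*key).to_string()).or_insert(0) += v;
    }
    sums
}

fn main() {
    // Two Hive-style partitions, e.g. directories `a=x/` and `a=y/`:
    // the key sets are disjoint by construction.
    let partitions = vec![
        vec![("x", 1_i64), ("x", 2)],
        vec![("y", 10), ("y", 20)],
    ];
    // Aggregate each partition independently; the union of the per-partition
    // results is already the complete final answer -- no merge bottleneck.
    let finals: Vec<HashMap<String, i64>> =
        partitions.iter().map(|p| aggregate_partition(p)).collect();
    assert_eq!(finals[0]["x"], 3);
    assert_eq!(finals[1]["y"], 30);
    println!("{:?}", finals);
}
```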

Are these changes tested?

  • Unit and integration tests added for all new logic

Benchmarking

For TPC-H it was unaffected, as expected (the data is not partitioned):


I created my own benchmark and saw these results:

Benchmarking hive_partitioned_agg/with_key_partitioned: Collecting 100 samples in estimated 6
hive_partitioned_agg/with_key_partitioned
                        time:   [12.356 ms 12.428 ms 12.505 ms]
                        change: [−1.6022% −0.8538% −0.0780%] (p = 0.03 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
Benchmarking hive_partitioned_agg/without_key_partitioned: Collecting 100 samples in estimate
hive_partitioned_agg/without_key_partitioned
                        time:   [13.179 ms 13.278 ms 13.382 ms]
                        change: [−0.8465% +0.2090% +1.2419%] (p = 0.70 > 0.05)
                        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)

These are not huge improvements, as in-memory hashing is quite efficient, but they are consistent gains (ran many times).

These improvements will be crucial for distributed DataFusion, as network shuffles are much less efficient than in-memory repartitioning.

Are there any user-facing changes?

  • Yes, new configuration option: listing_table_preserve_partition_values
  • Changes query plans when activated

pub preserve_partition_values: bool,
/// Cached result of key_partition_exprs computation to avoid repeated work
#[allow(clippy::type_complexity)]
key_partition_exprs_cache: OnceLock<Option<Vec<Arc<dyn PhysicalExpr>>>>,
@gene-bordegaray Nov 26, 2025
Caches results of compute_key_partition_exprs() which is expensive:

  • loops through file groups and does hash set operations
  • called multiple times (output_partitioning() and eq_properties())
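The caching pattern described here can be sketched with `std::sync::OnceLock` (a minimal sketch, not the actual DataFusion implementation; `Exec` and the cached value are illustrative stand-ins for the real struct and `compute_key_partition_exprs()`):

```rust
use std::sync::OnceLock;

struct Exec {
    // First caller pays the computation cost; later callers (e.g.
    // output_partitioning() and eq_properties()) get the cached value.
    cache: OnceLock<Option<Vec<String>>>,
}

impl Exec {
    fn key_partition_exprs(&self) -> &Option<Vec<String>> {
        self.cache.get_or_init(|| {
            // ... the expensive loop over file groups and hash-set
            // operations would go here; we return a placeholder.
            Some(vec!["a".to_string()])
        })
    }
}

fn main() {
    let e = Exec { cache: OnceLock::new() };
    let first = e.key_partition_exprs().clone();
    let second = e.key_partition_exprs().clone();
    // The closure only ran once; both calls observe the same value.
    assert_eq!(first, second);
    println!("{:?}", first);
}
```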

}
Distribution::KeyPartitioned(_) => {
// Nothing to do: treated as satisfied upstream
}
No-op because we can guarantee that our data is correctly distributed

02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
@gene-bordegaray Nov 26, 2025

Eliminates this hash because it would break ordering guarantees

@gene-bordegaray marked this pull request as ready for review November 26, 2025 06:09
gene-bordegaray commented Nov 26, 2025

cc: @NGA-TRAN @alamb this is the updated solution, with a report on why I chose the approach I did.

fmonjalet commented Nov 27, 2025

Thanks a lot for the description and companion doc, they are super useful.

This work is super nice and is even crucial for distributed DataFusion. Reusing partitioning and avoiding repartitions can make a huge difference when the repartition is done on the network. The plans you posted as examples are exactly what we should be aiming for.

I think I am still missing part of the point of KeyPartitioned vs reusing Hash. I'll explain what I understand and you can correct me:

  • Anything KeyPartitioned is Hash partitioned (but the opposite is not true) ==> is this correct?
  • KeyPartitioned means each key is in a distinct partition ==> is this correct?
  • If the above is correct (if it's not, my reasoning does not hold and you can ignore the rest of this comment), I am not sure how this applies to high cardinality keys, for example date_bin(timestamp, 15m) or id hash ranges (say you have a million files, each one having a distinct range). I imagine we'd want to be able to group multiple "keys" into the same processing partition, to avoid having thousands of partitions. My understanding is that DataFusion partitions will add overhead if there are too many (subsequent repartitions, coalesce, merge sort), but I may be mistaken.
  • Once we group KeyPartitioned partitions together, they become Hash partitions. ==> is this correct?
  • So in practice, it appears to me that we'll almost always need to resort to Hash partitions.
  • What we'd lose compared to KeyPartitioned is the SortExec elision when aggregating then sorting by the partition key, but I'd argue that if you had one group per partition, then probably the sorting is cheap enough. ==> Do we lose something else?
    (This point is not challenging the PR as a whole but just an implementation choice.)

So my current understanding is: KeyPartitioned is indeed different from Hash (a specific case carrying more information) but the ratio complexity / added value is not obvious. The reason we'd not take full advantage of KeyPartitioned may be that DF partitions are actually bound to processing units (~threads), and maybe there would be value in separating the notion of processing thread and the notion of data partition, where you could have N processing unit per partitions (with partial repartitions), or N partitions per thread. But this sounds like a completely different topic and I don't know how much it makes sense.

Sorry for the wall of text, I am mostly trying to wrap my head around this, please correct anything I missed in here.

/// Allocate rows based on a hash of one or more expressions and the specified number of
/// partitions
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Partitions that are already organized by disjoint key values for the provided expressions.
/// Rows that have the same values for these expressions are guaranteed to be in the same partition.
KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>, usize),
My impression over all is that KeyPartitioned should not be adding anything that is not already representable with Hash. I was planning on doing a longer reasoning on this, but @fmonjalet is right on point in his comment here #18919 (comment), so I'd just +1 his comment, grab some 🍿, and see what comes out of it.

gene-bordegaray commented Nov 28, 2025

  • Anything KeyPartitioned is Hash partitioned (but the opposite is not true) ==> is this correct?

Yes, key partitioning guarantees that each distinct value of the key is fully contained within a single partition, which is essentially a stronger form of hash partitioning. Another thing to note is that key partitioning can currently only originate at the file scan level, whereas hash partitioning of course has a repartitioning operator.

  • KeyPartitioned means each key is in a distinct partition ==> is this correct?

Yes, I believe you have the right idea, but to be precise: KeyPartitioned in theory allows multiple keys in a single partition, as long as each key is fully contained in that partition.
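To make the invariant concrete, here is a hedged sketch in plain Rust (toy data, not DataFusion code): a layout is key-partitioned if no key value appears in more than one partition, while a single partition may freely hold several keys.

```rust
use std::collections::HashSet;

// Returns true if every distinct key is fully contained in one partition.
fn is_key_partitioned(partitions: &[Vec<&str>]) -> bool {
    let mut seen: HashSet<&str> = HashSet::new();
    for part in partitions {
        let local: HashSet<&str> = part.iter().copied().collect();
        // A key already seen in an earlier partition violates containment.
        if local.iter().any(|k| seen.contains(k)) {
            return false;
        }
        seen.extend(local);
    }
    true
}

fn main() {
    // Multiple keys per partition is fine, as long as no key spans partitions.
    assert!(is_key_partitioned(&[vec!["a", "a", "b"], vec!["c"]]));
    // "a" appears in two partitions: not key-partitioned.
    assert!(!is_key_partitioned(&[vec!["a"], vec!["a", "b"]]));
    println!("invariant holds for the first layout only");
}
```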

  • If the above is correct (if it's not, my reasoning does not hold and you can ignore the rest of this comment), I am not sure how this applies to high cardinality keys, for example date_bin(timestamp, 15m) or id hash ranges (say you have a million files, each one having a distinct range). I imagine we'd want to be able to group multiple "keys" into the same processing partition, to avoid having thousands of partitions. My understanding is that DataFusion partitions will add overhead if there are too many (subsequent repartitions, coalesce, merge sort), but I may be mistaken.

Yes, this is a noted limitation of the original design. I added the comment "best with moderate partition counts (10-100 partitions)" to the config. This stems from splitting distinct keys into their own partitions for now. I did this to keep the first iteration relatively simple, as the PR is large. In a follow-up issue, some great work would be to merge groups down to target_partitions when size allows. This would still keep keys fully contained within each partition but allow for higher cardinality.
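The proposed follow-up can be sketched as follows (a minimal illustration, not the planned implementation; function and variable names are hypothetical): assigning whole keys, never rows, to `target_partitions` buckets keeps each key fully contained, so the result stays key-partitioned even at high key cardinality.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Merge per-key groups down to `target_partitions`, hashing whole keys so
// a key is never split across partitions (containment is preserved).
fn assign_keys_to_partitions(keys: &[&str], target_partitions: usize) -> Vec<Vec<String>> {
    let mut parts: Vec<Vec<String>> = vec![Vec::new(); target_partitions];
    for key in keys {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        let idx = (h.finish() as usize) % target_partitions;
        parts[idx].push(key.to_string());
    }
    parts
}

fn main() {
    let keys = ["a", "b", "c", "d", "e", "f"];
    let parts = assign_keys_to_partitions(&keys, 3);
    // Every key lands in exactly one partition, so the layout is still
    // key-partitioned, just with more keys per partition.
    let total: usize = parts.iter().map(|p| p.len()).sum();
    assert_eq!(total, keys.len());
    assert_eq!(parts.len(), 3);
    println!("{:?}", parts);
}
```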

  • Once we group KeyPartitioned partitions together, they become Hash partitions. ==> is this correct?

It depends on what "group" means. If we simply merge key-partitioned data into a single partition, no: this is still key partitioned, as each key is still fully in one place. If we are repartitioning or shuffling data, we lose key partitioning and fall back to hash.

  • So in practice, it appears to me that we'll almost always need to resort to Hash partitions.

For this first PR, yes, but I think this isn't a one-PR-fixes-all scenario. I think this comes down to how intentional the user is. Yes, key-partitioned data is rarer than, say, hash-partitioned data, but it is powerful enough for people to consider it. The use cases will also increase as follow-up issues are resolved: higher cardinality, propagation through joins, etc.

  • What we'd lose compared to KeyPartitioned is the SortExec elision when aggregating then sorting by the partition key, but I'd argue that if you had one group per partition, then probably the sorting is cheap enough. ==> Do we lose something else?
    (This point is not challenging the PR as a whole but just an implementation choice.)
    We would also reintroduce the repartitioning between partial and final aggregations, which is the main overhead we are trying to avoid:

BEFORE: DataSourceExec -> Aggregate Partial (gby: a) -> Repartition Hash(a) -> Aggregate Final (gby: a)
AFTER: DataSourceExec -> Aggregate FinalPartitioned (gby: a)

In some cases we also eliminate bottlenecks due to SPMs (SortPreservingMergeExec) between aggregations:
BEFORE: DataSourceExec -> Aggregate Partial (gby: a) -> SPM -> Aggregate Final (gby: a) - single-threaded!
AFTER: DataSourceExec -> Aggregate FinalPartitioned (gby: a) -> SPM

So my current understanding is: KeyPartitioned is indeed different from Hash (a specific case carrying more information) but the ratio complexity / added value is not obvious. The reason we'd not take full advantage of KeyPartitioned may be that DF partitions are actually bound to processing units (~threads), and maybe there would be value in separating the notion of processing thread and the notion of data partition, where you could have N processing unit per partitions (with partial repartitions), or N partitions per thread. But this sounds like a completely different topic and I don't know how much it makes sense.

I am in favor of keeping Hash and KeyPartitioned separate, as I see them as two distinct methods of partitioning. I also don't know if adding more information to Hash partitioning would eliminate complexity rather than just cause more indirection. I do like the idea of merging file groups for higher cardinality, as this was my main concern with this v1 (as noted in the comments), but I chose to refrain due to complexity.

Sorry for the wall of text, I am mostly trying to wrap my head around this, please correct anything I missed in here.

Do not apologize, this is a lot of the internal debates I was / am having and am glad to talk about the trade offs. Let me know what you think 😄

CC: @gabotechs

@gene-bordegaray
I suggest that this PR remain limited in scope, not meant for high-cardinality queries. This was my motivation for having this option set to false by default. We can then submit follow-up issues to address grouping files into partitions to help with higher cardinality. I just do not want to introduce too many things in this PR, and adding this seems like another substantial PR in itself.


Labels

catalog Related to the catalog crate common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation optimizer Optimizer rules physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate proto Related to proto crate sqllogictest SQL Logic Tests (.slt)


Development

Successfully merging this pull request may close these issues.

Enable Parallel Aggregation for Non-Overlapping Partitioned Data
