
Add EmitTo::FirstBlock for grouped aggregate emission #23274

Draft
hhhizzz wants to merge 2 commits into apache:main from hhhizzz:codex/issue23249-firstblock-main-merge-partial-materialize


hhhizzz commented Jul 1, 2026

Contributor

Which issue does this PR close?

Rationale for this change

This PR narrows the scope to the smallest first step for the longer-term aggregate output work: adding an explicit API variant for block-bounded grouped aggregate emission.

The larger implementation and performance work for #23249 can be discussed and reviewed separately. This PR only introduces the API shape so reviewers can first agree on the direction.

What changes are included in this PR?

  • Add EmitTo::FirstBlock(usize).
  • Document that existing implementations may initially handle it like EmitTo::First(usize).
  • Update existing exhaustive EmitTo matches so the new variant is handled safely.
  • Keep current behavior unchanged by falling back to First(n) semantics.
  • Keep the existing FFI ABI by mapping FirstBlock(n) to FFI_EmitTo::First(n).
  • Add a small FFI test for the fallback behavior.
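The fallback described in this list can be sketched in a few lines. This is a self-contained illustration, not the PR's code: `EmitTo` below is a local copy with the three variants named in this PR, `take_needed` is modeled on the existing `First(n)` handling (split off the first `n` groups and keep the rest), and `FfiEmitTo` is a hypothetical stand-in for `FFI_EmitTo` that, as the description states for the real type, has no block variant.

```rust
/// Local stand-in for `EmitTo`; variant names follow the PR description.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmitTo {
    /// Emit all groups.
    All,
    /// Emit the first `n` groups; the remaining groups move to the front.
    First(usize),
    /// Emit the first block of `n` groups. Until block-aligned storage
    /// exists, this falls back to `First(n)` semantics.
    FirstBlock(usize),
}

impl EmitTo {
    /// Removes and returns the values to emit from `v`, leaving the rest in `v`.
    pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
        match self {
            EmitTo::All => std::mem::take(v),
            // Fallback: FirstBlock(n) behaves exactly like First(n).
            EmitTo::First(n) | EmitTo::FirstBlock(n) => {
                let split_at = (*n).min(v.len());
                // After split_off, `v` holds the head and `tail` the rest.
                let tail = v.split_off(split_at);
                // Put the rest back into `v` and return the head.
                std::mem::replace(v, tail)
            }
        }
    }
}

/// Hypothetical stand-in for the FFI enum, which has no block variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FfiEmitTo {
    All,
    First(usize),
}

impl From<EmitTo> for FfiEmitTo {
    fn from(e: EmitTo) -> Self {
        match e {
            EmitTo::All => FfiEmitTo::All,
            // Keep the FFI ABI unchanged: FirstBlock(n) crosses as First(n).
            EmitTo::First(n) | EmitTo::FirstBlock(n) => FfiEmitTo::First(n),
        }
    }
}

fn main() {
    let mut a = vec![1, 2, 3, 4, 5];
    let mut b = a.clone();
    assert_eq!(EmitTo::First(2).take_needed(&mut a), vec![1, 2]);
    assert_eq!(EmitTo::FirstBlock(2).take_needed(&mut b), vec![1, 2]);
    assert_eq!(a, b); // both now hold [3, 4, 5]
    assert_eq!(FfiEmitTo::from(EmitTo::FirstBlock(7)), FfiEmitTo::First(7));
}
```

Because `FirstBlock(n)` shares a match arm with `First(n)`, existing callers see identical behavior; a later block-aligned implementation would give `FirstBlock` its own arm.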

This PR does not yet make hash aggregate output use FirstBlock. That will be follow-up work.

Are these changes tested?

I ran the targeted checks for the affected crates:

cargo check -p datafusion-ffi -p datafusion-functions-aggregate-common -p datafusion-functions-aggregate -p datafusion-physical-plan --all-targets
cargo fmt --all -- --check
cargo test -p datafusion-ffi --lib --tests --features integration-tests -- --nocapture
cargo test -p datafusion-functions-aggregate-common --lib -- --nocapture
cargo test -p datafusion-functions-aggregate --lib -- --nocapture
cargo test -p datafusion-physical-plan group_values -- --nocapture

I also fixed one nondeterministic sqllogictest expectation found by CI.

Are there any user-facing changes?

Yes. This intentionally adds a new public EmitTo::FirstBlock enum variant, so this PR is an API change.

github-actions (bot) added the logical-expr, functions, and physical-plan labels on Jul 1, 2026
github-actions Bot commented Jul 1, 2026


Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

     Cloning apache/main
    Building datafusion-expr-common v54.0.0 (current)
       Built [  25.489s] (current)
     Parsing datafusion-expr-common v54.0.0 (current)
      Parsed [   0.018s] (current)
    Building datafusion-expr-common v54.0.0 (baseline)
       Built [  19.299s] (baseline)
     Parsing datafusion-expr-common v54.0.0 (baseline)
      Parsed [   0.019s] (baseline)
    Checking datafusion-expr-common v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.206s] 223 checks: 222 pass, 1 fail, 0 warn, 30 skip

--- failure enum_variant_added: enum variant added on exhaustive enum ---

Description:
A publicly-visible enum without #[non_exhaustive] has a new variant.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#enum-variant-new
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/enum_variant_added.ron

Failed in:
  variant EmitTo:FirstBlock in /home/runner/work/datafusion/datafusion/datafusion/expr-common/src/groups_accumulator.rs:39

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  45.776s] datafusion-expr-common
    Building datafusion-ffi v54.0.0 (current)
       Built [  59.471s] (current)
     Parsing datafusion-ffi v54.0.0 (current)
      Parsed [   0.060s] (current)
    Building datafusion-ffi v54.0.0 (baseline)
       Built [  59.854s] (baseline)
     Parsing datafusion-ffi v54.0.0 (baseline)
      Parsed [   0.061s] (baseline)
    Checking datafusion-ffi v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.242s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 121.008s] datafusion-ffi
    Building datafusion-functions-aggregate v54.0.0 (current)
       Built [  31.698s] (current)
     Parsing datafusion-functions-aggregate v54.0.0 (current)
      Parsed [   0.045s] (current)
    Building datafusion-functions-aggregate v54.0.0 (baseline)
       Built [  32.785s] (baseline)
     Parsing datafusion-functions-aggregate v54.0.0 (baseline)
      Parsed [   0.048s] (baseline)
    Checking datafusion-functions-aggregate v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.203s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  65.879s] datafusion-functions-aggregate
    Building datafusion-functions-aggregate-common v54.0.0 (current)
       Built [  21.949s] (current)
     Parsing datafusion-functions-aggregate-common v54.0.0 (current)
      Parsed [   0.020s] (current)
    Building datafusion-functions-aggregate-common v54.0.0 (baseline)
       Built [  21.568s] (baseline)
     Parsing datafusion-functions-aggregate-common v54.0.0 (baseline)
      Parsed [   0.019s] (baseline)
    Checking datafusion-functions-aggregate-common v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.144s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  44.777s] datafusion-functions-aggregate-common
    Building datafusion-physical-plan v54.0.0 (current)
       Built [  37.465s] (current)
     Parsing datafusion-physical-plan v54.0.0 (current)
      Parsed [   0.132s] (current)
    Building datafusion-physical-plan v54.0.0 (baseline)
       Built [  38.641s] (baseline)
     Parsing datafusion-physical-plan v54.0.0 (baseline)
      Parsed [   0.133s] (baseline)
    Checking datafusion-physical-plan v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.580s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  78.155s] datafusion-physical-plan
    Building datafusion-sqllogictest v54.0.0 (current)
       Built [ 185.204s] (current)
     Parsing datafusion-sqllogictest v54.0.0 (current)
      Parsed [   0.021s] (current)
    Building datafusion-sqllogictest v54.0.0 (baseline)
       Built [ 178.482s] (baseline)
     Parsing datafusion-sqllogictest v54.0.0 (baseline)
      Parsed [   0.022s] (baseline)
    Checking datafusion-sqllogictest v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.091s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 366.742s] datafusion-sqllogictest
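The `enum_variant_added` failure above corresponds to a compile-time break for downstream code: any crate that matches on `EmitTo` exhaustively stops compiling (error E0004, non-exhaustive patterns) until it handles the new variant. The sketch below uses a local copy of the enum and a hypothetical downstream helper, `rows_to_emit`; the `FirstBlock` arm is what such callers would have to add. Marking the enum `#[non_exhaustive]` would avoid this class of break for future variants, but only by requiring every match in other crates to carry a wildcard arm, and adding that attribute is itself a breaking change.

```rust
/// Local copy of the enum after this PR (not the real crate type).
#[derive(Debug, Clone, Copy)]
enum EmitTo {
    All,
    First(usize),
    FirstBlock(usize),
}

/// Hypothetical downstream helper: how many of `total` groups a given
/// `EmitTo` would emit. Before this PR it needed only the `All` and `First`
/// arms; with the new variant, omitting the `FirstBlock` arm fails to
/// compile with E0004.
fn rows_to_emit(emit_to: EmitTo, total: usize) -> usize {
    match emit_to {
        EmitTo::All => total,
        EmitTo::First(n) => n.min(total),
        // New arm required by the added variant; mirrors the First(n) fallback.
        EmitTo::FirstBlock(n) => n.min(total),
    }
}

fn main() {
    assert_eq!(rows_to_emit(EmitTo::All, 280), 280);
    assert_eq!(rows_to_emit(EmitTo::First(100), 280), 100);
    assert_eq!(rows_to_emit(EmitTo::FirstBlock(100), 80), 80);
}
```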

github-actions (bot) added the auto detected api change and ffi labels on Jul 1, 2026
hhhizzz force-pushed the codex/issue23249-firstblock-main-merge-partial-materialize branch from 8b472b2 to 3df1619 on July 1, 2026 07:12
hhhizzz changed the title [WIP] Bound final hash aggregate output with FirstBlock to [WIP] Add FirstBlock emit mode for grouped aggregates on Jul 1, 2026
github-actions (bot) added the sqllogictest label on Jul 1, 2026
hhhizzz changed the title [WIP] Add FirstBlock emit mode for grouped aggregates to Add EmitTo::FirstBlock for grouped aggregate emission on Jul 1, 2026
hhhizzz marked this pull request as ready for review on July 1, 2026 10:31
@2010YOUY01

Contributor

I think this design should be implemented as the first PR for #7065 🤔. It does not seem possible to introduce an intermediate step like this PR that only splits out the EmitTo part, since it is tightly coupled with the execution logic and the accumulator physical layout.

I agree that the initial blocked-state PR may become large, so we will need to figure out how to arrange and split the work better. For coordination, the initial blocked-state PR would depend on #22710 being finished first.

hhhizzz commented Jul 1, 2026

Contributor Author

> I think this design should be implemented as the first PR for #7065 🤔. It does not seem possible to introduce an intermediate step like this PR that only splits out the EmitTo part, since it is tightly coupled with the execution logic and the accumulator physical layout.
>
> I agree that the initial blocked-state PR may become large, so we will need to figure out how to arrange and split the work better. For coordination, the initial blocked-state PR would depend on #22710 being finished first.

Thanks for the guidance. That makes sense to me.

I was trying to split the work into the smallest possible first step, but I agree that if EmitTo::FirstBlock is too tightly coupled with the execution logic and accumulator layout, introducing it as an API-only PR may be premature.

I will pause this PR for now and follow the blocked-state / #7065 direction instead. I can keep the current API experiment and benchmark results as reference material, and revisit the API shape when the initial blocked-state implementation is ready after the #22710 stream-splitting work.

Thanks again for helping clarify the right sequencing.

hhhizzz marked this pull request as draft on July 1, 2026 11:03
@Rachelint

Contributor

@2010YOUY01 do you think it's stable enough to push #15591 forward again?
Or should we maybe wait until all existing features have been ported?

@Rachelint

Contributor

@hhhizzz I reviewed the sorted path, and I think emitting the first block may not satisfy its requirements.
As I see it, we may need to design specific group values for the sorted case.
In theory, that could also make sorted aggregation faster.

hhhizzz commented Jul 2, 2026

Contributor Author

> @hhhizzz I reviewed the sorted path, and I think emitting the first block may not satisfy its requirements. As I see it, we may need to design specific group values for the sorted case. In theory, that could also make sorted aggregation faster.

Thanks for checking the sorted path.

I’ll keep this PR as an experiment/reference, and focus the follow-up investigation on the blocked-state direction for hash aggregation, while treating sorted aggregation as a possibly separate design with sorted-specific GroupValues.

@2010YOUY01

Contributor

> @2010YOUY01 do you think it's stable enough to push #15591 forward again? Or should we maybe wait until all existing features have been ported?

For merging, I don't think it's ready yet. We should wait until the refactoring EPIC is closed first. The goal is to ship the whole refactor soon to reduce disruption; otherwise, we may run into even more development conflicts.

But I think we can still review and discuss the plan for now, since that part shouldn’t depend on the refactor. We may be able to agree on the blocked-state implementation design and plan sooner, so once the refactor is complete, the remaining work would be to resolve small conflicts and merge soon after.

2010YOUY01 commented Jul 2, 2026

Contributor

> @hhhizzz I reviewed the sorted path, and I think emitting the first block may not satisfy its requirements. As I see it, we may need to design specific group values for the sorted case. In theory, that could also make sorted aggregation faster.

An alternative is to remove EmitTo::First(k) from GroupOrdering and support only EmitTo::FirstBlock.

Conceptually, GroupOrdering, GroupValues, and GroupsAccumulator are all chunked and aligned to the block size, which must be configured during initialization.

If we set it to 100 and 280 groups have accumulated at some point, the internal layout is as follows (each block has capacity 100; the last one holds 80 groups):

group_values: vec(100), vec(100), vec(100)
accumulator:  vec(100), vec(100), vec(100)
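A minimal sketch of this block-aligned layout, assuming a hypothetical `BlockedVec<T>` that keeps fixed-capacity `Vec<T>` blocks in a `VecDeque` (neither the type nor its methods exist in DataFusion). Pushing 280 values with a block size of 100 yields blocks of length 100, 100, and 80, and emitting the first block is a `pop_front` instead of shifting every remaining element, as `First(n)` must on a flat `Vec`.

```rust
use std::collections::VecDeque;

/// Hypothetical block-aligned storage: every block has the same capacity,
/// and only the last block may be partially filled.
struct BlockedVec<T> {
    block_size: usize,
    blocks: VecDeque<Vec<T>>,
}

impl<T> BlockedVec<T> {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be positive");
        Self { block_size, blocks: VecDeque::new() }
    }

    /// Appends a value, starting a new block when the last one is full.
    fn push(&mut self, value: T) {
        let needs_new_block = self
            .blocks
            .back()
            .map_or(true, |b| b.len() == self.block_size);
        if needs_new_block {
            self.blocks.push_back(Vec::with_capacity(self.block_size));
        }
        self.blocks.back_mut().unwrap().push(value);
    }

    /// Lengths of the blocks, front to back.
    fn block_lens(&self) -> Vec<usize> {
        self.blocks.iter().map(Vec::len).collect()
    }

    /// Removes and returns the first block without touching the others.
    fn take_first_block(&mut self) -> Option<Vec<T>> {
        self.blocks.pop_front()
    }
}

fn main() {
    // Block size 100, 280 groups accumulated so far.
    let mut group_values = BlockedVec::new(100);
    for g in 0..280u32 {
        group_values.push(g);
    }
    assert_eq!(group_values.block_lens(), vec![100, 100, 80]);

    let first = group_values.take_first_block().unwrap();
    assert_eq!(first.len(), 100);
    assert_eq!(first[0], 0);
    assert_eq!(group_values.block_lens(), vec![100, 80]);
}
```

In this sketch `take_first_block` would also hand back a partially filled block if it were the only one; under the design described in this comment, GroupOrdering requests `FirstBlock` only once a whole block of groups is finished, so that case is not expected to arise.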

GroupOrdering is aware that it should emit only at block-size granularity.

  • If there are 120 finished groups so far, group_ordering.emit_to() returns EmitTo::FirstBlock, so it is safe to remove and output the first block (a vec of length 100) from all aligned GroupValues/GroupsAccumulators.
  • If there are 80 finished groups, group_ordering.emit_to() returns None, indicating that we cannot emit early because the internal storage is block-aligned.
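The two cases above reduce to a small decision function. This is a sketch under stated assumptions: `block_aligned_emit_to` is a hypothetical helper (not the real `GroupOrdering::emit_to`), the enum is a local copy, and `FirstBlock(n)` is assumed to carry the block size as its group count, consistent with the PR mapping `FirstBlock(n)` onto `First(n)`.

```rust
/// Local copy of the enum; only FirstBlock is used in this sketch.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmitTo {
    All,
    First(usize),
    FirstBlock(usize),
}

/// Hypothetical helper: decide what can be emitted early when storage is
/// block-aligned. `finished_groups` counts the leading groups that can no
/// longer receive input (for example, with sorted input, the groups before
/// the current one).
fn block_aligned_emit_to(finished_groups: usize, block_size: usize) -> Option<EmitTo> {
    if finished_groups >= block_size {
        // At least one whole block is finished, so the first block can be
        // removed from every aligned GroupValues/GroupsAccumulator.
        Some(EmitTo::FirstBlock(block_size))
    } else {
        // Only a partial block is finished; emitting it would break alignment.
        None
    }
}

fn main() {
    // Block size 100, as in the example above.
    assert_eq!(block_aligned_emit_to(120, 100), Some(EmitTo::FirstBlock(100)));
    assert_eq!(block_aligned_emit_to(80, 100), None);
}
```

With 120 finished groups, only the first 100 are emitted; the remaining 20 finished groups stay in the second block until it fills up.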


Labels

auto detected api change, ffi, functions, logical-expr, physical-plan, sqllogictest


3 participants