hash agg spill #22087

Draft
wants to merge 1 commit into
base: main

Conversation

reusee
Contributor

@reusee reusee commented Jul 2, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #3433

What this PR does / why we need it:

Add spill support for hash aggregation.


PR Type

Enhancement


Description

  • Add spill functionality for hash aggregation operations

  • Implement memory tracking and threshold-based spilling

  • Add serialization support for hash tables

  • Create spiller infrastructure for disk-based operations


Changes diagram

flowchart LR
  A["Memory Usage Monitor"] --> B["Spill Threshold Check"]
  B --> C["Serialize Hash Tables"]
  C --> D["Write to Disk Files"]
  D --> E["Clean Memory State"]
  F["Recall Process"] --> G["Read from Disk"]
  G --> H["Deserialize State"]
  H --> I["Merge Results"]

Changes walkthrough 📝

Relevant files
Tests (1 file)
  • hash_test.go: Add marshal/unmarshal tests for hash tables (+270/-0)

Enhancement (18 files)
  • int64_hash_map.go: Implement MarshalBinary/UnmarshalBinary for Int64HashMap (+74/-0)
  • string_hash_map.go: Implement MarshalBinary/UnmarshalBinary for StringHashMap (+78/-0)
  • aggContext.go: Add Size method to AggContext (+4/-0)
  • approx_count.go: Add Size method to approx count executors (+20/-0)
  • concat.go: Add Size method to group concat executor (+4/-0)
  • count.go: Add Size method to count executors (+8/-0)
  • distinct.go: Add Size method to distinct hash (+10/-0)
  • fromBytesRetBytes.go: Add Size method to bytes-to-bytes aggregator (+4/-0)
  • fromBytesRetFixed.go: Add Size method to bytes-to-fixed aggregator (+4/-0)
  • fromFixedRetBytes.go: Add Size method to fixed-to-bytes aggregator (+4/-0)
  • fromFixedRetFixed.go: Add Size method to fixed-to-fixed aggregator (+4/-0)
  • median.go: Add Size method to median executor (+10/-0)
  • result.go: Add Size method to result structures (+11/-0)
  • types.go: Add Size method to AggFuncExec interface (+2/-0)
  • window.go: Add Size method to window executor (+9/-0)
  • exec.go: Add memory tracking and spill triggering logic (+60/-0)
  • spill.go: Create new spiller infrastructure for disk operations (+198/-0)
  • types.go: Add spiller fields and initialization to container (+29/-0)


    qodo-merge-pro bot commented Jul 2, 2025

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    🎫 Ticket compliance analysis 🔶

    3433 - Partially compliant

    Compliant requirements:

    • Hash aggregation memory tracking implemented via Size() methods
    • Spill functionality added with threshold-based triggering
    • Serialization support added for hash tables (MarshalBinary/UnmarshalBinary)

    Non-compliant requirements:

    • Hash join spill functionality not implemented (only hash aggregation)

    Requires further human verification:

    • Memory threshold calculation logic (proc.Mp().Cap() / 2)
    • Actual spill/recall implementation completeness (TODO comments present)
    • Performance impact of serialization overhead
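The threshold check flagged for verification can be illustrated with a small sketch. It assumes, based only on the `proc.Mp().Cap() / 2` expression quoted above, that spilling triggers once tracked usage reaches half of the pool capacity. The `Sizer` interface, the `fixedSize` type, and `shouldSpill` are hypothetical names for this example, and treating a non-positive capacity as "no limit" is an assumption rather than the PR's confirmed behavior.

```go
package main

import "fmt"

// Sizer is a hypothetical stand-in for anything that reports its
// in-memory footprint in bytes, mirroring the Size() methods this PR adds.
type Sizer interface {
	Size() int64
}

// fixedSize is a toy Sizer used only for this illustration.
type fixedSize int64

func (f fixedSize) Size() int64 { return int64(f) }

// shouldSpill reports whether the combined size of parts has reached
// half of poolCap. The 1/2 ratio follows the expression quoted in the
// review; whether it is the right ratio is what the reviewer must verify.
func shouldSpill(poolCap int64, parts ...Sizer) bool {
	if poolCap <= 0 {
		// Assumption: a non-positive capacity means "no limit configured".
		return false
	}
	var used int64
	for _, p := range parts {
		if p != nil {
			used += p.Size()
		}
	}
	return used >= poolCap/2
}

func main() {
	fmt.Println(shouldSpill(1024, fixedSize(300), fixedSize(100))) // false: 400 < 512
	fmt.Println(shouldSpill(1024, fixedSize(300), fixedSize(300))) // true: 600 >= 512
}
```

A check like this is only as good as the Size() values feeding it, so an executor that under-reports its memory would delay the spill.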

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Incomplete Implementation

    The spillCurrentState and recallAndMergeSpilledData functions contain only TODO comments without actual implementation, making the spill functionality non-functional.

    func (group *Group) spillCurrentState(proc *process.Process) error {
    	// TODO: Implement actual spilling logic here.
    	return nil
    }
    
    func (group *Group) recallAndMergeSpilledData(proc *process.Process) error {
    	// TODO: Implement actual recall and merge logic here.
    	return nil
    }

    Nil Pointer Risk

    The freeCannotReuse function calls group.ctr.spiller.clean() without checking if spiller is nil, which could cause a panic.

    group.ctr.spiller.clean()
    group.ctr.spilled = false

    Incomplete Size Calculation

    The Size() method contains commented-out code and TODO comments, returning incomplete memory usage calculations that could affect spill threshold decisions.

    func (exec *approxCountFixedExec[T]) Size() int64 {
    	var size int64
    	for _, s := range exec.groups {
    		//TODO how?
    		_ = s
    		//size += int64(s.Size())
    	}
    	return exec.ret.Size() + size
    }
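One way the open TODO might be resolved is to give the per-group state its own Size method and sum it. The sketch below is illustrative only: the `sketch` type with a `registers` slice is a made-up stand-in for the real per-group structure, and `retSize` replaces `exec.ret.Size()`.

```go
package main

import "fmt"

// sketch is a hypothetical per-group state. The real executor keeps an
// approximate-count sketch whose size accounting is the open TODO.
type sketch struct {
	registers []uint8
}

// Size estimates the bytes held by the sketch. Only the backing array of
// the register slice is counted; a nil sketch counts as zero.
func (s *sketch) Size() int64 {
	if s == nil {
		return 0
	}
	return int64(cap(s.registers))
}

// approxCountExec mirrors the shape of the executor in the review excerpt.
type approxCountExec struct {
	groups  []*sketch
	retSize int64 // stand-in for exec.ret.Size()
}

// Size adds every group's footprint to the result's footprint.
func (e *approxCountExec) Size() int64 {
	size := e.retSize
	for _, s := range e.groups {
		size += s.Size() // safe for nil entries, see sketch.Size
	}
	return size
}

func main() {
	e := &approxCountExec{
		groups: []*sketch{
			&sketch{registers: make([]uint8, 16)},
			nil, // a group that has not been initialized yet
			&sketch{registers: make([]uint8, 32)},
		},
		retSize: 8,
	}
	fmt.Println(e.Size()) // 56
}
```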

    qodo-merge-pro bot commented Jul 2, 2025

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Category: Possible issue

    Suggestion: Add nil check before method call

    Suggestion Impact: The suggestion was directly implemented in the commit. The nil check was added before calling group.ctr.spiller.clean() in the freeCannotReuse method exactly as suggested.

    code diff:

    +	if group.ctr.spiller != nil {
    +		group.ctr.spiller.clean()
    +	}

    Add nil check before calling clean() method on spiller to prevent potential
    panic. The spiller might be nil if initialization failed or was never called.

    pkg/sql/colexec/group/types.go [205-212]

     func (group *Group) freeCannotReuse(mp *mpool.MPool) {
     	group.ctr.hr.Free0()
     	group.ctr.result1.Free0(mp)
     	group.ctr.result2.Free0(mp)
    -	group.ctr.spiller.clean()
    +	if group.ctr.spiller != nil {
    +		group.ctr.spiller.clean()
    +	}
     	group.ctr.spilled = false
     	group.ctr.spiller = nil
     }

    [To ensure code accuracy, apply this suggestion manually]

    Suggestion importance[1-10]: 9

    __

    Why: The suggestion correctly identifies a potential nil pointer dereference on group.ctr.spiller, which would cause a panic if initSpiller fails and freeCannotReuse is subsequently called.
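An alternative to guarding each call site, shown here only as a design sketch and not as what the PR does, is to make the method itself tolerate a nil receiver. The `spiller` type below is a hypothetical stand-in with a made-up `files` field, and the integer return value exists only so the example has something to print.

```go
package main

import "fmt"

// spiller is a hypothetical stand-in for the PR's spiller type.
type spiller struct {
	files []string
}

// clean releases the tracked files and reports how many were dropped.
// Calling it on a nil *spiller is a no-op instead of a panic, because
// the receiver is checked before any field is touched.
func (s *spiller) clean() int {
	if s == nil {
		return 0
	}
	n := len(s.files)
	s.files = nil
	return n
}

func main() {
	var missing *spiller // e.g. initialization never ran
	fmt.Println(missing.clean()) // 0

	s := &spiller{files: []string{"a.bin", "b.bin"}}
	fmt.Println(s.clean()) // 2
	fmt.Println(s.clean()) // 0
}
```

The trade-off is that a nil-tolerant method can hide a missing initialization that an explicit check at the call site would make visible.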

    Impact: High

    Suggestion: Validate buffer size before reading

    Add validation to ensure the buffer has sufficient data before reading. The
    Next() method could return less than 8 bytes if the buffer is too small, leading
    to incorrect deserialization.

    pkg/container/hashtable/int64_hash_map.go [368-376]

     func (ht *Int64HashMap) UnmarshalBinary(data []byte, allocator malloc.Allocator) error {
    +	if len(data) < 40 { // 5 * 8 bytes for basic metadata
    +		return moerr.NewInternalErrorNoCtx("insufficient data for unmarshaling")
    +	}
     	r := bytes.NewBuffer(data)
     
     	// Read basic metadata
     	ht.elemCnt = types.DecodeUint64(r.Next(8))
     	ht.cellCnt = types.DecodeUint64(r.Next(8))
     	ht.blockCellCnt = types.DecodeUint64(r.Next(8))
     	ht.blockMaxElemCnt = types.DecodeUint64(r.Next(8))
     	ht.cellCntMask = types.DecodeUint64(r.Next(8))
     	...
     }

    [To ensure code accuracy, apply this suggestion manually]

    Suggestion importance[1-10]: 8

    __

    Why: The suggestion correctly points out that r.Next(8) silently returns fewer than 8 bytes when the input data slice is too short, which can crash or corrupt the subsequent decode; adding a length check up front prevents this.

    Impact: Medium

    @reusee reusee marked this pull request as draft July 15, 2025 11:43
    Labels: kind/enhancement, Review effort 4/5, size/XL (denotes a PR that changes [1000, 1999] lines)
    3 participants