Skip to content

hash agg spill #22087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/aggContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (a *AggContext) decodeGroupContexts(encodings [][]byte, resultType types.Ty
}
}

func (a *AggContext) Size() int64 {
return 0
}

// AggCommonExecContext stores the common context for all the groups.
// like the type scale, timezone and so on.
type AggCommonExecContext interface {
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/colexec/aggexec/approx_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,16 @@ func (exec *approxCountFixedExec[T]) Free() {
exec.groups = nil
}

func (exec *approxCountFixedExec[T]) Size() int64 {
var size int64
for _, s := range exec.groups {
//TODO how?
_ = s
//size += int64(s.Size())
}
return exec.ret.Size() + size
}

func (exec *approxCountVarExec) GroupGrow(more int) error {
oldLen, newLen := len(exec.groups), len(exec.groups)+more
if cap(exec.groups) >= newLen {
Expand Down Expand Up @@ -465,3 +475,13 @@ func (exec *approxCountVarExec) Free() {
exec.ret.free()
exec.groups = nil
}

func (exec *approxCountVarExec) Size() int64 {
var size int64
for _, s := range exec.groups {
//TODO how?
_ = s
//size += int64(s.Size())
}
return exec.ret.Size() + size
}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func (exec *groupConcatExec) Free() {
exec.ret.free()
}

func (exec *groupConcatExec) Size() int64 {
return exec.ret.Size() + exec.distinctHash.Size() + int64(cap(exec.separator))
}

var GroupConcatUnsupportedTypes = []types.T{
types.T_tuple,
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/colexec/aggexec/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (exec *countColumnExec) Free() {
exec.distinctHash.free()
}

func (exec *countColumnExec) Size() int64 {
return exec.ret.Size() + exec.distinctHash.Size()
}

type countStarExec struct {
singleAggInfo
singleAggExecExtraInformation
Expand Down Expand Up @@ -359,3 +363,7 @@ func (exec *countStarExec) Flush() ([]*vector.Vector, error) {
func (exec *countStarExec) Free() {
exec.ret.free()
}

func (exec *countStarExec) Size() int64 {
return exec.ret.Size()
}
10 changes: 10 additions & 0 deletions pkg/sql/colexec/aggexec/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,13 @@ func (d *distinctHash) free() {
}
}
}

func (d *distinctHash) Size() int64 {
var size int64
for _, m := range d.maps {
if m != nil {
size += m.Size()
}
}
return size
}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/fromBytesRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,7 @@ func (exec *aggregatorFromBytesToBytes) Free() {
exec.ret.free()
exec.distinctHash.free()
}

func (exec *aggregatorFromBytesToBytes) Size() int64 {
return exec.ret.Size() + exec.distinctHash.Size() + exec.execContext.Size()
}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/fromBytesRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,7 @@ func (exec *aggregatorFromBytesToFixed[to]) Free() {
exec.ret.free()
exec.distinctHash.free()
}

func (exec *aggregatorFromBytesToFixed[to]) Size() int64 {
return exec.ret.Size() + exec.distinctHash.Size() + exec.execContext.Size()
}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/fromFixedRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,3 +666,7 @@ func (exec *aggregatorFromFixedToBytes[from]) Free() {
exec.ret.free()
exec.distinctHash.free()
}

func (exec *aggregatorFromFixedToBytes[from]) Size() int64 {
return exec.ret.Size() + exec.distinctHash.Size() + exec.execContext.Size()
}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/aggexec/fromFixedRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,3 +721,7 @@ func (exec *aggregatorFromFixedToFixed[from, to]) Free() {
exec.ret.free()
exec.distinctHash.free()
}

func (exec *aggregatorFromFixedToFixed[from, to]) Size() int64 {
return exec.ret.Size() + exec.distinctHash.Size() + exec.execContext.Size()
}
10 changes: 10 additions & 0 deletions pkg/sql/colexec/aggexec/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ func (exec *medianColumnExecSelf[T, R]) Free() {
exec.distinctHash.free()
}

func (exec *medianColumnExecSelf[T, R]) Size() int64 {
var size int64
for _, v := range exec.groups {
if v != nil {
size += int64(v.Length()) * int64(exec.singleAggInfo.argType.TypeSize())
}
}
return exec.ret.Size() + exec.distinctHash.Size() + size
}

type medianColumnNumericExec[T numeric] struct {
medianColumnExecSelf[T, float64]
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func RegisterCountColumnAgg(id int64) {

func RegisterCountStarAgg(id int64) {
specialAgg[id] = true
aggIdOfCountStar = id
AggIdOfCountStar = id
}

func RegisterGroupConcatAgg(id int64, sep string) {
Expand Down Expand Up @@ -90,7 +90,7 @@ var (

// list of special aggregation function IDs.
aggIdOfCountColumn = int64(-1)
aggIdOfCountStar = int64(-2)
AggIdOfCountStar = int64(-2)
aggIdOfGroupConcat = int64(-3)
aggIdOfApproxCount = int64(-4)
aggIdOfMedian = int64(-5)
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/colexec/aggexec/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,17 @@ func (r *optSplitResult) free() {
r.emptyList = nil
}

func (r *optSplitResult) Size() int64 {
var size int64
for _, v := range r.resultList {
if v != nil {
size += int64(v.Allocated())
}
}
size += int64(len(r.emptyList) * int(types.T_bool.ToType().TypeSize()))
return size
}

func setValueFromX1Y1ToX2Y2[T types.FixedSizeTExceptStrType](
src [][]T, x1, y1 int, x2, y2 int, value T) {

Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/colexec/aggexec/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type AggFuncExec interface {
// Flush return the aggregation result.
Flush() ([]*vector.Vector, error)

Size() int64

// Free clean the resource and reuse the aggregation if possible.
Free()
}
Expand Down Expand Up @@ -150,7 +152,8 @@ func (m SimpleAggMemoryManager) Mp() *mpool.MPool {
func MakeAgg(
mg AggMemoryManager,
aggID int64, isDistinct bool,
param ...types.Type) (AggFuncExec, error) {
param ...types.Type,
) (AggFuncExec, error) {
exec, ok, err := makeSpecialAggExec(mg, aggID, isDistinct, param...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -214,12 +217,13 @@ func makeSingleAgg(

func makeSpecialAggExec(
mg AggMemoryManager,
id int64, isDistinct bool, params ...types.Type) (AggFuncExec, bool, error) {
id int64, isDistinct bool, params ...types.Type,
) (AggFuncExec, bool, error) {
if _, ok := specialAgg[id]; ok {
switch id {
case aggIdOfCountColumn:
return makeCount(mg, false, id, isDistinct, params[0]), true, nil
case aggIdOfCountStar:
case AggIdOfCountStar:
return makeCount(mg, true, id, isDistinct, params[0]), true, nil
case aggIdOfMedian:
exec, err := makeMedian(mg, id, isDistinct, params[0])
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/colexec/aggexec/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ func (exec *singleWindowExec) Free() {
exec.ret.free()
}

func (exec *singleWindowExec) Size() int64 {
var size int64
size += exec.ret.Size()
for _, group := range exec.groups {
size += int64(cap(group)) * int64(types.T_int64.ToType().TypeSize())
}
return size
}

func (exec *singleWindowExec) flushRank() ([]*vector.Vector, error) {
values := exec.ret.values

Expand Down
Loading
Loading