diff --git a/conn_process.go b/conn_process.go index ca452a71f3..9d8c6f127d 100644 --- a/conn_process.go +++ b/conn_process.go @@ -32,6 +32,8 @@ type onProcess struct { progress func(*Progress) profileInfo func(*ProfileInfo) profileEvents func([]ProfileEvent) + gotData func() + endOfProcess func() } func (c *connect) firstBlock(ctx context.Context, on *onProcess) (*proto.Block, error) { @@ -143,6 +145,9 @@ func (c *connect) process(ctx context.Context, on *onProcess) error { func (c *connect) processImpl(ctx context.Context, on *onProcess) error { c.readerMutex.Lock() defer c.readerMutex.Unlock() + if on.endOfProcess != nil { + defer on.endOfProcess() + } for { if c.reader == nil { @@ -178,6 +183,9 @@ func (c *connect) handle(ctx context.Context, packet byte, on *onProcess) error } if block.Rows() != 0 && on.data != nil { on.data(block) + if on.gotData != nil { + on.gotData() + } } case proto.ServerException: return c.exception() diff --git a/context.go b/context.go index a322e1aa10..b542574734 100644 --- a/context.go +++ b/context.go @@ -22,8 +22,9 @@ import ( "maps" "time" - "github.com/ClickHouse/clickhouse-go/v2/ext" "go.opentelemetry.io/otel/trace" + + "github.com/ClickHouse/clickhouse-go/v2/ext" ) var _contextOptionKey = &QueryOptions{ @@ -59,6 +60,8 @@ type ( progress func(*Progress) profileInfo func(*ProfileInfo) profileEvents func([]ProfileEvent) + gotData func() + endOfProcess func() } settings Settings parameters Parameters @@ -119,6 +122,8 @@ func WithParameters(params Parameters) QueryOption { } } +// WithLogs sets a callback function to handle log events during query execution. +// The provided function `fn` will be called synchronous in the handler goroutine so must not block. func WithLogs(fn func(*Log)) QueryOption { return func(o *QueryOptions) error { o.events.logs = fn @@ -126,6 +131,8 @@ func WithLogs(fn func(*Log)) QueryOption { } } +// WithProgress sets a callback function to handle progress events during query execution. +// The provided function `fn` will be called synchronous in the handler goroutine so must not block. func WithProgress(fn func(*Progress)) QueryOption { return func(o *QueryOptions) error { o.events.progress = fn @@ -133,6 +140,8 @@ func WithProgress(fn func(*Progress)) QueryOption { } } +// WithProfileInfo sets a callback function to handle profile information events during query execution. +// The provided function `fn` will be called synchronous in the handler goroutine so must not block. func WithProfileInfo(fn func(*ProfileInfo)) QueryOption { return func(o *QueryOptions) error { o.events.profileInfo = fn @@ -140,6 +149,8 @@ func WithProfileInfo(fn func(*ProfileInfo)) QueryOption { } } +// WithProfileEvents sets a callback function to handle ClickHouse Profile Events during query execution. +// The provided function `fn` will be called synchronous in the handler goroutine so must not block. func WithProfileEvents(fn func([]ProfileEvent)) QueryOption { return func(o *QueryOptions) error { o.events.profileEvents = fn @@ -147,6 +158,24 @@ func WithProfileEvents(fn func([]ProfileEvent)) QueryOption { } } +// WithGotData sets a callback function to be executed when data is received during query execution. +// The provided function `fn` will be called synchronous in the handler goroutine so must not block. +func WithGotData(fn func()) QueryOption { + return func(o *QueryOptions) error { + o.events.gotData = fn + return nil + } +} + +// WithEndOfProcess sets a callback function to be executed at the end of the query execution process. +// The provided function `fn` will be called synchronous in the handler goroutine so must not block. +func WithEndOfProcess(fn func()) QueryOption { + return func(o *QueryOptions) error { + o.events.endOfProcess = fn + return nil + } +} + func WithExternalTable(t ...*ext.Table) QueryOption { return func(o *QueryOptions) error { o.external = append(o.external, t...) @@ -273,6 +302,16 @@ func (q *QueryOptions) onProcess() *onProcess { q.events.profileEvents(events) } }, + gotData: func() { + if q.events.gotData != nil { + q.events.gotData() + } + }, + endOfProcess: func() { + if q.events.endOfProcess != nil { + q.events.endOfProcess() + } + }, } } diff --git a/examples/clickhouse_api/end_of_process.go b/examples/clickhouse_api/end_of_process.go new file mode 100644 index 0000000000..5d91562ed2 --- /dev/null +++ b/examples/clickhouse_api/end_of_process.go @@ -0,0 +1,53 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse_api + +import ( + "context" + "fmt" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +func EndOfProcessAndGotData() error { + conn, err := GetNativeConnection(clickhouse.Settings{ + "send_logs_level": "trace", + }, nil, nil) + if err != nil { + return err + } + var totalBlocks int + // use context to pass a call back for end of process and got data + ctx := clickhouse.Context(context.Background(), clickhouse.WithEndOfProcess(func() { + fmt.Println("process is finished") + }), clickhouse.WithGotData(func() { + totalBlocks++ + })) + + rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000") + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + } + + fmt.Printf("Total data blocks: %d\n", totalBlocks) + return rows.Err() +} diff --git a/examples/clickhouse_api/main_test.go b/examples/clickhouse_api/main_test.go index 375c867bd1..f3b373a9be 100644 --- a/examples/clickhouse_api/main_test.go +++ b/examples/clickhouse_api/main_test.go @@ -20,11 +20,13 @@ package clickhouse_api import ( "context" "fmt" - clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" - "github.com/stretchr/testify/require" "os" "strconv" "testing" + + "github.com/stretchr/testify/require" + + clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" ) func TestMain(m *testing.M) { @@ -237,3 +239,7 @@ func TestJSONStringExample(t *testing.T) { t.Skip("client cannot receive JSON strings") require.NoError(t, JSONStringExample()) } + +func TestEndOfProcessAndGotBlock(t *testing.T) { + require.NoError(t, EndOfProcessAndGotData()) +} diff --git a/examples/std/end_of_process.go b/examples/std/end_of_process.go new file mode 100644 index 0000000000..68593c634a --- /dev/null +++ b/examples/std/end_of_process.go @@ -0,0 +1,53 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package std + +import ( + "context" + "fmt" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +func EndOfProcessAndGotData() error { + conn, err := GetStdOpenDBConnection(clickhouse.Native, clickhouse.Settings{ + "send_logs_level": "trace", + }, nil, nil) + if err != nil { + return err + } + var totalBlocks int + // use context to pass a call back for end of process and got data + ctx := clickhouse.Context(context.Background(), clickhouse.WithEndOfProcess(func() { + fmt.Println("process is finished") + }), clickhouse.WithGotData(func() { + totalBlocks++ + })) + + rows, err := conn.QueryContext(ctx, "SELECT number from numbers(1000000) LIMIT 1000000") + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + } + + fmt.Printf("Total data blocks: %d\n", totalBlocks) + return rows.Err() +} diff --git a/examples/std/main_test.go b/examples/std/main_test.go index ebc1b66dbe..f2c884820a 100644 --- a/examples/std/main_test.go +++ b/examples/std/main_test.go @@ -26,8 +26,9 @@ import ( "testing" "time" - clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" "github.com/stretchr/testify/require" + + clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" ) func TestMain(m *testing.M) { @@ -169,3 +170,7 @@ func TestJSONStringExample(t *testing.T) { t.Skip("client cannot receive JSON strings") require.NoError(t, JSONStringExample()) } + +func TestEndOfProcessAndGotBlock(t *testing.T) { + require.NoError(t, EndOfProcessAndGotData()) +}