From 5c8115e032ebc90e982e22c105328fcaf0c74ac6 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 23 Mar 2026 16:08:34 +0300 Subject: [PATCH 01/20] cb for ch output plugin --- _mydata/offsets.yaml | 6 ++ _mydata/push-image.sh | 21 ++++ _mydata/test_config.yaml | 21 ++++ _mydata/welcome.json | 6 ++ plugin/output/clickhouse/clickhouse.go | 135 ++++++++++++++++++++----- 5 files changed, 165 insertions(+), 24 deletions(-) create mode 100644 _mydata/offsets.yaml create mode 100755 _mydata/push-image.sh create mode 100644 _mydata/test_config.yaml create mode 100644 _mydata/welcome.json diff --git a/_mydata/offsets.yaml b/_mydata/offsets.yaml new file mode 100644 index 000000000..cc2b75b8a --- /dev/null +++ b/_mydata/offsets.yaml @@ -0,0 +1,6 @@ +- file: /Users/serlazarenko/dev/repos/file.d/_mydata/welcome.json + inode: 5043215 + source_id: 2049134542 + last_read_timestamp: 1772625961393116000 + streams: + not_set: 499 diff --git a/_mydata/push-image.sh b/_mydata/push-image.sh new file mode 100755 index 000000000..45f398f05 --- /dev/null +++ b/_mydata/push-image.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +VERSION=$(git describe --abbrev=4 --dirty --always --tags | sed -E 's/-.+$//') + +if [[ ! -z "$1" ]]; then + VERSION=$1 +fi + +echo "Build version $VERSION" +docker buildx build --file ./build/package/Dockerfile_dev --build-arg VERSION=$VERSION --platform linux/amd64 -t gitlab-registry.ozon.ru/sre/images/file-d:$VERSION . + +read -p "Do you want to push image? (y/n) " -r +echo +if [[ ! $REPLY =~ ^[Yy]$ ]] +then +echo "Cancelled by user"; +exit 1; +fi + +echo "Push image to registry"; +docker push gitlab-registry.ozon.ru/sre/images/file-d:$VERSION \ No newline at end of file diff --git a/_mydata/test_config.yaml b/_mydata/test_config.yaml new file mode 100644 index 000000000..af7606b7d --- /dev/null +++ b/_mydata/test_config.yaml @@ -0,0 +1,21 @@ +pipelines: + test_pipeline: + input: + type: file + persistence_mode: async + watching_dir: ./_mydata + filename_pattern: "welcome.json" + offsets_file: ./_mydata/offsets.yaml + actions: + - type: rename + override: true + systemd\.unit: service + syslog\.identifier: service + k8s_pod_label_app: service + - type: discard + do_if: + op: equal + field: service + values: [null, ""] + output: + type: stdout diff --git a/_mydata/welcome.json b/_mydata/welcome.json new file mode 100644 index 000000000..6b8585777 --- /dev/null +++ b/_mydata/welcome.json @@ -0,0 +1,6 @@ +{"level":"error","message":"1: k8s_pod_label_app","k8s_pod_label_app":"k8s-service"} +{"level":"info","message":"2: systemd.unit","systemd.unit":"systemd-service"} +{"level":"warning","message":"3: syslog.identifier","syslog.identifier":"syslog-service"} +{"level":"debug","message":"4: No service field"} +{"level":"error","message":"5: Empty k8s_pod_label_app","k8s_pod_label_app":""} +{"level":"info","message":"6: Multiple sources","k8s_pod_label_app":"k8s-service","systemd.unit":"systemd-service"} diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 7f6c8366a..63fb7a7af 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "strings" + "sync" "time" "github.com/ClickHouse/ch-go" @@ -61,6 +62,10 @@ type Plugin struct { queriesCountMetric *metric.Counter router *pipeline.Router + + bannedHosts map[Address]time.Time + pendingHosts map[Address]struct{} + mu sync.Mutex } type Setting struct { @@ -334,6 +339,18 @@ type Config struct { // > After this timeout batch will be sent even if batch isn't completed. BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * BatchFlushTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Timeout for banning host if it fails. + BanTimeout cfg.Duration `json:"ban_timeout" default:"10s" parse:"duration"` // * + BanTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Interval for retrying connections to banned hosts + RetryInterval cfg.Duration `json:"retry_interval" default:"5s" parse:"duration"` // * + RetryInterval_ time.Duration } func init() { @@ -355,6 +372,9 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.logger = params.Logger.Desugar() + p.bannedHosts = make(map[Address]time.Time, len(p.config.Addresses)) // Заранее выделил память под мапку + p.pendingHosts = make(map[Address]struct{}, len(p.config.Addresses)) // не знал, как обыграть момент с удалением успешно + // подключенного(ранее ожидающего) хоста, поэтому поменял на мапу (этот момент видно будет в checkBannedHosts) p.config = config.(*Config) p.registerMetrics(params.MetricCtl) p.ctx, p.cancelFunc = context.WithCancel(context.Background()) @@ -404,28 +424,14 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } for _, addr := range p.config.Addresses { - addr.Addr = addrWithDefaultPort(addr.Addr, "9000") - pool, err := chpool.New(p.ctx, chpool.Options{ - ClientOptions: ch.Options{ - Logger: p.logger.Named("driver"), - Address: addr.Addr, - Database: p.config.Database, - User: p.config.User, - Password: p.config.Password, - QuotaKey: p.config.QuotaKey, - Compression: compression, - Settings: p.config.ClickhouseSettings.toProtoSettings(), - DialTimeout: time.Second * 10, - TLS: b.Build(), - HandshakeTimeout: time.Minute, - }, - MaxConnLifetime: p.config.MaxConnLifetime_, - MaxConnIdleTime: p.config.MaxConnIdleTime_, - MaxConns: p.config.MaxConns_, - MinConns: p.config.MinConns_, - HealthCheckPeriod: p.config.HealthCheckPeriod_, - }) + pool, err := p.createConnection(addr, compression, b) if err != nil { + var netError net.Error + if errors.As(err, &netError) { + p.mu.Lock() + p.bannedHosts[addr] = time.Now().Add(p.config.BanTimeout_) + p.mu.Unlock() + } p.logger.Error("create clickhouse connection pool", zap.Error(err), zap.String("addr", addr.Addr)) } else { for j := 0; j < *addr.Weight; j++ { @@ -433,11 +439,16 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } } } - - if len(p.instances) == 0 { + // CORELOG-3268 если все хосты недоступны и fatal_on_failed_insert=false, пропускаем логи и идем дальше, + // если все хосты недоступны и fatal_on_failed_insert=true — падаем. + if len(p.instances) == 0 && p.config.FatalOnFailedInsert == true { p.logger.Fatal("cannot start: no available clickhouse addresses in config") } - + // Объясни момент, в чем критическая важность reconnect делать именно в out, + // а не сразу в start, чтобы к моменту out мб поднялся какой-то хост и мы создали instance + if len(p.bannedHosts) > 0 { + go p.checkBannedHosts(compression, b) + } batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, @@ -484,6 +495,82 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.batcher.Start(p.ctx) } +func (p *Plugin) createConnection(addr Address, compression ch.Compression, b xtls.ConfigBuilder) (*chpool.Pool, error) { + addr.Addr = addrWithDefaultPort(addr.Addr, "9000") + pool, err := chpool.New(p.ctx, chpool.Options{ + ClientOptions: ch.Options{ + Logger: p.logger.Named("driver"), + Address: addr.Addr, + Database: p.config.Database, + User: p.config.User, + Password: p.config.Password, + QuotaKey: p.config.QuotaKey, + Compression: compression, + Settings: p.config.ClickhouseSettings.toProtoSettings(), + DialTimeout: time.Second * 10, + TLS: b.Build(), + HandshakeTimeout: time.Minute, + }, + MaxConnLifetime: p.config.MaxConnLifetime_, + MaxConnIdleTime: p.config.MaxConnIdleTime_, + MaxConns: p.config.MaxConns_, + MinConns: p.config.MinConns_, + HealthCheckPeriod: p.config.HealthCheckPeriod_, + }) + + return pool, err +} + +func (p *Plugin) checkBannedHosts(compression ch.Compression, b xtls.ConfigBuilder) { + ticker := time.NewTicker(p.config.RetryInterval_) + for { + select { + case <-p.ctx.Done(): + return + case <-ticker.C: + p.mu.Lock() + if len(p.bannedHosts) == 0 { // Так обыграл раннее завершение горутины + p.mu.Unlock() + return + } + for host, banUntil := range p.bannedHosts { + if time.Now().After(banUntil) { + p.pendingHosts[host] = struct{}{} + } + } + for host := range p.pendingHosts { + err := p.retryConnect(host, compression, b) + if err != nil { + p.logger.Error("failed to reconnect to banned host", zap.Error(err), zap.String("addr", host.Addr)) + continue + } + delete(p.bannedHosts, host) + delete(p.pendingHosts, host) + } + p.mu.Unlock() + } + } +} + +func (p *Plugin) retryConnect(addr Address, compression ch.Compression, b xtls.ConfigBuilder) error { + pool, err := p.createConnection(addr, compression, b) + if err != nil { + var netError net.Error + if errors.As(err, &netError) { + p.mu.Lock() + p.bannedHosts[addr] = time.Now().Add(p.config.BanTimeout_) + p.mu.Unlock() + } + return err + } + + for j := 0; j < *addr.Weight; j++ { + p.instances = append(p.instances, pool) + } + + return nil +} + func (p *Plugin) Stop() { p.cancelFunc() p.batcher.Stop() From ca693aff32c35a85cdaa2f2c264173e49caf87cd Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 23 Mar 2026 16:12:40 +0300 Subject: [PATCH 02/20] cb for ch output plugin --- _mydata/offsets.yaml | 6 ------ _mydata/push-image.sh | 21 --------------------- _mydata/test_config.yaml | 21 --------------------- _mydata/welcome.json | 6 ------ 4 files changed, 54 deletions(-) delete mode 100644 _mydata/offsets.yaml delete mode 100755 _mydata/push-image.sh delete mode 100644 _mydata/test_config.yaml delete mode 100644 _mydata/welcome.json diff --git a/_mydata/offsets.yaml b/_mydata/offsets.yaml deleted file mode 100644 index cc2b75b8a..000000000 --- a/_mydata/offsets.yaml +++ /dev/null @@ -1,6 +0,0 @@ -- file: /Users/serlazarenko/dev/repos/file.d/_mydata/welcome.json - inode: 5043215 - source_id: 2049134542 - last_read_timestamp: 1772625961393116000 - streams: - not_set: 499 diff --git a/_mydata/push-image.sh b/_mydata/push-image.sh deleted file mode 100755 index 45f398f05..000000000 --- a/_mydata/push-image.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash - -VERSION=$(git describe --abbrev=4 --dirty --always --tags | sed -E 's/-.+$//') - -if [[ ! -z "$1" ]]; then - VERSION=$1 -fi - -echo "Build version $VERSION" -docker buildx build --file ./build/package/Dockerfile_dev --build-arg VERSION=$VERSION --platform linux/amd64 -t gitlab-registry.ozon.ru/sre/images/file-d:$VERSION . - -read -p "Do you want to push image? (y/n) " -r -echo -if [[ ! $REPLY =~ ^[Yy]$ ]] -then -echo "Cancelled by user"; -exit 1; -fi - -echo "Push image to registry"; -docker push gitlab-registry.ozon.ru/sre/images/file-d:$VERSION \ No newline at end of file diff --git a/_mydata/test_config.yaml b/_mydata/test_config.yaml deleted file mode 100644 index af7606b7d..000000000 --- a/_mydata/test_config.yaml +++ /dev/null @@ -1,21 +0,0 @@ -pipelines: - test_pipeline: - input: - type: file - persistence_mode: async - watching_dir: ./_mydata - filename_pattern: "welcome.json" - offsets_file: ./_mydata/offsets.yaml - actions: - - type: rename - override: true - systemd\.unit: service - syslog\.identifier: service - k8s_pod_label_app: service - - type: discard - do_if: - op: equal - field: service - values: [null, ""] - output: - type: stdout diff --git a/_mydata/welcome.json b/_mydata/welcome.json deleted file mode 100644 index 6b8585777..000000000 --- a/_mydata/welcome.json +++ /dev/null @@ -1,6 +0,0 @@ -{"level":"error","message":"1: k8s_pod_label_app","k8s_pod_label_app":"k8s-service"} -{"level":"info","message":"2: systemd.unit","systemd.unit":"systemd-service"} -{"level":"warning","message":"3: syslog.identifier","syslog.identifier":"syslog-service"} -{"level":"debug","message":"4: No service field"} -{"level":"error","message":"5: Empty k8s_pod_label_app","k8s_pod_label_app":""} -{"level":"info","message":"6: Multiple sources","k8s_pod_label_app":"k8s-service","systemd.unit":"systemd-service"} From ab289dc43f10cb91bf76bfb6f428f703648edc25 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 23 Mar 2026 16:18:46 +0300 Subject: [PATCH 03/20] docs generate --- plugin/output/clickhouse/README.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index 08b1b374e..e64bd85d1 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -215,5 +215,17 @@ After this timeout batch will be sent even if batch isn't completed.
+**`ban_timeout`** *`cfg.Duration`* *`default=10s`* -
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file +Timeout for banning host if it fails. + +
+ +**`retry_interval`** *`cfg.Duration`* *`default=5s`* + +Interval for retrying connections to banned hosts + +
+ + +
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* From df2e46f62d864cad7f874547a22922ad6bf19f77 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 23 Mar 2026 16:22:37 +0300 Subject: [PATCH 04/20] fix --- plugin/output/clickhouse/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index e64bd85d1..a7e25405c 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -229,3 +229,4 @@ Interval for retrying connections to banned hosts
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* + From 56abe5a6f3db637a10fae7cad54ecf5fe06f84f7 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Fri, 3 Apr 2026 01:27:25 +0300 Subject: [PATCH 05/20] fix --- plugin/output/clickhouse/README.md | 11 +- plugin/output/clickhouse/clickhouse.go | 204 +++++++++++++------- plugin/output/clickhouse/clickhouse_test.go | 26 ++- 3 files changed, 159 insertions(+), 82 deletions(-) diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index a7e25405c..4ca236c6c 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -215,18 +215,17 @@ After this timeout batch will be sent even if batch isn't completed.
-**`ban_timeout`** *`cfg.Duration`* *`default=10s`* +**`failure_cooldown_period`** *`cfg.Duration`* *`default=10s`* -Timeout for banning host if it fails. +Period for which addresses will be banned in case of unavailability.
-**`retry_interval`** *`cfg.Duration`* *`default=5s`* +**`reconnect_interval`** *`cfg.Duration`* *`default=5s`* -Interval for retrying connections to banned hosts +Interval for reconnecting to addresses that are unavailable during initialization.
-
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* - +
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 63fb7a7af..d2dccc6ed 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -3,6 +3,7 @@ package clickhouse import ( "bytes" "context" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -43,6 +44,11 @@ type Clickhouse interface { Do(ctx context.Context, query ch.Query) error } +type instance struct { + addr Address + pool Clickhouse +} + type Plugin struct { logger *zap.Logger @@ -54,7 +60,7 @@ type Plugin struct { query string // TODO: support shards - instances []Clickhouse + instances []instance requestID atomic.Int64 // plugin metrics @@ -63,9 +69,13 @@ type Plugin struct { router *pipeline.Router + compression ch.Compression + tlsConfig *tls.Config + + poolsByAddr map[Address]Clickhouse bannedHosts map[Address]time.Time pendingHosts map[Address]struct{} - mu sync.Mutex + mu sync.RWMutex } type Setting struct { @@ -342,15 +352,15 @@ type Config struct { // > @3@4@5@6 // > - // > Timeout for banning host if it fails. - BanTimeout cfg.Duration `json:"ban_timeout" default:"10s" parse:"duration"` // * - BanTimeout_ time.Duration + // > Period for which addresses will be banned in case of unavailability. + FailureCooldownPeriod cfg.Duration `json:"failure_cooldown_period" default:"10s" parse:"duration"` // * + FailureCooldownPeriod_ time.Duration // > @3@4@5@6 // > - // > Interval for retrying connections to banned hosts - RetryInterval cfg.Duration `json:"retry_interval" default:"5s" parse:"duration"` // * - RetryInterval_ time.Duration + // > Interval for reconnecting to addresses that are unavailable during initialization. + ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // * + ReconnectInterval_ time.Duration } func init() { @@ -371,11 +381,12 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.logger = params.Logger.Desugar() - - p.bannedHosts = make(map[Address]time.Time, len(p.config.Addresses)) // Заранее выделил память под мапку - p.pendingHosts = make(map[Address]struct{}, len(p.config.Addresses)) // не знал, как обыграть момент с удалением успешно - // подключенного(ранее ожидающего) хоста, поэтому поменял на мапу (этот момент видно будет в checkBannedHosts) p.config = config.(*Config) + + p.bannedHosts = make(map[Address]time.Time, len(p.config.Addresses)) + p.pendingHosts = make(map[Address]struct{}, len(p.config.Addresses)) + p.poolsByAddr = make(map[Address]Clickhouse, len(p.config.Addresses)) + p.registerMetrics(params.MetricCtl) p.ctx, p.cancelFunc = context.WithCancel(context.Background()) @@ -385,6 +396,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP if p.config.InsertTimeout_ < 1 { p.logger.Fatal("'db_request_timeout' can't be <1") } + if p.config.ReconnectInterval_ < 1 { + p.logger.Fatal("'reconnect_interval' can't be <1") + } + if p.config.FailureCooldownPeriod_ < 1 { + p.logger.Fatal("'failure_cooldown_period' cant't be <1") + } schema, err := inferInsaneColInputs(p.config.Columns) if err != nil { @@ -400,55 +417,56 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.config.InsertStrategy_ = StrategyInOrder } - var compression ch.Compression switch strings.ToLower(p.config.Compression) { default: fallthrough case "disabled": - compression = ch.CompressionDisabled + p.compression = ch.CompressionDisabled case "lz4": - compression = ch.CompressionLZ4 + p.compression = ch.CompressionLZ4 case "zstd": - compression = ch.CompressionZSTD + p.compression = ch.CompressionZSTD case "none": - compression = ch.CompressionNone + p.compression = ch.CompressionNone } - var b xtls.ConfigBuilder if p.config.CACert != "" { b := xtls.NewConfigBuilder() - err := b.AppendCARoot(p.config.CACert) - if err != nil { + if err := b.AppendCARoot(p.config.CACert); err != nil { p.logger.Fatal("can't append CA root", zap.Error(err)) } + p.tlsConfig = b.Build() } for _, addr := range p.config.Addresses { - pool, err := p.createConnection(addr, compression, b) + pool, err := p.createConnection(addr) if err != nil { var netError net.Error if errors.As(err, &netError) { - p.mu.Lock() - p.bannedHosts[addr] = time.Now().Add(p.config.BanTimeout_) - p.mu.Unlock() + p.pendingHosts[addr] = struct{}{} } p.logger.Error("create clickhouse connection pool", zap.Error(err), zap.String("addr", addr.Addr)) - } else { - for j := 0; j < *addr.Weight; j++ { - p.instances = append(p.instances, pool) - } + continue + } + p.poolsByAddr[addr] = pool + for j := 0; j < *addr.Weight; j++ { + p.instances = append(p.instances, instance{ + addr: addr, + pool: pool, + }) } } - // CORELOG-3268 если все хосты недоступны и fatal_on_failed_insert=false, пропускаем логи и идем дальше, - // если все хосты недоступны и fatal_on_failed_insert=true — падаем. - if len(p.instances) == 0 && p.config.FatalOnFailedInsert == true { + + if len(p.instances) == 0 && p.config.FatalOnFailedInsert { p.logger.Fatal("cannot start: no available clickhouse addresses in config") } - // Объясни момент, в чем критическая важность reconnect делать именно в out, - // а не сразу в start, чтобы к моменту out мб поднялся какой-то хост и мы создали instance - if len(p.bannedHosts) > 0 { - go p.checkBannedHosts(compression, b) + + go p.checkBannedHosts() + + if len(p.pendingHosts) > 0 { + go p.checkPendingHosts() } + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, @@ -495,7 +513,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.batcher.Start(p.ctx) } -func (p *Plugin) createConnection(addr Address, compression ch.Compression, b xtls.ConfigBuilder) (*chpool.Pool, error) { +func (p *Plugin) createConnection(addr Address) (*chpool.Pool, error) { addr.Addr = addrWithDefaultPort(addr.Addr, "9000") pool, err := chpool.New(p.ctx, chpool.Options{ ClientOptions: ch.Options{ @@ -505,10 +523,10 @@ func (p *Plugin) createConnection(addr Address, compression ch.Compression, b xt User: p.config.User, Password: p.config.Password, QuotaKey: p.config.QuotaKey, - Compression: compression, + Compression: p.compression, Settings: p.config.ClickhouseSettings.toProtoSettings(), DialTimeout: time.Second * 10, - TLS: b.Build(), + TLS: p.tlsConfig, HandshakeTimeout: time.Minute, }, MaxConnLifetime: p.config.MaxConnLifetime_, @@ -521,61 +539,72 @@ func (p *Plugin) createConnection(addr Address, compression ch.Compression, b xt return pool, err } -func (p *Plugin) checkBannedHosts(compression ch.Compression, b xtls.ConfigBuilder) { - ticker := time.NewTicker(p.config.RetryInterval_) +func (p *Plugin) checkPendingHosts() { + ticker := time.NewTicker(p.config.ReconnectInterval_) for { select { case <-p.ctx.Done(): return case <-ticker.C: - p.mu.Lock() - if len(p.bannedHosts) == 0 { // Так обыграл раннее завершение горутины - p.mu.Unlock() + if len(p.pendingHosts) == 0 { return } - for host, banUntil := range p.bannedHosts { - if time.Now().After(banUntil) { - p.pendingHosts[host] = struct{}{} - } - } - for host := range p.pendingHosts { - err := p.retryConnect(host, compression, b) + + for addr := range p.pendingHosts { + pool, err := p.createConnection(addr) if err != nil { - p.logger.Error("failed to reconnect to banned host", zap.Error(err), zap.String("addr", host.Addr)) + p.logger.Error("failed to reconnect to pending host", zap.Error(err), zap.String("addr", addr.Addr)) continue } - delete(p.bannedHosts, host) - delete(p.pendingHosts, host) + + p.mu.Lock() + p.poolsByAddr[addr] = pool + for j := 0; j < *addr.Weight; j++ { + p.instances = append(p.instances, instance{ + addr: addr, + pool: pool, + }) + } + delete(p.pendingHosts, addr) + p.mu.Unlock() } - p.mu.Unlock() } } } -func (p *Plugin) retryConnect(addr Address, compression ch.Compression, b xtls.ConfigBuilder) error { - pool, err := p.createConnection(addr, compression, b) - if err != nil { - var netError net.Error - if errors.As(err, &netError) { +func (p *Plugin) checkBannedHosts() { + ticker := time.NewTicker(p.config.ReconnectInterval_) + for { + select { + case <-p.ctx.Done(): + return + case <-ticker.C: p.mu.Lock() - p.bannedHosts[addr] = time.Now().Add(p.config.BanTimeout_) + for addr, banUntil := range p.bannedHosts { + if time.Now().After(banUntil) { + pool, ok := p.poolsByAddr[addr] + if ok { + for i := 0; i < *addr.Weight; i++ { + p.instances = append(p.instances, instance{ + addr: addr, + pool: pool, + }) + } + } + delete(p.bannedHosts, addr) + } + } p.mu.Unlock() } - return err - } - - for j := 0; j < *addr.Weight; j++ { - p.instances = append(p.instances, pool) } - - return nil } func (p *Plugin) Stop() { p.cancelFunc() p.batcher.Stop() - for _, clickhouse := range p.instances { - clickhouse.Close() + + for _, pool := range p.poolsByAddr { + pool.Close() } } @@ -638,14 +667,27 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err } }) + p.mu.RLock() + attempts := len(p.instances) + p.mu.RUnlock() + + if attempts == 0 && p.config.FatalOnFailedInsert { + p.logger.Fatal("no available clickhouse addresses") + } + var err error - for i := range p.instances { + for i := 0; i < attempts; i++ { requestID := p.requestID.Inc() - clickhouse := p.getInstance(requestID, i) - err = p.do(clickhouse, data.input) + instance := p.getInstance(requestID, i) + err = p.do(instance.pool, data.input) if err == nil { return nil } + + var netErr net.Error + if errors.As(err, &netErr) { + p.banInstance(instance) + } } if err != nil { p.insertErrorsMetric.Inc() @@ -670,7 +712,23 @@ func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error { }) } -func (p *Plugin) getInstance(requestID int64, retry int) Clickhouse { +func (p *Plugin) banInstance(inst instance) { + p.mu.Lock() + defer p.mu.Unlock() + + filtered := p.instances[:0] + for _, it := range p.instances { + if it.addr != inst.addr { + filtered = append(filtered, it) + } + } + p.instances = filtered + p.bannedHosts[inst.addr] = time.Now().Add(p.config.FailureCooldownPeriod_) +} + +func (p *Plugin) getInstance(requestID int64, retry int) instance { + p.mu.RLock() + defer p.mu.RUnlock() var instanceIdx int switch p.config.InsertStrategy_ { case StrategyInOrder: diff --git a/plugin/output/clickhouse/clickhouse_test.go b/plugin/output/clickhouse/clickhouse_test.go index 9adf5afa4..14829360a 100644 --- a/plugin/output/clickhouse/clickhouse_test.go +++ b/plugin/output/clickhouse/clickhouse_test.go @@ -14,7 +14,7 @@ func TestPlugin_getInstance(t *testing.T) { ctrl := gomock.NewController(t) - instances := []Clickhouse{ + pools := []Clickhouse{ mockclickhouse.NewMockClickhouse(ctrl), mockclickhouse.NewMockClickhouse(ctrl), mockclickhouse.NewMockClickhouse(ctrl), @@ -22,16 +22,32 @@ func TestPlugin_getInstance(t *testing.T) { mockclickhouse.NewMockClickhouse(ctrl), } + addrs := []Address{ + {Addr: "addr1", Weight: intPtr(1)}, + {Addr: "addr2", Weight: intPtr(2)}, + {Addr: "addr3", Weight: intPtr(3)}, + {Addr: "addr4", Weight: intPtr(4)}, + {Addr: "addr5", Weight: intPtr(5)}, + } + + instances := []instance{ + {addr: addrs[0], pool: pools[0]}, + {addr: addrs[1], pool: pools[1]}, + {addr: addrs[2], pool: pools[2]}, + {addr: addrs[3], pool: pools[3]}, + {addr: addrs[4], pool: pools[4]}, + } + type args struct { id int64 retry int } tests := []struct { name string - instances []Clickhouse + instances []instance stategy InsertStrategy args args - want Clickhouse + want instance }{ // in-order { @@ -223,3 +239,7 @@ func TestAddress_UnmarshalJSON(t *testing.T) { }) } } + +func intPtr(a int) *int { + return &a +} From 833b5883ad122d93c794c81c89a31ba748d749ef Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 13 Apr 2026 14:28:05 +0300 Subject: [PATCH 06/20] fix --- plugin/output/clickhouse/clickhouse.go | 58 +++++++++++++------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index d2dccc6ed..a104fe658 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -19,6 +19,7 @@ import ( "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/xtime" "github.com/ozontech/file.d/xtls" "go.uber.org/atomic" "go.uber.org/zap" @@ -353,8 +354,8 @@ type Config struct { // > @3@4@5@6 // > // > Period for which addresses will be banned in case of unavailability. - FailureCooldownPeriod cfg.Duration `json:"failure_cooldown_period" default:"10s" parse:"duration"` // * - FailureCooldownPeriod_ time.Duration + BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // * + BanPeriod_ time.Duration // > @3@4@5@6 // > @@ -399,8 +400,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP if p.config.ReconnectInterval_ < 1 { p.logger.Fatal("'reconnect_interval' can't be <1") } - if p.config.FailureCooldownPeriod_ < 1 { - p.logger.Fatal("'failure_cooldown_period' cant't be <1") + if p.config.BanPeriod_ < 1 { + p.logger.Fatal("'ban_period' cant't be <1") } schema, err := inferInsaneColInputs(p.config.Columns) @@ -515,7 +516,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP func (p *Plugin) createConnection(addr Address) (*chpool.Pool, error) { addr.Addr = addrWithDefaultPort(addr.Addr, "9000") - pool, err := chpool.New(p.ctx, chpool.Options{ + return chpool.New(p.ctx, chpool.Options{ ClientOptions: ch.Options{ Logger: p.logger.Named("driver"), Address: addr.Addr, @@ -535,12 +536,12 @@ func (p *Plugin) createConnection(addr Address) (*chpool.Pool, error) { MinConns: p.config.MinConns_, HealthCheckPeriod: p.config.HealthCheckPeriod_, }) - - return pool, err } func (p *Plugin) checkPendingHosts() { ticker := time.NewTicker(p.config.ReconnectInterval_) + defer ticker.Stop() + for { select { case <-p.ctx.Done(): @@ -574,6 +575,8 @@ func (p *Plugin) checkPendingHosts() { func (p *Plugin) checkBannedHosts() { ticker := time.NewTicker(p.config.ReconnectInterval_) + defer ticker.Stop() + for { select { case <-p.ctx.Done(): @@ -581,18 +584,21 @@ func (p *Plugin) checkBannedHosts() { case <-ticker.C: p.mu.Lock() for addr, banUntil := range p.bannedHosts { - if time.Now().After(banUntil) { - pool, ok := p.poolsByAddr[addr] - if ok { - for i := 0; i < *addr.Weight; i++ { - p.instances = append(p.instances, instance{ - addr: addr, - pool: pool, - }) - } - } - delete(p.bannedHosts, addr) + if !xtime.GetInaccurateTime().After(banUntil) { + continue } + pool, ok := p.poolsByAddr[addr] + if !ok { + delete(p.bannedHosts, addr) // Я не знаю, по идее такой ситуации вообще не должно быть + continue // и можно просто ограничиться pool := p.poolsByAddr и дальше в цикл пробрасывать + } + for i := 0; i < *addr.Weight; i++ { + p.instances = append(p.instances, instance{ + addr: addr, + pool: pool, + }) + } + delete(p.bannedHosts, addr) } p.mu.Unlock() } @@ -667,16 +673,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err } }) - p.mu.RLock() - attempts := len(p.instances) - p.mu.RUnlock() - - if attempts == 0 && p.config.FatalOnFailedInsert { + if len(p.instances) == 0 && p.config.FatalOnFailedInsert { p.logger.Fatal("no available clickhouse addresses") } var err error - for i := 0; i < attempts; i++ { + for i := range p.instances { requestID := p.requestID.Inc() instance := p.getInstance(requestID, i) err = p.do(instance.pool, data.input) @@ -686,7 +688,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err var netErr net.Error if errors.As(err, &netErr) { - p.banInstance(instance) + p.banInstance(instance.addr) } } if err != nil { @@ -712,18 +714,18 @@ func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error { }) } -func (p *Plugin) banInstance(inst instance) { +func (p *Plugin) banInstance(addr Address) { p.mu.Lock() defer p.mu.Unlock() filtered := p.instances[:0] for _, it := range p.instances { - if it.addr != inst.addr { + if it.addr != addr { filtered = append(filtered, it) } } p.instances = filtered - p.bannedHosts[inst.addr] = time.Now().Add(p.config.FailureCooldownPeriod_) + p.bannedHosts[addr] = xtime.GetInaccurateTime().Add(p.config.BanPeriod_) } func (p *Plugin) getInstance(requestID int64, retry int) instance { From 8e406933ccf14465a3449530d267c6b6b175f1a4 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 13 Apr 2026 14:34:44 +0300 Subject: [PATCH 07/20] fix --- plugin/output/clickhouse/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index 4ca236c6c..4d12e9860 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -215,7 +215,7 @@ After this timeout batch will be sent even if batch isn't completed.
-**`failure_cooldown_period`** *`cfg.Duration`* *`default=10s`* +**`ban_period`** *`cfg.Duration`* *`default=10s`* Period for which addresses will be banned in case of unavailability. From 52283e03ebee22aeb3c5e93883fc2b54556fde23 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Tue, 14 Apr 2026 09:40:27 +0300 Subject: [PATCH 08/20] fix --- plugin/output/clickhouse/clickhouse.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index a104fe658..9e087f262 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -587,11 +587,7 @@ func (p *Plugin) checkBannedHosts() { if !xtime.GetInaccurateTime().After(banUntil) { continue } - pool, ok := p.poolsByAddr[addr] - if !ok { - delete(p.bannedHosts, addr) // Я не знаю, по идее такой ситуации вообще не должно быть - continue // и можно просто ограничиться pool := p.poolsByAddr и дальше в цикл пробрасывать - } + pool := p.poolsByAddr[addr] for i := 0; i < *addr.Weight; i++ { p.instances = append(p.instances, instance{ addr: addr, From ad0638ad68be96cac2c413214baac179f0a110a3 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Mon, 20 Apr 2026 15:28:43 +0300 Subject: [PATCH 09/20] fix --- plugin/output/clickhouse/clickhouse.go | 39 +++++++++++++++++--------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 9e087f262..15e0eebf9 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -669,30 +669,43 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err } }) - if len(p.instances) == 0 && p.config.FatalOnFailedInsert { + p.mu.RLock() + attempts := len(p.instances) + p.mu.RUnlock() + + var err error + if attempts == 0 && p.config.FatalOnFailedInsert { + p.insertErrorsMetric.Inc() p.logger.Fatal("no available clickhouse addresses") } - var err error - for i := range p.instances { + noAvailableHosts := false + for i := 0; i < attempts; i++ { requestID := p.requestID.Inc() + + p.mu.RLock() + if len(p.instances) == 0 { + p.mu.RUnlock() + err = errors.New("no available clickhouse addresses") + noAvailableHosts = true + break + } instance := p.getInstance(requestID, i) + p.mu.RUnlock() + err = p.do(instance.pool, data.input) if err == nil { return nil } - - var netErr net.Error - if errors.As(err, &netErr) { - p.banInstance(instance.addr) - } + p.banInstance(instance.addr) } if err != nil { p.insertErrorsMetric.Inc() - p.logger.Error( - "an attempt to insert a batch failed", - zap.Error(err), - ) + if noAvailableHosts { + p.logger.Error("no available clickhouse addresses", zap.Error(err)) + } else { + p.logger.Error("an attempt to insert a batch failed", zap.Error(err)) + } } return err @@ -725,8 +738,6 @@ func (p *Plugin) banInstance(addr Address) { } func (p *Plugin) getInstance(requestID int64, retry int) instance { - p.mu.RLock() - defer p.mu.RUnlock() var instanceIdx int switch p.config.InsertStrategy_ { case StrategyInOrder: From 632b69ed4bded12dec7bdc5a320129b6196d730e Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Tue, 21 Apr 2026 18:35:24 +0300 Subject: [PATCH 10/20] fix e2e --- plugin/output/clickhouse/clickhouse.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 15e0eebf9..845e02f1f 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -697,7 +697,11 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err if err == nil { return nil } - p.banInstance(instance.addr) + + var netError net.Error + if errors.As(err, &netError) { + p.banInstance(instance.addr) + } } if err != nil { p.insertErrorsMetric.Inc() From 2bdb145561af58fb2ff37bacec2d1b56fe5a8526 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 22 Apr 2026 14:50:26 +0300 Subject: [PATCH 11/20] fix --- ...fig_main_output_no_available_hosts_dlq.yml | 117 ++++++++++++++++++ e2e/start_work_test.go | 5 + plugin/output/clickhouse/clickhouse.go | 23 +++- 3 files changed, 139 insertions(+), 6 deletions(-) create mode 100644 e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml diff --git a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml new file mode 100644 index 000000000..453cce038 --- /dev/null +++ b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml @@ -0,0 +1,117 @@ +pipelines: + file_clickhouse_main_output_no_available_hosts_dlq: + input: + type: file + actions: + - type: set_time + format: unixtime + field: ts + override: false + - type: set_time + format: unixtime + field: ts_with_tz + override: false + - type: set_time + format: timestampnano + field: ts64_auto + override: true + - type: set_time + format: rfc3339nano + field: ts_rfc3339nano + override: true + - type: debug + output: + deadqueue: + type: clickhouse + addresses: + - 127.0.0.1:9002 + table: test_table_insert + insert_timeout: 1m + columns: + - name: c1 + type: String + - name: c2 + type: Int8 + - name: c3 + type: Int16 + - name: c4 + type: Nullable(Int16) + - name: c5 + type: Nullable(String) + - name: level + type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4) + - name: ipv4 + type: Nullable(IPv4) + - name: ipv6 + type: Nullable(IPv6) + - name: ts + type: DateTime + - name: ts_with_tz + type: DateTime('Europe/Moscow') + - name: ts64 + type: DateTime64(3, 'UTC') + - name: ts64_auto + type: DateTime64(9, 'UTC') + - name: ts_rfc3339nano + type: DateTime64(9) + - name: f32 + type: Float32 + - name: f64 + type: Float64 + - name: lc_str + type: LowCardinality(String) + - name: str_arr + type: Array(String) + - name: map_str_str + type: Map(String,String) + - name: uuid + type: UUID + - name: uuid_nullable + type: Nullable(UUID) + type: clickhouse + addresses: + - doesnotexist + table: test_table_insert + insert_timeout: 1m + retry: 0 + columns: + - name: c1 + type: String + - name: c2 + type: Int8 + - name: c3 + type: Int16 + - name: c4 + type: Nullable(Int16) + - name: c5 + type: Nullable(String) + - name: level + type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4) + - name: ipv4 + type: Nullable(IPv4) + - name: ipv6 + type: Nullable(IPv6) + - name: ts + type: DateTime + - name: ts_with_tz + type: DateTime('Europe/Moscow') + - name: ts64 + type: DateTime64(3, 'UTC') + - name: ts64_auto + type: DateTime64(9, 'UTC') + - name: ts_rfc3339nano + type: DateTime64(9) + - name: f32 + type: Float32 + - name: f64 + type: Float64 + - name: lc_str + type: LowCardinality(String) + - name: str_arr + type: Array(String) + - name: map_str_str + type: Map(String,String) + - name: uuid + type: UUID + - name: uuid_nullable + type: Nullable(UUID) diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index a4664a2c3..a851c9d1d 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -156,6 +156,11 @@ func TestE2EStabilityWorkCase(t *testing.T) { e2eTest: &file_clickhouse.Config{}, cfgPath: "./file_clickhouse/config.yml", }, + { + name: "file_clickhouse_main_output_no_available_hosts_dlq", + e2eTest: &file_clickhouse.Config{}, + cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml", + }, { name: "file_elasticsearch", e2eTest: &file_elasticsearch.Config{ diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 845e02f1f..1675d9d1a 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -40,6 +40,8 @@ const ( outPluginType = "clickhouse" ) +var errNoAvailableClickhouseAddresses = errors.New("no available clickhouse addresses") + type Clickhouse interface { Close() Do(ctx context.Context, query ch.Query) error @@ -384,6 +386,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger = params.Logger.Desugar() p.config = config.(*Config) + if len(p.config.Addresses) == 0 { + p.logger.Fatal("config.addresses can't be empty") + } + p.bannedHosts = make(map[Address]time.Time, len(p.config.Addresses)) p.pendingHosts = make(map[Address]struct{}, len(p.config.Addresses)) p.poolsByAddr = make(map[Address]Clickhouse, len(p.config.Addresses)) @@ -674,20 +680,25 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err p.mu.RUnlock() var err error - if attempts == 0 && p.config.FatalOnFailedInsert { + if attempts == 0 { + if p.config.FatalOnFailedInsert { + p.logger.Fatal("cannot start: no available clickhouse addresses in config") + } + + err = errNoAvailableClickhouseAddresses p.insertErrorsMetric.Inc() - p.logger.Fatal("no available clickhouse addresses") + p.logger.Error("no available clickhouse addresses", zap.Error(err)) + + return err } - noAvailableHosts := false for i := 0; i < attempts; i++ { requestID := p.requestID.Inc() p.mu.RLock() if len(p.instances) == 0 { p.mu.RUnlock() - err = errors.New("no available clickhouse addresses") - noAvailableHosts = true + err = errNoAvailableClickhouseAddresses break } instance := p.getInstance(requestID, i) @@ -705,7 +716,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err } if err != nil { p.insertErrorsMetric.Inc() - if noAvailableHosts { + if errors.Is(err, errNoAvailableClickhouseAddresses) { p.logger.Error("no available clickhouse addresses", zap.Error(err)) } else { p.logger.Error("an attempt to insert a batch failed", zap.Error(err)) From 33f797418741d73484a50b1b8a90d2cf4ddfd6f3 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 22 Apr 2026 15:10:09 +0300 Subject: [PATCH 12/20] fix e2e --- e2e/start_work_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index a851c9d1d..8cd63a298 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -158,7 +158,7 @@ func TestE2EStabilityWorkCase(t *testing.T) { }, { name: "file_clickhouse_main_output_no_available_hosts_dlq", - e2eTest: &file_clickhouse.Config{}, + e2eTest: &file_clickhouse_main_output_no_available_hosts_dlq.Config{}, cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml", }, { From 1fbc3b28774a19cbfde189bbf981b87138fed79e Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 22 Apr 2026 15:43:41 +0300 Subject: [PATCH 13/20] fix e2e --- .../config_main_output_no_available_hosts_dlq.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml index 453cce038..4db23a028 100644 --- a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml +++ b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml @@ -24,7 +24,7 @@ pipelines: deadqueue: type: clickhouse addresses: - - 127.0.0.1:9002 + - 127.0.0.1:9001 table: test_table_insert insert_timeout: 1m columns: From 78ef9381cd583ea5784470c87cf641209fb969ff Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 22 Apr 2026 15:44:59 +0300 Subject: [PATCH 14/20] fix --- e2e/start_work_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index 8cd63a298..a851c9d1d 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -158,7 +158,7 @@ func TestE2EStabilityWorkCase(t *testing.T) { }, { name: "file_clickhouse_main_output_no_available_hosts_dlq", - e2eTest: &file_clickhouse_main_output_no_available_hosts_dlq.Config{}, + e2eTest: &file_clickhouse.Config{}, cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml", }, { From 8a64ccb8dac0e3b0ff69385201863053b334a16f Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 22 Apr 2026 17:20:08 +0300 Subject: [PATCH 15/20] fix e2e --- e2e/file_clickhouse/clickhouse_file.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/e2e/file_clickhouse/clickhouse_file.go b/e2e/file_clickhouse/clickhouse_file.go index cfcfd154d..07225a084 100644 --- a/e2e/file_clickhouse/clickhouse_file.go +++ b/e2e/file_clickhouse/clickhouse_file.go @@ -27,10 +27,16 @@ type Config struct { conn *ch.Client samples []Sample sampleTime time.Time + + tableName string + missingTableName string } func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) { r := require.New(t) + + c.tableName = "test_table_insert_" + pipelineName + c.missingTableName = "test_table_not_exists_" + pipelineName c.ctx, c.cancel = context.WithTimeout(context.Background(), time.Minute*2) conn, err := ch.Dial(c.ctx, ch.Options{ @@ -40,11 +46,11 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) c.conn = conn err = conn.Do(c.ctx, ch.Query{ - Body: `DROP TABLE IF EXISTS test_table_insert`}) + Body: `DROP TABLE IF EXISTS ` + c.tableName}) r.NoError(err) err = conn.Do(c.ctx, ch.Query{ - Body: `CREATE TABLE IF NOT EXISTS test_table_insert + Body: `CREATE TABLE IF NOT EXISTS ` + c.tableName + ` ( c1 String, c2 Int8, @@ -79,6 +85,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) input.Set("filename_pattern", "input.log") input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml")) + output := conf.Pipelines[pipelineName].Raw.Get("output") + output.Set("table", c.missingTableName) + + dq := output.Get("deadqueue") + dq.Set("table", c.tableName) + c.sampleTime = time.Now() c.samples = []Sample{ { @@ -172,7 +184,7 @@ func (c *Config) Validate(t *testing.T) { for range 1000 { cnt := proto.ColUInt64{} err := c.conn.Do(c.ctx, ch.Query{ - Body: `select count(*) from test_table_insert`, + Body: `select count(*) from ` + c.tableName, Result: proto.Results{ {Name: "count()", Data: &cnt}, }, @@ -216,7 +228,7 @@ func (c *Config) Validate(t *testing.T) { sampleIdx := 0 r.NoError(c.conn.Do(c.ctx, ch.Query{ Body: `select c1, c2, c3, c4, c5, level, ipv4, ipv6, ts, ts_with_tz, ts64, ts64_auto, ts_rfc3339nano, f32, f64, lc_str, str_arr, map_str_str, uuid, uuid_nullable - from test_table_insert + from` + c.tableName + ` order by c1`, Result: proto.Results{ proto.ResultColumn{Name: "c1", Data: c1}, From 0144bf81c8ea23bb166edb275ea5ba3438ef954a Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 22 Apr 2026 17:38:43 +0300 Subject: [PATCH 16/20] fix --- e2e/file_clickhouse/clickhouse_file.go | 4 ++-- .../config_main_output_no_available_hosts_dlq.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e/file_clickhouse/clickhouse_file.go b/e2e/file_clickhouse/clickhouse_file.go index 07225a084..fa6b3ac91 100644 --- a/e2e/file_clickhouse/clickhouse_file.go +++ b/e2e/file_clickhouse/clickhouse_file.go @@ -36,7 +36,7 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) r := require.New(t) c.tableName = "test_table_insert_" + pipelineName - c.missingTableName = "test_table_not_exists_" + pipelineName + c.missingTableName = "test_table_insert_not_exists_" + pipelineName c.ctx, c.cancel = context.WithTimeout(context.Background(), time.Minute*2) conn, err := ch.Dial(c.ctx, ch.Options{ @@ -228,7 +228,7 @@ func (c *Config) Validate(t *testing.T) { sampleIdx := 0 r.NoError(c.conn.Do(c.ctx, ch.Query{ Body: `select c1, c2, c3, c4, c5, level, ipv4, ipv6, ts, ts_with_tz, ts64, ts64_auto, ts_rfc3339nano, f32, f64, lc_str, str_arr, map_str_str, uuid, uuid_nullable - from` + c.tableName + ` + from ` + c.tableName + ` order by c1`, Result: proto.Results{ proto.ResultColumn{Name: "c1", Data: c1}, diff --git a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml index 4db23a028..47af964d8 100644 --- a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml +++ b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml @@ -71,7 +71,7 @@ pipelines: type: clickhouse addresses: - doesnotexist - table: test_table_insert + table: test_table_insert_not_exists insert_timeout: 1m retry: 0 columns: From b3a4f4de8798b3c8f48920d4c77c4bd21fc82871 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Tue, 28 Apr 2026 12:29:20 +0300 Subject: [PATCH 17/20] fix --- ..._main_output_no_available_hosts_dlq.yml => config_dlq.yml} | 2 +- e2e/start_work_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename e2e/file_clickhouse/{config_main_output_no_available_hosts_dlq.yml => config_dlq.yml} (98%) diff --git a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml b/e2e/file_clickhouse/config_dlq.yml similarity index 98% rename from e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml rename to e2e/file_clickhouse/config_dlq.yml index 47af964d8..5b53d92d6 100644 --- a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml +++ b/e2e/file_clickhouse/config_dlq.yml @@ -1,5 +1,5 @@ pipelines: - file_clickhouse_main_output_no_available_hosts_dlq: + file_clickhouse_dlq: input: type: file actions: diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index a851c9d1d..d1b9fded8 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -157,9 +157,9 @@ func TestE2EStabilityWorkCase(t *testing.T) { cfgPath: "./file_clickhouse/config.yml", }, { - name: "file_clickhouse_main_output_no_available_hosts_dlq", + name: "file_clickhouse_dlq", e2eTest: &file_clickhouse.Config{}, - cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml", + cfgPath: "./file_clickhouse/config_dlq.yml", }, { name: "file_elasticsearch", From a4b5f20b24c664bc3f786b29f1e2a43bd172c239 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Wed, 29 Apr 2026 12:09:24 +0300 Subject: [PATCH 18/20] fix --- e2e/file_clickhouse/config_dlq.yml | 2 +- e2e/start_work_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/file_clickhouse/config_dlq.yml b/e2e/file_clickhouse/config_dlq.yml index 5b53d92d6..47af964d8 100644 --- a/e2e/file_clickhouse/config_dlq.yml +++ b/e2e/file_clickhouse/config_dlq.yml @@ -1,5 +1,5 @@ pipelines: - file_clickhouse_dlq: + file_clickhouse_main_output_no_available_hosts_dlq: input: type: file actions: diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index d1b9fded8..69dbd2ae2 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -157,7 +157,7 @@ func TestE2EStabilityWorkCase(t *testing.T) { cfgPath: "./file_clickhouse/config.yml", }, { - name: "file_clickhouse_dlq", + name: "file_clickhouse_main_output_no_available_hosts_dlq", e2eTest: &file_clickhouse.Config{}, cfgPath: "./file_clickhouse/config_dlq.yml", }, From a1566989e8075b429c0babe67087d8528a8d81a2 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Thu, 30 Apr 2026 12:07:27 +0300 Subject: [PATCH 19/20] fix name config file --- ...config.yml => config_main_output_no_available_hosts_dlq.yml} | 0 e2e/start_work_test.go | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename e2e/file_clickhouse/{config.yml => config_main_output_no_available_hosts_dlq.yml} (100%) diff --git a/e2e/file_clickhouse/config.yml b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml similarity index 100% rename from e2e/file_clickhouse/config.yml rename to e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index 69dbd2ae2..a851c9d1d 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -159,7 +159,7 @@ func TestE2EStabilityWorkCase(t *testing.T) { { name: "file_clickhouse_main_output_no_available_hosts_dlq", e2eTest: &file_clickhouse.Config{}, - cfgPath: "./file_clickhouse/config_dlq.yml", + cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml", }, { name: "file_elasticsearch", From d5a21dad0809845f8e2a95874fd1f66e53a2a315 Mon Sep 17 00:00:00 2001 From: Sergey Lazarenko Date: Thu, 30 Apr 2026 12:12:05 +0300 Subject: [PATCH 20/20] fix --- e2e/file_clickhouse/{config_dlq.yml => config.yml} | 4 ++-- .../config_main_output_no_available_hosts_dlq.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename e2e/file_clickhouse/{config_dlq.yml => config.yml} (97%) diff --git a/e2e/file_clickhouse/config_dlq.yml b/e2e/file_clickhouse/config.yml similarity index 97% rename from e2e/file_clickhouse/config_dlq.yml rename to e2e/file_clickhouse/config.yml index 47af964d8..7097ea0e2 100644 --- a/e2e/file_clickhouse/config_dlq.yml +++ b/e2e/file_clickhouse/config.yml @@ -1,5 +1,5 @@ pipelines: - file_clickhouse_main_output_no_available_hosts_dlq: + file_clickhouse: input: type: file actions: @@ -70,7 +70,7 @@ pipelines: type: Nullable(UUID) type: clickhouse addresses: - - doesnotexist + - 127.0.0.1:9001 table: test_table_insert_not_exists insert_timeout: 1m retry: 0 diff --git a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml index 7097ea0e2..47af964d8 100644 --- a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml +++ b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml @@ -1,5 +1,5 @@ pipelines: - file_clickhouse: + file_clickhouse_main_output_no_available_hosts_dlq: input: type: file actions: @@ -70,7 +70,7 @@ pipelines: type: Nullable(UUID) type: clickhouse addresses: - - 127.0.0.1:9001 + - doesnotexist table: test_table_insert_not_exists insert_timeout: 1m retry: 0