diff --git a/e2e/file_clickhouse/clickhouse_file.go b/e2e/file_clickhouse/clickhouse_file.go
index cfcfd154d..fa6b3ac91 100644
--- a/e2e/file_clickhouse/clickhouse_file.go
+++ b/e2e/file_clickhouse/clickhouse_file.go
@@ -27,10 +27,16 @@ type Config struct {
conn *ch.Client
samples []Sample
sampleTime time.Time
+
+ tableName string
+ missingTableName string
}
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
r := require.New(t)
+
+ c.tableName = "test_table_insert_" + pipelineName
+ c.missingTableName = "test_table_insert_not_exists_" + pipelineName
c.ctx, c.cancel = context.WithTimeout(context.Background(), time.Minute*2)
conn, err := ch.Dial(c.ctx, ch.Options{
@@ -40,11 +46,11 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
c.conn = conn
err = conn.Do(c.ctx, ch.Query{
- Body: `DROP TABLE IF EXISTS test_table_insert`})
+ Body: `DROP TABLE IF EXISTS ` + c.tableName})
r.NoError(err)
err = conn.Do(c.ctx, ch.Query{
- Body: `CREATE TABLE IF NOT EXISTS test_table_insert
+ Body: `CREATE TABLE IF NOT EXISTS ` + c.tableName + `
(
c1 String,
c2 Int8,
@@ -79,6 +85,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
input.Set("filename_pattern", "input.log")
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
+ output := conf.Pipelines[pipelineName].Raw.Get("output")
+ output.Set("table", c.missingTableName)
+
+ dq := output.Get("deadqueue")
+ dq.Set("table", c.tableName)
+
c.sampleTime = time.Now()
c.samples = []Sample{
{
@@ -172,7 +184,7 @@ func (c *Config) Validate(t *testing.T) {
for range 1000 {
cnt := proto.ColUInt64{}
err := c.conn.Do(c.ctx, ch.Query{
- Body: `select count(*) from test_table_insert`,
+ Body: `select count(*) from ` + c.tableName,
Result: proto.Results{
{Name: "count()", Data: &cnt},
},
@@ -216,7 +228,7 @@ func (c *Config) Validate(t *testing.T) {
sampleIdx := 0
r.NoError(c.conn.Do(c.ctx, ch.Query{
Body: `select c1, c2, c3, c4, c5, level, ipv4, ipv6, ts, ts_with_tz, ts64, ts64_auto, ts_rfc3339nano, f32, f64, lc_str, str_arr, map_str_str, uuid, uuid_nullable
- from test_table_insert
+ from ` + c.tableName + `
order by c1`,
Result: proto.Results{
proto.ResultColumn{Name: "c1", Data: c1},
diff --git a/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml
new file mode 100644
index 000000000..47af964d8
--- /dev/null
+++ b/e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml
@@ -0,0 +1,117 @@
+pipelines:
+ file_clickhouse_main_output_no_available_hosts_dlq:
+ input:
+ type: file
+ actions:
+ - type: set_time
+ format: unixtime
+ field: ts
+ override: false
+ - type: set_time
+ format: unixtime
+ field: ts_with_tz
+ override: false
+ - type: set_time
+ format: timestampnano
+ field: ts64_auto
+ override: true
+ - type: set_time
+ format: rfc3339nano
+ field: ts_rfc3339nano
+ override: true
+ - type: debug
+ output:
+ deadqueue:
+ type: clickhouse
+ addresses:
+ - 127.0.0.1:9001
+ table: test_table_insert
+ insert_timeout: 1m
+ columns:
+ - name: c1
+ type: String
+ - name: c2
+ type: Int8
+ - name: c3
+ type: Int16
+ - name: c4
+ type: Nullable(Int16)
+ - name: c5
+ type: Nullable(String)
+ - name: level
+ type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4)
+ - name: ipv4
+ type: Nullable(IPv4)
+ - name: ipv6
+ type: Nullable(IPv6)
+ - name: ts
+ type: DateTime
+ - name: ts_with_tz
+ type: DateTime('Europe/Moscow')
+ - name: ts64
+ type: DateTime64(3, 'UTC')
+ - name: ts64_auto
+ type: DateTime64(9, 'UTC')
+ - name: ts_rfc3339nano
+ type: DateTime64(9)
+ - name: f32
+ type: Float32
+ - name: f64
+ type: Float64
+ - name: lc_str
+ type: LowCardinality(String)
+ - name: str_arr
+ type: Array(String)
+ - name: map_str_str
+ type: Map(String,String)
+ - name: uuid
+ type: UUID
+ - name: uuid_nullable
+ type: Nullable(UUID)
+ type: clickhouse
+ addresses:
+ - doesnotexist
+ table: test_table_insert_not_exists
+ insert_timeout: 1m
+ retry: 0
+ columns:
+ - name: c1
+ type: String
+ - name: c2
+ type: Int8
+ - name: c3
+ type: Int16
+ - name: c4
+ type: Nullable(Int16)
+ - name: c5
+ type: Nullable(String)
+ - name: level
+ type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4)
+ - name: ipv4
+ type: Nullable(IPv4)
+ - name: ipv6
+ type: Nullable(IPv6)
+ - name: ts
+ type: DateTime
+ - name: ts_with_tz
+ type: DateTime('Europe/Moscow')
+ - name: ts64
+ type: DateTime64(3, 'UTC')
+ - name: ts64_auto
+ type: DateTime64(9, 'UTC')
+ - name: ts_rfc3339nano
+ type: DateTime64(9)
+ - name: f32
+ type: Float32
+ - name: f64
+ type: Float64
+ - name: lc_str
+ type: LowCardinality(String)
+ - name: str_arr
+ type: Array(String)
+ - name: map_str_str
+ type: Map(String,String)
+ - name: uuid
+ type: UUID
+ - name: uuid_nullable
+ type: Nullable(UUID)
diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go
index a4664a2c3..a851c9d1d 100644
--- a/e2e/start_work_test.go
+++ b/e2e/start_work_test.go
@@ -156,6 +156,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
+ {
+ name: "file_clickhouse_main_output_no_available_hosts_dlq",
+ e2eTest: &file_clickhouse.Config{},
+ cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml",
+ },
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md
index 08b1b374e..4d12e9860 100644
--- a/plugin/output/clickhouse/README.md
+++ b/plugin/output/clickhouse/README.md
@@ -215,5 +215,17 @@ After this timeout batch will be sent even if batch isn't completed.
+**`ban_period`** *`cfg.Duration`* *`default=10s`*
+
+Period for which addresses will be banned in case of unavailability.
+
+
+
+**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*
+
+Interval for reconnecting to addresses that are unavailable during initialization.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go
index 7f6c8366a..1675d9d1a 100644
--- a/plugin/output/clickhouse/clickhouse.go
+++ b/plugin/output/clickhouse/clickhouse.go
@@ -3,11 +3,13 @@ package clickhouse
import (
"bytes"
"context"
+ "crypto/tls"
"encoding/json"
"errors"
"fmt"
"net"
"strings"
+ "sync"
"time"
"github.com/ClickHouse/ch-go"
@@ -17,6 +19,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/xtime"
"github.com/ozontech/file.d/xtls"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -37,11 +40,18 @@ const (
outPluginType = "clickhouse"
)
+var errNoAvailableClickhouseAddresses = errors.New("no available clickhouse addresses")
+
type Clickhouse interface {
Close()
Do(ctx context.Context, query ch.Query) error
}
+type instance struct {
+ addr Address
+ pool Clickhouse
+}
+
type Plugin struct {
logger *zap.Logger
@@ -53,7 +63,7 @@ type Plugin struct {
query string
// TODO: support shards
- instances []Clickhouse
+ instances []instance
requestID atomic.Int64
// plugin metrics
@@ -61,6 +71,14 @@ type Plugin struct {
queriesCountMetric *metric.Counter
router *pipeline.Router
+
+ compression ch.Compression
+ tlsConfig *tls.Config
+
+ poolsByAddr map[Address]Clickhouse
+ bannedHosts map[Address]time.Time
+ pendingHosts map[Address]struct{}
+ mu sync.RWMutex
}
type Setting struct {
@@ -334,6 +352,18 @@ type Config struct {
// > After this timeout batch will be sent even if batch isn't completed.
BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
BatchFlushTimeout_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Period for which addresses will be banned in case of unavailability.
+ BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
+ BanPeriod_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Interval for reconnecting to addresses that are unavailable during initialization.
+ ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
+ ReconnectInterval_ time.Duration
}
func init() {
@@ -354,8 +384,16 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
p.logger = params.Logger.Desugar()
-
p.config = config.(*Config)
+
+ if len(p.config.Addresses) == 0 {
+ p.logger.Fatal("config.addresses can't be empty")
+ }
+
+ p.bannedHosts = make(map[Address]time.Time, len(p.config.Addresses))
+ p.pendingHosts = make(map[Address]struct{}, len(p.config.Addresses))
+ p.poolsByAddr = make(map[Address]Clickhouse, len(p.config.Addresses))
+
p.registerMetrics(params.MetricCtl)
p.ctx, p.cancelFunc = context.WithCancel(context.Background())
@@ -365,6 +403,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
if p.config.InsertTimeout_ < 1 {
p.logger.Fatal("'db_request_timeout' can't be <1")
}
+ if p.config.ReconnectInterval_ < 1 {
+ p.logger.Fatal("'reconnect_interval' can't be <1")
+ }
+ if p.config.BanPeriod_ < 1 {
+ p.logger.Fatal("'ban_period' cant't be <1")
+ }
schema, err := inferInsaneColInputs(p.config.Columns)
if err != nil {
@@ -380,64 +424,56 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config.InsertStrategy_ = StrategyInOrder
}
- var compression ch.Compression
switch strings.ToLower(p.config.Compression) {
default:
fallthrough
case "disabled":
- compression = ch.CompressionDisabled
+ p.compression = ch.CompressionDisabled
case "lz4":
- compression = ch.CompressionLZ4
+ p.compression = ch.CompressionLZ4
case "zstd":
- compression = ch.CompressionZSTD
+ p.compression = ch.CompressionZSTD
case "none":
- compression = ch.CompressionNone
+ p.compression = ch.CompressionNone
}
- var b xtls.ConfigBuilder
if p.config.CACert != "" {
b := xtls.NewConfigBuilder()
- err := b.AppendCARoot(p.config.CACert)
- if err != nil {
+ if err := b.AppendCARoot(p.config.CACert); err != nil {
p.logger.Fatal("can't append CA root", zap.Error(err))
}
+ p.tlsConfig = b.Build()
}
for _, addr := range p.config.Addresses {
- addr.Addr = addrWithDefaultPort(addr.Addr, "9000")
- pool, err := chpool.New(p.ctx, chpool.Options{
- ClientOptions: ch.Options{
- Logger: p.logger.Named("driver"),
- Address: addr.Addr,
- Database: p.config.Database,
- User: p.config.User,
- Password: p.config.Password,
- QuotaKey: p.config.QuotaKey,
- Compression: compression,
- Settings: p.config.ClickhouseSettings.toProtoSettings(),
- DialTimeout: time.Second * 10,
- TLS: b.Build(),
- HandshakeTimeout: time.Minute,
- },
- MaxConnLifetime: p.config.MaxConnLifetime_,
- MaxConnIdleTime: p.config.MaxConnIdleTime_,
- MaxConns: p.config.MaxConns_,
- MinConns: p.config.MinConns_,
- HealthCheckPeriod: p.config.HealthCheckPeriod_,
- })
+ pool, err := p.createConnection(addr)
if err != nil {
- p.logger.Error("create clickhouse connection pool", zap.Error(err), zap.String("addr", addr.Addr))
- } else {
- for j := 0; j < *addr.Weight; j++ {
- p.instances = append(p.instances, pool)
+ var netError net.Error
+ if errors.As(err, &netError) {
+ p.pendingHosts[addr] = struct{}{}
}
+ p.logger.Error("create clickhouse connection pool", zap.Error(err), zap.String("addr", addr.Addr))
+ continue
+ }
+ p.poolsByAddr[addr] = pool
+ for j := 0; j < *addr.Weight; j++ {
+ p.instances = append(p.instances, instance{
+ addr: addr,
+ pool: pool,
+ })
}
}
- if len(p.instances) == 0 {
+ if len(p.instances) == 0 && p.config.FatalOnFailedInsert {
p.logger.Fatal("cannot start: no available clickhouse addresses in config")
}
+ go p.checkBannedHosts()
+
+ if len(p.pendingHosts) > 0 {
+ go p.checkPendingHosts()
+ }
+
batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
@@ -484,11 +520,99 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.batcher.Start(p.ctx)
}
+func (p *Plugin) createConnection(addr Address) (*chpool.Pool, error) {
+ addr.Addr = addrWithDefaultPort(addr.Addr, "9000")
+ return chpool.New(p.ctx, chpool.Options{
+ ClientOptions: ch.Options{
+ Logger: p.logger.Named("driver"),
+ Address: addr.Addr,
+ Database: p.config.Database,
+ User: p.config.User,
+ Password: p.config.Password,
+ QuotaKey: p.config.QuotaKey,
+ Compression: p.compression,
+ Settings: p.config.ClickhouseSettings.toProtoSettings(),
+ DialTimeout: time.Second * 10,
+ TLS: p.tlsConfig,
+ HandshakeTimeout: time.Minute,
+ },
+ MaxConnLifetime: p.config.MaxConnLifetime_,
+ MaxConnIdleTime: p.config.MaxConnIdleTime_,
+ MaxConns: p.config.MaxConns_,
+ MinConns: p.config.MinConns_,
+ HealthCheckPeriod: p.config.HealthCheckPeriod_,
+ })
+}
+
+func (p *Plugin) checkPendingHosts() {
+ ticker := time.NewTicker(p.config.ReconnectInterval_)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-p.ctx.Done():
+ return
+ case <-ticker.C:
+ if len(p.pendingHosts) == 0 {
+ return
+ }
+
+ for addr := range p.pendingHosts {
+ pool, err := p.createConnection(addr)
+ if err != nil {
+ p.logger.Error("failed to reconnect to pending host", zap.Error(err), zap.String("addr", addr.Addr))
+ continue
+ }
+
+ p.mu.Lock()
+ p.poolsByAddr[addr] = pool
+ for j := 0; j < *addr.Weight; j++ {
+ p.instances = append(p.instances, instance{
+ addr: addr,
+ pool: pool,
+ })
+ }
+ delete(p.pendingHosts, addr)
+ p.mu.Unlock()
+ }
+ }
+ }
+}
+
+func (p *Plugin) checkBannedHosts() {
+ ticker := time.NewTicker(p.config.ReconnectInterval_)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-p.ctx.Done():
+ return
+ case <-ticker.C:
+ p.mu.Lock()
+ for addr, banUntil := range p.bannedHosts {
+ if !xtime.GetInaccurateTime().After(banUntil) {
+ continue
+ }
+ pool := p.poolsByAddr[addr]
+ for i := 0; i < *addr.Weight; i++ {
+ p.instances = append(p.instances, instance{
+ addr: addr,
+ pool: pool,
+ })
+ }
+ delete(p.bannedHosts, addr)
+ }
+ p.mu.Unlock()
+ }
+ }
+}
+
func (p *Plugin) Stop() {
p.cancelFunc()
p.batcher.Stop()
- for _, clickhouse := range p.instances {
- clickhouse.Close()
+
+ for _, pool := range p.poolsByAddr {
+ pool.Close()
}
}
@@ -551,21 +675,52 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
}
})
+ p.mu.RLock()
+ attempts := len(p.instances)
+ p.mu.RUnlock()
+
var err error
- for i := range p.instances {
+ if attempts == 0 {
+ if p.config.FatalOnFailedInsert {
+ p.logger.Fatal("cannot start: no available clickhouse addresses in config")
+ }
+
+ err = errNoAvailableClickhouseAddresses
+ p.insertErrorsMetric.Inc()
+ p.logger.Error("no available clickhouse addresses", zap.Error(err))
+
+ return err
+ }
+
+ for i := 0; i < attempts; i++ {
requestID := p.requestID.Inc()
- clickhouse := p.getInstance(requestID, i)
- err = p.do(clickhouse, data.input)
+
+ p.mu.RLock()
+ if len(p.instances) == 0 {
+ p.mu.RUnlock()
+ err = errNoAvailableClickhouseAddresses
+ break
+ }
+ instance := p.getInstance(requestID, i)
+ p.mu.RUnlock()
+
+ err = p.do(instance.pool, data.input)
if err == nil {
return nil
}
+
+ var netError net.Error
+ if errors.As(err, &netError) {
+ p.banInstance(instance.addr)
+ }
}
if err != nil {
p.insertErrorsMetric.Inc()
- p.logger.Error(
- "an attempt to insert a batch failed",
- zap.Error(err),
- )
+ if errors.Is(err, errNoAvailableClickhouseAddresses) {
+ p.logger.Error("no available clickhouse addresses", zap.Error(err))
+ } else {
+ p.logger.Error("an attempt to insert a batch failed", zap.Error(err))
+ }
}
return err
@@ -583,7 +738,21 @@ func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error {
})
}
-func (p *Plugin) getInstance(requestID int64, retry int) Clickhouse {
+func (p *Plugin) banInstance(addr Address) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ filtered := p.instances[:0]
+ for _, it := range p.instances {
+ if it.addr != addr {
+ filtered = append(filtered, it)
+ }
+ }
+ p.instances = filtered
+ p.bannedHosts[addr] = xtime.GetInaccurateTime().Add(p.config.BanPeriod_)
+}
+
+func (p *Plugin) getInstance(requestID int64, retry int) instance {
var instanceIdx int
switch p.config.InsertStrategy_ {
case StrategyInOrder:
diff --git a/plugin/output/clickhouse/clickhouse_test.go b/plugin/output/clickhouse/clickhouse_test.go
index 9adf5afa4..14829360a 100644
--- a/plugin/output/clickhouse/clickhouse_test.go
+++ b/plugin/output/clickhouse/clickhouse_test.go
@@ -14,7 +14,7 @@ func TestPlugin_getInstance(t *testing.T) {
ctrl := gomock.NewController(t)
- instances := []Clickhouse{
+ pools := []Clickhouse{
mockclickhouse.NewMockClickhouse(ctrl),
mockclickhouse.NewMockClickhouse(ctrl),
mockclickhouse.NewMockClickhouse(ctrl),
@@ -22,16 +22,32 @@ func TestPlugin_getInstance(t *testing.T) {
mockclickhouse.NewMockClickhouse(ctrl),
}
+ addrs := []Address{
+ {Addr: "addr1", Weight: intPtr(1)},
+ {Addr: "addr2", Weight: intPtr(2)},
+ {Addr: "addr3", Weight: intPtr(3)},
+ {Addr: "addr4", Weight: intPtr(4)},
+ {Addr: "addr5", Weight: intPtr(5)},
+ }
+
+ instances := []instance{
+ {addr: addrs[0], pool: pools[0]},
+ {addr: addrs[1], pool: pools[1]},
+ {addr: addrs[2], pool: pools[2]},
+ {addr: addrs[3], pool: pools[3]},
+ {addr: addrs[4], pool: pools[4]},
+ }
+
type args struct {
id int64
retry int
}
tests := []struct {
name string
- instances []Clickhouse
+ instances []instance
stategy InsertStrategy
args args
- want Clickhouse
+ want instance
}{
// in-order
{
@@ -223,3 +239,7 @@ func TestAddress_UnmarshalJSON(t *testing.T) {
})
}
}
+
+func intPtr(a int) *int {
+ return &a
+}