Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions e2e/file_clickhouse/clickhouse_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ type Config struct {
conn *ch.Client
samples []Sample
sampleTime time.Time

tableName string
missingTableName string
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
r := require.New(t)

c.tableName = "test_table_insert_" + pipelineName
c.missingTableName = "test_table_insert_not_exists_" + pipelineName
c.ctx, c.cancel = context.WithTimeout(context.Background(), time.Minute*2)

conn, err := ch.Dial(c.ctx, ch.Options{
Expand All @@ -40,11 +46,11 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
c.conn = conn

err = conn.Do(c.ctx, ch.Query{
Body: `DROP TABLE IF EXISTS test_table_insert`})
Body: `DROP TABLE IF EXISTS ` + c.tableName})
r.NoError(err)

err = conn.Do(c.ctx, ch.Query{
Body: `CREATE TABLE IF NOT EXISTS test_table_insert
Body: `CREATE TABLE IF NOT EXISTS ` + c.tableName + `
(
c1 String,
c2 Int8,
Expand Down Expand Up @@ -79,6 +85,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
input.Set("filename_pattern", "input.log")
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("table", c.missingTableName)

dq := output.Get("deadqueue")
dq.Set("table", c.tableName)

c.sampleTime = time.Now()
c.samples = []Sample{
{
Expand Down Expand Up @@ -172,7 +184,7 @@ func (c *Config) Validate(t *testing.T) {
for range 1000 {
cnt := proto.ColUInt64{}
err := c.conn.Do(c.ctx, ch.Query{
Body: `select count(*) from test_table_insert`,
Body: `select count(*) from ` + c.tableName,
Result: proto.Results{
{Name: "count()", Data: &cnt},
},
Expand Down Expand Up @@ -216,7 +228,7 @@ func (c *Config) Validate(t *testing.T) {
sampleIdx := 0
r.NoError(c.conn.Do(c.ctx, ch.Query{
Body: `select c1, c2, c3, c4, c5, level, ipv4, ipv6, ts, ts_with_tz, ts64, ts64_auto, ts_rfc3339nano, f32, f64, lc_str, str_arr, map_str_str, uuid, uuid_nullable
from test_table_insert
from ` + c.tableName + `
order by c1`,
Result: proto.Results{
proto.ResultColumn{Name: "c1", Data: c1},
Expand Down
117 changes: 117 additions & 0 deletions e2e/file_clickhouse/config_main_output_no_available_hosts_dlq.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
pipelines:
file_clickhouse_main_output_no_available_hosts_dlq:
input:
type: file
actions:
- type: set_time
format: unixtime
field: ts
override: false
- type: set_time
format: unixtime
field: ts_with_tz
override: false
- type: set_time
format: timestampnano
field: ts64_auto
override: true
- type: set_time
format: rfc3339nano
field: ts_rfc3339nano
override: true
- type: debug
output:
deadqueue:
type: clickhouse
addresses:
- 127.0.0.1:9001
table: test_table_insert
insert_timeout: 1m
columns:
- name: c1
type: String
- name: c2
type: Int8
- name: c3
type: Int16
- name: c4
type: Nullable(Int16)
- name: c5
type: Nullable(String)
- name: level
type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4)
- name: ipv4
type: Nullable(IPv4)
- name: ipv6
type: Nullable(IPv6)
- name: ts
type: DateTime
- name: ts_with_tz
type: DateTime('Europe/Moscow')
- name: ts64
type: DateTime64(3, 'UTC')
- name: ts64_auto
type: DateTime64(9, 'UTC')
- name: ts_rfc3339nano
type: DateTime64(9)
- name: f32
type: Float32
- name: f64
type: Float64
- name: lc_str
type: LowCardinality(String)
- name: str_arr
type: Array(String)
- name: map_str_str
type: Map(String,String)
- name: uuid
type: UUID
- name: uuid_nullable
type: Nullable(UUID)
type: clickhouse
addresses:
- doesnotexist
table: test_table_insert_not_exists
insert_timeout: 1m
retry: 0
columns:
- name: c1
type: String
- name: c2
type: Int8
- name: c3
type: Int16
- name: c4
type: Nullable(Int16)
- name: c5
type: Nullable(String)
- name: level
type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4)
- name: ipv4
type: Nullable(IPv4)
- name: ipv6
type: Nullable(IPv6)
- name: ts
type: DateTime
- name: ts_with_tz
type: DateTime('Europe/Moscow')
- name: ts64
type: DateTime64(3, 'UTC')
- name: ts64_auto
type: DateTime64(9, 'UTC')
- name: ts_rfc3339nano
type: DateTime64(9)
- name: f32
type: Float32
- name: f64
type: Float64
- name: lc_str
type: LowCardinality(String)
- name: str_arr
type: Array(String)
- name: map_str_str
type: Map(String,String)
- name: uuid
type: UUID
- name: uuid_nullable
type: Nullable(UUID)
5 changes: 5 additions & 0 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
{
name: "file_clickhouse_main_output_no_available_hosts_dlq",
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config_main_output_no_available_hosts_dlq.yml",
},
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
Expand Down
12 changes: 12 additions & 0 deletions plugin/output/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,17 @@ After this timeout batch will be sent even if batch isn't completed.

<br>

**`ban_period`** *`cfg.Duration`* *`default=10s`*

Period for which addresses will be banned in case of unavailability.

<br>

**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*

Interval for reconnecting to addresses that are unavailable during initialization.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
Loading
Loading