diff --git a/e2e/file_elasticsearch/config.yml b/e2e/file_elasticsearch/config.yml
index 47063cb3f..d5967fe36 100644
--- a/e2e/file_elasticsearch/config.yml
+++ b/e2e/file_elasticsearch/config.yml
@@ -9,7 +9,7 @@ pipelines:
output:
type: elasticsearch
endpoints:
- - http://localhost:9200
+ - http://localhost:19200
username: SOME_USERNAME
password: SOME_PASSWORD
index_format: SOME_INDEX
diff --git a/e2e/file_elasticsearch/config_cb.yml b/e2e/file_elasticsearch/config_cb.yml
new file mode 100644
index 000000000..584661e4e
--- /dev/null
+++ b/e2e/file_elasticsearch/config_cb.yml
@@ -0,0 +1,19 @@
+pipelines:
+ file_elasticsearch_cb:
+ input:
+ type: file
+ persistence_mode: async
+ watching_dir: SOME_DIR
+ offsets_file: SOME_FILE
+ offsets_op: reset
+ output:
+ type: elasticsearch
+ endpoints: # two endpoints: with a single endpoint no circuit breaker is created
+ - http://localhost:19200
+ - http://localhost:19201
+ username: SOME_USERNAME
+ password: SOME_PASSWORD
+ index_format: SOME_INDEX
+ batch_size: 2
+ ban_period: 6s # how long an unavailable endpoint stays banned
+ reconnect_interval: 3s # how often banned endpoints are checked for restoration
diff --git a/e2e/file_elasticsearch/docker-compose.yml b/e2e/file_elasticsearch/docker-compose.yml
index 162df6eef..17a65833f 100644
--- a/e2e/file_elasticsearch/docker-compose.yml
+++ b/e2e/file_elasticsearch/docker-compose.yml
@@ -1,4 +1,3 @@
-# https://github.com/elastic/start-local/tree/main
services:
elasticsearch:
image: elasticsearch:8.17.0
@@ -20,3 +19,23 @@ services:
interval: 10s
timeout: 10s
retries: 10
+  elasticsearch2:
+    image: elasticsearch:8.17.0
+    container_name: es-local-test-2
+    ports:
+      - "19201:9200" # host port 19201 -> container port 9200
+    environment:
+      - discovery.type=single-node
+      - ELASTIC_PASSWORD=elastic
+      - xpack.security.enabled=true
+      - xpack.security.http.ssl.enabled=false
+    mem_limit: 1073741824
+    healthcheck:
+      test: # runs inside the container, so it must use the container port 9200, not the published 19201
+        [
+          "CMD-SHELL",
+          "curl --output /dev/null --silent --head --fail -u elastic:elastic http://elasticsearch2:9200",
+        ]
+      interval: 10s
+      timeout: 10s
+      retries: 10
diff --git a/e2e/file_elasticsearch/file_elasticsearch.go b/e2e/file_elasticsearch/file_elasticsearch.go
index 7ada448dd..ae3512a70 100644
--- a/e2e/file_elasticsearch/file_elasticsearch.go
+++ b/e2e/file_elasticsearch/file_elasticsearch.go
@@ -18,13 +18,13 @@ import (
// Config for file-elasticsearch plugin e2e test
type Config struct {
- Count int
- Endpoint string
- Pipeline string
- Username string
- Password string
- dir string
- index string
+ Count int // number of documents expected in the index, summed over all endpoints
+ Endpoints []string // Elasticsearch base URLs; the ingest pipeline is created on each of them
+ Pipeline string // ingest pipeline ID, passed to the output plugin as "ingest_pipeline"
+ Username string // basic-auth user name for the output plugin and the helper requests
+ Password string // basic-auth password for the output plugin and the helper requests
+ dir string
+ index string
}
// Configure sets additional fields for input and output plugins
@@ -43,10 +43,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
output.Set("ingest_pipeline", c.Pipeline)
output.Set("username", c.Username)
output.Set("password", c.Password)
- output.Set("endpoints", []string{c.Endpoint})
+ output.Set("endpoints", c.Endpoints)
- err := createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password)
- require.NoError(t, err)
+ for _, endpoint := range c.Endpoints {
+ err := createIngestPipeline(endpoint, c.Pipeline, c.Username, c.Password, c.Count)
+ require.NoError(t, err)
+ }
}
// Send creates file and writes messages
@@ -63,12 +65,19 @@ func (c *Config) Send(t *testing.T) {
// Validate waits for the message processing to complete
func (c *Config) Validate(t *testing.T) {
- err := waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond)
- require.NoError(t, err)
- docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
+ err := waitUntilIndexReady(c.Endpoints, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond)
require.NoError(t, err)
- require.Len(t, docs, c.Count)
- for _, doc := range docs {
+
+ allDocs := make([]map[string]any, 0, c.Count)
+ for _, endpoint := range c.Endpoints {
+ docs, err := getDocumentsFromIndex(endpoint, c.index, c.Username, c.Password)
+ require.NoError(t, err)
+ t.Logf("endpoint %s: %d docs", endpoint, len(docs))
+ allDocs = append(allDocs, docs...)
+ }
+
+ require.Len(t, allDocs, c.Count)
+ for _, doc := range allDocs {
if _, ok := doc["processed_at"]; !ok {
t.Errorf("doc %v doesn't have processed_at field", doc)
}
diff --git a/e2e/file_elasticsearch/helpers.go b/e2e/file_elasticsearch/helpers.go
index 42269eeba..11145499e 100644
--- a/e2e/file_elasticsearch/helpers.go
+++ b/e2e/file_elasticsearch/helpers.go
@@ -9,41 +9,63 @@ import (
 	"time"
 )
-func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
+// createIngestPipeline PUTs a test ingest pipeline (one "set" processor that
+// fills processed_at) under pipelineID on the Elasticsearch at elasticURL.
+// The request is tried up to retries times, but always at least once, with a
+// one-second pause between attempts. Basic auth is sent only when both
+// username and password are non-empty.
+func createIngestPipeline(elasticURL, pipelineID, username, password string, retries int) error {
 	url := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)
-
 	pipelineBody := `{"description":"test ingest pipeline","processors":[{"set":{"field":"processed_at","value":"{{_ingest.timestamp}}"}}]}`
-	req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(pipelineBody))
-	if err != nil {
-		return fmt.Errorf("failed to create request: %w", err)
-	}
+	// Clamp so that retries <= 0 still performs one attempt instead of skipping
+	// the loop and wrapping a nil error in the final return.
+	attempts := max(retries, 1)
+	var err error
+	for i := range attempts {
+		err = func() error {
+			req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(pipelineBody))
+			if err != nil {
+				return fmt.Errorf("failed to create request: %w", err)
+			}
-	req.Header.Set("Content-Type", "application/json")
-	if username != "" && password != "" {
-		req.SetBasicAuth(username, password)
-	}
+			req.Header.Set("Content-Type", "application/json")
+			if username != "" && password != "" {
+				req.SetBasicAuth(username, password)
+			}
-	client := &http.Client{Timeout: time.Second}
-	resp, err := client.Do(req)
-	if err != nil {
-		return fmt.Errorf("failed to make HTTP request: %w", err)
-	}
-	defer func() { _ = resp.Body.Close() }()
+			client := &http.Client{Timeout: time.Second}
+			resp, err := client.Do(req)
+			if err != nil {
+				return fmt.Errorf("failed to make HTTP request: %w", err)
+			}
+			defer func() { _ = resp.Body.Close() }()
-	respBody, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return fmt.Errorf("failed to read body response: %w", err)
-	}
+			respBody, err := io.ReadAll(resp.Body)
+			if err != nil {
+				return fmt.Errorf("failed to read body response: %w", err)
+			}
+
+			if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+				return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
+			}
-	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
-		return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
+			return nil
+		}()
+
+		if err == nil {
+			return nil
+		}
+
+		if i < attempts-1 {
+			time.Sleep(time.Second)
+		}
 	}
-	return nil
+	return fmt.Errorf("can't create pipeline after %d retries: %w", attempts, err)
 }
-func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
+func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]any, error) {
url := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)
body := `{"query":{"match_all":{}}}`
@@ -77,7 +91,7 @@ func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]
type searchResponse struct {
Hits struct {
Hits []struct {
- Source map[string]interface{} `json:"_source"`
+ Source map[string]any `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
@@ -87,7 +101,7 @@ func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]
return nil, fmt.Errorf("failed to decode response: %w", err)
}
- resultDocs := make([]map[string]interface{}, 0, len(result.Hits.Hits))
+ resultDocs := make([]map[string]any, 0, len(result.Hits.Hits))
for _, hit := range result.Hits.Hits {
resultDocs = append(resultDocs, hit.Source)
@@ -96,15 +110,11 @@ func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]
return resultDocs, nil
}
-func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
- client := &http.Client{
- Timeout: time.Second,
- }
-
+func getDocCount(client *http.Client, elasticURL, indexName, username, password string) (int, error) {
url := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
if err != nil {
- return fmt.Errorf("failed to create request: %w", err)
+ return 0, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
@@ -112,47 +122,54 @@ func waitUntilIndexReady(elasticURL, indexName, username, password string, minDo
req.SetBasicAuth(username, password)
}
- for i := 0; i < retries; i++ {
- ok, err := func() (bool, error) {
- resp, err := client.Do(req)
- if err != nil {
- return false, fmt.Errorf("failed to make request: %w", err)
- }
- defer func() { _ = resp.Body.Close() }()
+ resp, err := client.Do(req)
+ if err != nil {
+ return 0, fmt.Errorf("failed to make request: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
- if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
- return false, nil
- }
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
+ return 0, nil
+ }
- if resp.StatusCode != http.StatusOK {
- return false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- }
+ if resp.StatusCode != http.StatusOK {
+ return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+ }
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- return false, fmt.Errorf("failed to read response: %w", err)
- }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return 0, fmt.Errorf("failed to read response: %w", err)
+ }
- var result map[string]interface{}
- if err := json.Unmarshal(body, &result); err != nil {
- return false, fmt.Errorf("failed to decode response: %w", err)
- }
+ var result map[string]any
+ if err := json.Unmarshal(body, &result); err != nil {
+ return 0, fmt.Errorf("failed to decode response: %w", err)
+ }
- if count, ok := result["count"].(float64); ok {
- if int(count) >= minDocs {
- return true, nil
- }
- } else {
- return false, fmt.Errorf("unexpected response structure")
- }
+ count, ok := result["count"].(float64)
+ if !ok {
+ return 0, fmt.Errorf("unexpected response structure")
+ }
- return false, nil
- }()
+ return int(count), nil
+}
+
+func waitUntilIndexReady(elasticURLs []string, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
+ client := &http.Client{
+ Timeout: time.Second,
+ }
- if err != nil {
- return err
+ for range retries {
+ total := 0
+ for _, elasticURL := range elasticURLs {
+ count, err := getDocCount(client, elasticURL, indexName, username, password)
+ if err != nil {
+ return err
+ }
+ total += count
}
- if ok {
+
+ if total >= minDocs {
return nil
}
time.Sleep(delay)
diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go
index c7c349eb2..7dcaca2c2 100644
--- a/e2e/start_work_test.go
+++ b/e2e/start_work_test.go
@@ -166,14 +166,25 @@ func TestE2EStabilityWorkCase(t *testing.T) {
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
- Count: 10,
- Pipeline: "test-ingest-pipeline",
- Endpoint: "http://localhost:19200",
- Username: "elastic",
- Password: "elastic",
+ Count: 10,
+ Pipeline: "test-ingest-pipeline",
+ Endpoints: []string{"http://localhost:19200"},
+ Username: "elastic",
+ Password: "elastic",
},
cfgPath: "./file_elasticsearch/config.yml",
},
+ {
+ name: "file_elasticsearch_cb",
+ e2eTest: &file_elasticsearch.Config{
+ Count: 10,
+ Pipeline: "test-ingest-pipeline",
+ Endpoints: []string{"http://localhost:19200", "http://localhost:19201"},
+ Username: "elastic",
+ Password: "elastic",
+ },
+ cfgPath: "./file_elasticsearch/config_cb.yml",
+ },
{
name: "file_es_split",
e2eTest: &file_es_split.Config{},
diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md
index bcf034035..03df043be 100755
--- a/plugin/output/elasticsearch/README.md
+++ b/plugin/output/elasticsearch/README.md
@@ -170,5 +170,18 @@ Process ES response and report errors, if any.
+**`ban_period`** *`cfg.Duration`* *`default=10s`*
+
+Period for which addresses will be banned in case of unavailability.
+If set to 0, circuit breaker is disabled.
+
+
+
+**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*
+
+Interval for checking banned endpoints availability.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go
index 4bdaea55e..dbf5d801e 100644
--- a/plugin/output/elasticsearch/elasticsearch.go
+++ b/plugin/output/elasticsearch/elasticsearch.go
@@ -203,6 +203,19 @@ type Config struct {
// >
// > Process ES response and report errors, if any.
ProcessResponse bool `json:"process_response" default:"true"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Period for which addresses will be banned in case of unavailability.
+ // > If set to 0, circuit breaker is disabled.
+ BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
+ BanPeriod_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Interval for checking banned endpoints availability.
+ ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
+ ReconnectInterval_ time.Duration
}
type KeepAliveConfig struct {
@@ -243,8 +256,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
if len(p.config.IndexValues) == 0 {
p.config.IndexValues = append(p.config.IndexValues, "@time")
}
+	if p.config.ReconnectInterval_ < 1 {
+		p.logger.Fatal("'reconnect_interval' can't be <1")
+	}
+	if p.config.BanPeriod_ < 0 { // 0 is allowed: it disables the circuit breaker
+		p.logger.Fatal("'ban_period' can't be <0")
+	}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.cancel = cancel
- p.prepareClient()
+ p.prepareClient(ctx)
p.maintenance(nil)
@@ -295,9 +317,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)
- ctx, cancel := context.WithCancel(context.Background())
- p.cancel = cancel
-
p.batcher.Start(ctx)
}
@@ -315,11 +334,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error_total", "Number of elasticsearch indexing errors")
}
-func (p *Plugin) prepareClient() {
+func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: prepareEndpoints(p.config.Endpoints, p.config.IngestPipeline),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
+ BanPeriod: p.config.BanPeriod_,
+ ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
@@ -335,7 +356,7 @@ func (p *Plugin) prepareClient() {
}
var err error
- p.client, err = xhttp.NewClient(config)
+ p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
diff --git a/plugin/output/http/README.md b/plugin/output/http/README.md
index 3c3810aaa..7c6a51b66 100755
--- a/plugin/output/http/README.md
+++ b/plugin/output/http/README.md
@@ -144,4 +144,17 @@ After a non-retryable write error, fall with a non-zero exit code or not
+**`ban_period`** *`cfg.Duration`* *`default=10s`*
+
+Period for which addresses will be banned in case of unavailability.
+If set to 0, circuit breaker is disabled.
+
+
+
+**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*
+
+Interval for checking banned endpoints availability.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/http/http.go b/plugin/output/http/http.go
index 03edf9c51..425658199 100644
--- a/plugin/output/http/http.go
+++ b/plugin/output/http/http.go
@@ -176,6 +176,19 @@ type Config struct {
// >
// > After a non-retryable write error, fall with a non-zero exit code or not
Strict bool `json:"strict" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Period for which addresses will be banned in case of unavailability.
+ // > If set to 0, circuit breaker is disabled.
+ BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
+ BanPeriod_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Interval for checking banned endpoints availability.
+ ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
+ ReconnectInterval_ time.Duration
}
type KeepAliveConfig struct {
@@ -212,13 +225,23 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.mu = &sync.Mutex{}
+	if p.config.ReconnectInterval_ < 1 {
+		p.logger.Fatal("'reconnect_interval' can't be <1")
+	}
+	if p.config.BanPeriod_ < 0 { // 0 is allowed: it disables the circuit breaker
+		p.logger.Fatal("'ban_period' can't be <0")
+	}
+
var err error
p.encoder, err = NewEncoder(p.config.Encoding)
if err != nil {
p.logger.Fatal("can't create encoder", zap.Error(err))
}
- p.prepareClient()
+ ctx, cancel := context.WithCancel(context.Background())
+ p.cancel = cancel
+
+ p.prepareClient(ctx)
p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_))
@@ -267,9 +290,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)
- ctx, cancel := context.WithCancel(context.Background())
- p.cancel = cancel
-
p.batcher.Start(ctx)
}
@@ -286,11 +306,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounterVec("output_http_send_error_total", "Total HTTP send errors", "status_code")
}
-func (p *Plugin) prepareClient() {
+func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: p.prepareEndpoints(),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
+ BanPeriod: p.config.BanPeriod_,
+ ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
@@ -306,7 +328,7 @@ func (p *Plugin) prepareClient() {
}
var err error
- p.client, err = xhttp.NewClient(config)
+ p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
diff --git a/plugin/output/loki/README.md b/plugin/output/loki/README.md
index 1ce47cee1..26b9fec02 100644
--- a/plugin/output/loki/README.md
+++ b/plugin/output/loki/README.md
@@ -149,5 +149,18 @@ Multiplier for exponential increase of retention between retries
+**`ban_period`** *`cfg.Duration`* *`default=10s`*
+
+Period for which addresses will be banned in case of unavailability.
+If set to 0, circuit breaker is disabled.
+
+
+
+**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*
+
+Interval for checking banned endpoints availability.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go
index 160054675..85b7f8c36 100644
--- a/plugin/output/loki/loki.go
+++ b/plugin/output/loki/loki.go
@@ -178,6 +178,19 @@ type Config struct {
// >
// > Multiplier for exponential increase of retention between retries
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Period for which addresses will be banned in case of unavailability.
+ // > If set to 0, circuit breaker is disabled.
+ BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
+ BanPeriod_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Interval for checking banned endpoints availability.
+ ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
+ ReconnectInterval_ time.Duration
}
type AuthStrategy byte
@@ -259,7 +272,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.labels = p.parseLabels()
- p.prepareClient()
+	if p.config.ReconnectInterval_ < 1 {
+		p.logger.Fatal("'reconnect_interval' can't be <1")
+	}
+	if p.config.BanPeriod_ < 0 { // 0 is allowed: it disables the circuit breaker
+		p.logger.Fatal("'ban_period' can't be <0")
+	}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.ctx = ctx
+ p.cancel = cancel
+
+ p.prepareClient(ctx)
batcherOpts := &pipeline.BatcherOptions{
PipelineName: params.PipelineName,
@@ -303,10 +327,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)
- ctx, cancel := context.WithCancel(context.Background())
- p.ctx = ctx
- p.cancel = cancel
-
p.batcher.Start(ctx)
}
@@ -430,12 +450,14 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounterVec("output_loki_send_error_total", "Total Loki send errors", "status_code")
}
-func (p *Plugin) prepareClient() {
+func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: []string{fmt.Sprintf("%s/loki/api/v1/push", p.config.Address)},
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
CustomHeaders: p.getCustomHeaders(),
+ BanPeriod: p.config.BanPeriod_,
+ ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
@@ -443,7 +465,7 @@ func (p *Plugin) prepareClient() {
}
var err error
- p.client, err = xhttp.NewClient(config)
+ p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md
index b9df5013f..e6b8f2913 100755
--- a/plugin/output/splunk/README.md
+++ b/plugin/output/splunk/README.md
@@ -153,5 +153,18 @@ or the "event" key with any of its subkeys.
+**`ban_period`** *`cfg.Duration`* *`default=10s`*
+
+Period for which addresses will be banned in case of unavailability.
+If set to 0, circuit breaker is disabled.
+
+
+
+**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*
+
+Interval for checking banned endpoints availability.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go
index 090ae5048..9f0e9bdf2 100644
--- a/plugin/output/splunk/splunk.go
+++ b/plugin/output/splunk/splunk.go
@@ -202,6 +202,19 @@ type Config struct {
// > Supports copying whole original event, but does not allow to copy directly to the output root
// > or the "event" key with any of its subkeys.
CopyFields []CopyField `json:"copy_fields" slice:"true"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Period for which addresses will be banned in case of unavailability.
+ // > If set to 0, circuit breaker is disabled.
+ BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
+ BanPeriod_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Interval for checking banned endpoints availability.
+ ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
+ ReconnectInterval_ time.Duration
}
type KeepAliveConfig struct {
@@ -235,7 +248,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.avgEventSize = params.PipelineSettings.AvgEventSize
p.config = config.(*Config)
p.registerMetrics(params.MetricCtl)
- p.prepareClient()
+
+	if p.config.ReconnectInterval_ < 1 {
+		p.logger.Fatal("'reconnect_interval' can't be <1")
+	}
+	if p.config.BanPeriod_ < 0 { // 0 is allowed: it disables the circuit breaker
+		p.logger.Fatal("'ban_period' can't be <0")
+	}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.cancel = cancel
+
+ p.prepareClient(ctx)
for _, cf := range p.config.CopyFields {
if cf.To == "" {
@@ -296,9 +320,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)
- ctx, cancel := context.WithCancel(context.Background())
- p.cancel = cancel
-
p.batcher.Start(ctx)
}
@@ -319,11 +340,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
)
}
-func (p *Plugin) prepareClient() {
+func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: []string{p.config.Endpoint},
ConnectionTimeout: p.config.RequestTimeout_,
AuthHeader: "Splunk " + p.config.Token,
+ BanPeriod: p.config.BanPeriod_,
+ ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
@@ -338,7 +361,7 @@ func (p *Plugin) prepareClient() {
}
var err error
- p.client, err = xhttp.NewClient(config)
+ p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
diff --git a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go
index 40626e076..fec6de177 100644
--- a/plugin/output/splunk/splunk_test.go
+++ b/plugin/output/splunk/splunk_test.go
@@ -1,6 +1,7 @@
package splunk
import (
+ "context"
"io"
"net/http"
"net/http/httptest"
@@ -54,7 +55,7 @@ func TestSplunk(t *testing.T) {
},
logger: zap.NewExample().Sugar(),
}
- plugin.prepareClient()
+ plugin.prepareClient(context.Background())
batch := pipeline.NewPreparedBatch([]*pipeline.Event{
{Root: input},
@@ -185,7 +186,7 @@ func TestCopyFields(t *testing.T) {
copyFieldsPaths: tt.copyFields,
logger: zap.NewExample().Sugar(),
}
- plugin.prepareClient()
+ plugin.prepareClient(context.Background())
batch := pipeline.NewPreparedBatch([]*pipeline.Event{
{Root: input},
diff --git a/xhttp/circuit_breaker.go b/xhttp/circuit_breaker.go
new file mode 100644
index 000000000..ad6c3e896
--- /dev/null
+++ b/xhttp/circuit_breaker.go
@@ -0,0 +1,148 @@
+package xhttp
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/ozontech/file.d/xtime"
+	"github.com/valyala/fasthttp"
+)
+
+// endpoint is one target URI together with its ban state.
+type endpoint struct {
+	uri *fasthttp.URI
+	// banUntil is when the current ban expires; the zero value means "not banned".
+	banUntil time.Time
+}
+
+// circuitBreaker keeps track of which endpoints may currently be used.
+// banEndpoint takes an endpoint out of the active set for banPeriod;
+// restoreBannedEndpoints puts it back once the ban has expired.
+type circuitBreaker struct {
+	endpoints []endpoint
+	// activeEndpoints holds the indexes into endpoints that are not banned.
+	activeEndpoints []int
+	// idxByURI maps uri.String() to the endpoint's index in endpoints.
+	idxByURI map[string]int
+	// banPeriod is how long a banned endpoint stays out of activeEndpoints.
+	banPeriod time.Duration
+	// mu guards activeEndpoints and the banUntil field of every endpoint.
+	mu sync.RWMutex
+}
+
+// newCircuitBreaker builds a breaker over uris with every endpoint active and
+// starts a goroutine that restores expired bans every reconnectInterval until
+// ctx is done. It returns nil, meaning "no circuit breaking", when
+// banPeriod <= 0 or when there are fewer than two endpoints to choose from.
+func newCircuitBreaker(ctx context.Context, uris []*fasthttp.URI, banPeriod, reconnectInterval time.Duration) *circuitBreaker {
+	if banPeriod <= 0 || len(uris) <= 1 {
+		return nil
+	}
+
+	// time.NewTicker panics on a non-positive interval, so fall back to
+	// banPeriod (known to be > 0 here) instead of crashing the goroutine.
+	if reconnectInterval <= 0 {
+		reconnectInterval = banPeriod
+	}
+
+	cb := &circuitBreaker{
+		endpoints:       make([]endpoint, 0, len(uris)),
+		activeEndpoints: make([]int, 0, len(uris)),
+		idxByURI:        make(map[string]int, len(uris)),
+		banPeriod:       banPeriod,
+	}
+
+	// Every endpoint starts out active. Should the same URI be listed twice,
+	// idxByURI ends up pointing at its last occurrence.
+	for i, uri := range uris {
+		cb.endpoints = append(cb.endpoints, endpoint{uri: uri})
+		cb.idxByURI[uri.String()] = i
+		cb.activeEndpoints = append(cb.activeEndpoints, i)
+	}
+
+	go cb.checkBannedEndpoints(ctx, reconnectInterval)
+
+	return cb
+}
+
+// getEndpoint returns a randomly chosen active endpoint, or nil when every
+// endpoint is currently banned.
+func (cb *circuitBreaker) getEndpoint() *fasthttp.URI {
+	cb.mu.RLock()
+	defer cb.mu.RUnlock()
+
+	if len(cb.activeEndpoints) == 0 {
+		return nil
+	}
+
+	idx := rand.Intn(len(cb.activeEndpoints))
+	return cb.endpoints[cb.activeEndpoints[idx]].uri
+}
+
+// banEndpoint bans uri for banPeriod counted from now and removes it from the
+// active set. Banning an already banned endpoint only extends its ban.
+// URIs the breaker does not know are ignored.
+func (cb *circuitBreaker) banEndpoint(uri *fasthttp.URI) {
+	cb.mu.Lock()
+	defer cb.mu.Unlock()
+
+	// A plain map lookup would yield index 0 for an unknown URI and ban the
+	// first endpoint by mistake.
+	idx, ok := cb.idxByURI[uri.String()]
+	if !ok {
+		return
+	}
+	cb.endpoints[idx].banUntil = xtime.GetInaccurateTime().Add(cb.banPeriod)
+
+	// Swap-remove: the order of activeEndpoints does not matter because
+	// getEndpoint picks from it at random.
+	for i, activeIdx := range cb.activeEndpoints {
+		if activeIdx == idx {
+			cb.activeEndpoints[i] = cb.activeEndpoints[len(cb.activeEndpoints)-1]
+			cb.activeEndpoints = cb.activeEndpoints[:len(cb.activeEndpoints)-1]
+			break
+		}
+	}
+}
+
+// restoreBannedEndpoints re-activates every endpoint whose ban has expired.
+func (cb *circuitBreaker) restoreBannedEndpoints() {
+	// Cheap read-locked check first: nothing to do while no endpoint is banned.
+	cb.mu.RLock()
+	if len(cb.endpoints) == len(cb.activeEndpoints) {
+		cb.mu.RUnlock()
+		return
+	}
+	cb.mu.RUnlock()
+
+	cb.mu.Lock()
+	defer cb.mu.Unlock()
+
+	now := xtime.GetInaccurateTime()
+	for i := range cb.endpoints {
+		e := &cb.endpoints[i]
+		// A zero banUntil marks an endpoint that is not banned, i.e. already active.
+		if !e.banUntil.IsZero() && now.After(e.banUntil) {
+			e.banUntil = time.Time{}
+			cb.activeEndpoints = append(cb.activeEndpoints, i)
+		}
+	}
+}
+
+// checkBannedEndpoints calls restoreBannedEndpoints every reconnectInterval
+// until ctx is done. reconnectInterval must be positive.
+func (cb *circuitBreaker) checkBannedEndpoints(ctx context.Context, reconnectInterval time.Duration) {
+	ticker := time.NewTicker(reconnectInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			cb.restoreBannedEndpoints()
+		}
+	}
+}
diff --git a/xhttp/client.go b/xhttp/client.go
index f9ea6e6a9..ec9bb9880 100644
--- a/xhttp/client.go
+++ b/xhttp/client.go
@@ -1,6 +1,7 @@
package xhttp
import (
+ "context"
"fmt"
"math/rand"
"net/http"
@@ -30,17 +31,20 @@ type ClientConfig struct {
GzipCompressionLevel string
TLS *ClientTLSConfig
KeepAlive *ClientKeepAliveConfig
+ BanPeriod time.Duration
+ ReconnectInterval time.Duration
}
type Client struct {
client *fasthttp.Client
endpoints []*fasthttp.URI
+ cb *circuitBreaker
authHeader string
customHeaders map[string]string
gzipCompressionLevel int
}
-func NewClient(cfg *ClientConfig) (*Client, error) {
+func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
client := &fasthttp.Client{
ReadTimeout: cfg.ConnectionTimeout,
WriteTimeout: cfg.ConnectionTimeout,
@@ -72,6 +76,7 @@ func NewClient(cfg *ClientConfig) (*Client, error) {
return &Client{
client: client,
endpoints: endpoints,
+ cb: newCircuitBreaker(ctx, endpoints, cfg.BanPeriod, cfg.ReconnectInterval),
authHeader: cfg.AuthHeader,
customHeaders: cfg.CustomHeaders,
gzipCompressionLevel: parseGzipCompressionLevel(cfg.GzipCompressionLevel),
@@ -89,16 +94,15 @@ func (c *Client) DoTimeout(
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
- var endpoint *fasthttp.URI
- if len(c.endpoints) == 1 {
- endpoint = c.endpoints[0]
- } else {
- endpoint = c.endpoints[rand.Int()%len(c.endpoints)]
+ endpoint := c.getEndpoint()
+ if endpoint == nil {
+ return 0, fmt.Errorf("no available endpoints")
}
c.prepareRequest(req, endpoint, method, contentType, body)
if err := c.client.DoTimeout(req, resp, timeout); err != nil {
+ c.banEndpoint(endpoint)
return 0, fmt.Errorf("can't send request to %s: %w", endpoint.String(), err)
}
@@ -106,6 +110,9 @@ func (c *Client) DoTimeout(
statusCode := resp.Header.StatusCode()
if !(http.StatusOK <= statusCode && statusCode <= http.StatusAccepted) {
+ if shouldBanEndpoint(statusCode) {
+ c.banEndpoint(endpoint)
+ }
return statusCode, fmt.Errorf("response status from %s isn't OK: status=%d, body=%s", endpoint.String(), statusCode, string(respContent))
}
@@ -168,3 +175,35 @@ func parseGzipCompressionLevel(level string) int {
 		return -1
 	}
 }
+
+// getEndpoint picks the URI for the next request. With a circuit breaker the
+// choice is delegated to it; otherwise a random configured endpoint is used.
+// It returns nil when no endpoint is available.
+func (c *Client) getEndpoint() *fasthttp.URI {
+	switch {
+	case c.cb != nil:
+		return c.cb.getEndpoint()
+	case len(c.endpoints) == 0:
+		return nil
+	default:
+		return c.endpoints[rand.Intn(len(c.endpoints))]
+	}
+}
+
+// banEndpoint reports a failing endpoint to the circuit breaker; it does
+// nothing when the client has no circuit breaker.
+func (c *Client) banEndpoint(endpoint *fasthttp.URI) {
+	if c.cb == nil {
+		return
+	}
+	c.cb.banEndpoint(endpoint)
+}
+
+// shouldBanEndpoint reports whether statusCode is one of the responses that
+// get an endpoint banned: 502, 503, 504 or 429.
+func shouldBanEndpoint(statusCode int) bool {
+	return statusCode == http.StatusBadGateway ||
+		statusCode == http.StatusServiceUnavailable ||
+		statusCode == http.StatusGatewayTimeout ||
+		statusCode == http.StatusTooManyRequests
+}