Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
5c8115e
cb for ch output plugin
Gwynbleidd0241 Mar 23, 2026
ca693af
cb for ch output plugin
Gwynbleidd0241 Mar 23, 2026
450afc9
Merge branch 'master' into 954-circuit-breaker-ch-plugin
Gwynbleidd0241 Mar 23, 2026
ab289dc
docs generate
Gwynbleidd0241 Mar 23, 2026
df2e46f
fix
Gwynbleidd0241 Mar 23, 2026
56abe5a
fix
Gwynbleidd0241 Apr 2, 2026
7bcf16a
cb
Gwynbleidd0241 Apr 13, 2026
ad16cfc
fix
Gwynbleidd0241 Apr 13, 2026
87d2efb
magic
Gwynbleidd0241 Apr 13, 2026
9303d57
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 Apr 14, 2026
cd432f8
fix
Gwynbleidd0241 Apr 14, 2026
6cdd995
fix
Gwynbleidd0241 Apr 14, 2026
91dffa4
remove cb in ch
Gwynbleidd0241 Apr 27, 2026
c9eeca8
refactor cb for http plugins
Gwynbleidd0241 Apr 27, 2026
7f18480
fix
Gwynbleidd0241 Apr 27, 2026
60396ae
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 May 6, 2026
341a95a
fix
Gwynbleidd0241 May 6, 2026
6c18f29
fix
Gwynbleidd0241 May 6, 2026
a334814
fix
Gwynbleidd0241 May 8, 2026
506beb5
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 May 20, 2026
8371f26
fix docs
Gwynbleidd0241 May 20, 2026
b3ee12e
change switch to if
Gwynbleidd0241 May 22, 2026
b43ebce
fix docs
Gwynbleidd0241 May 26, 2026
0cad8f6
fix
Gwynbleidd0241 May 27, 2026
d0fc78a
fix
Gwynbleidd0241 May 28, 2026
9ebe601
fix
Gwynbleidd0241 May 28, 2026
4002f45
without cb when one host
Gwynbleidd0241 May 29, 2026
26290db
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 May 29, 2026
86a0527
update e2e
Gwynbleidd0241 May 29, 2026
b1ae2af
fix
Gwynbleidd0241 May 29, 2026
a91fbc1
fix
Gwynbleidd0241 May 29, 2026
7507ca8
fix
Gwynbleidd0241 May 29, 2026
ccf7aac
fix
Gwynbleidd0241 May 29, 2026
6969042
fix
Gwynbleidd0241 May 29, 2026
e4c0dcd
fix
Gwynbleidd0241 May 29, 2026
90902c8
fix
Gwynbleidd0241 May 29, 2026
fe08704
fix
Gwynbleidd0241 May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/file_elasticsearch/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pipelines:
output:
type: elasticsearch
endpoints:
- http://localhost:9200
- http://localhost:19200
username: SOME_USERNAME
password: SOME_PASSWORD
index_format: SOME_INDEX
19 changes: 19 additions & 0 deletions e2e/file_elasticsearch/config_cb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
pipelines:
file_elasticsearch_cb:
input:
type: file
persistence_mode: async
watching_dir: SOME_DIR
offsets_file: SOME_FILE
offsets_op: reset
output:
type: elasticsearch
endpoints:
- http://localhost:19200
- http://localhost:19201
username: SOME_USERNAME
password: SOME_PASSWORD
index_format: SOME_INDEX
batch_size: 2
ban_period: 6s
reconnect_interval: 3s
21 changes: 20 additions & 1 deletion e2e/file_elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# https://github.com/elastic/start-local/tree/main
services:
elasticsearch:
image: elasticsearch:8.17.0
Expand All @@ -20,3 +19,23 @@ services:
interval: 10s
timeout: 10s
retries: 10
elasticsearch2:
image: elasticsearch:8.17.0
container_name: es-local-test-2
ports:
- "19201:9200"
environment:
- discovery.type=single-node
- ELASTIC_PASSWORD=elastic
- xpack.security.enabled=true
- xpack.security.http.ssl.enabled=false
mem_limit: 1073741824
healthcheck:
test:
[
"CMD-SHELL",
"curl --output /dev/null --silent --head --fail -u elastic:elastic http://elasticsearch2:19201",
]
interval: 10s
timeout: 10s
retries: 10
39 changes: 24 additions & 15 deletions e2e/file_elasticsearch/file_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (

// Config for file-elasticsearch plugin e2e test
type Config struct {
Count int
Endpoint string
Pipeline string
Username string
Password string
dir string
index string
Count int
Endpoints []string
Pipeline string
Username string
Password string
dir string
index string
}

// Configure sets additional fields for input and output plugins
Expand All @@ -43,10 +43,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
output.Set("ingest_pipeline", c.Pipeline)
output.Set("username", c.Username)
output.Set("password", c.Password)
output.Set("endpoints", []string{c.Endpoint})
output.Set("endpoints", c.Endpoints)

err := createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password)
require.NoError(t, err)
for _, endpoint := range c.Endpoints {
err := createIngestPipeline(endpoint, c.Pipeline, c.Username, c.Password, c.Count)
require.NoError(t, err)
}
}

// Send creates file and writes messages
Expand All @@ -63,12 +65,19 @@ func (c *Config) Send(t *testing.T) {

// Validate waits for the message processing to complete
func (c *Config) Validate(t *testing.T) {
err := waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond)
require.NoError(t, err)
docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
err := waitUntilIndexReady(c.Endpoints, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond)
require.NoError(t, err)
require.Len(t, docs, c.Count)
for _, doc := range docs {

allDocs := make([]map[string]any, 0, c.Count)
for _, endpoint := range c.Endpoints {
docs, err := getDocumentsFromIndex(endpoint, c.index, c.Username, c.Password)
require.NoError(t, err)
t.Logf("endpoint %s: %d docs", endpoint, len(docs))
allDocs = append(allDocs, docs...)
}

require.Len(t, allDocs, c.Count)
for _, doc := range allDocs {
if _, ok := doc["processed_at"]; !ok {
t.Errorf("doc %v doesn't have processed_at field", doc)
}
Expand Down
147 changes: 82 additions & 65 deletions e2e/file_elasticsearch/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,55 @@ import (
"time"
)

func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
func createIngestPipeline(elasticURL, pipelineID, username, password string, retries int) error {
url := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)

pipelineBody := `{"description":"test ingest pipeline","processors":[{"set":{"field":"processed_at","value":"{{_ingest.timestamp}}"}}]}`

req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(pipelineBody))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
var err error
for i := range retries {
err = func() error {
req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(pipelineBody))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", "application/json")
if username != "" && password != "" {
req.SetBasicAuth(username, password)
}
req.Header.Set("Content-Type", "application/json")
if username != "" && password != "" {
req.SetBasicAuth(username, password)
}

client := &http.Client{Timeout: time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to make HTTP request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
client := &http.Client{Timeout: time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to make HTTP request: %w", err)
}
defer func() { _ = resp.Body.Close() }()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read body response: %w", err)
}
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read body response: %w", err)
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
return nil
}()

if err == nil {
return nil
}

if i < retries-1 {
time.Sleep(time.Second)
}
}

return nil
return fmt.Errorf("can't create pipeline after %d retries: %w", retries, err)
}

func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]any, error) {
url := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)

body := `{"query":{"match_all":{}}}`
Expand Down Expand Up @@ -77,7 +91,7 @@ func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]
type searchResponse struct {
Hits struct {
Hits []struct {
Source map[string]interface{} `json:"_source"`
Source map[string]any `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
Expand All @@ -87,7 +101,7 @@ func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]
return nil, fmt.Errorf("failed to decode response: %w", err)
}

resultDocs := make([]map[string]interface{}, 0, len(result.Hits.Hits))
resultDocs := make([]map[string]any, 0, len(result.Hits.Hits))

for _, hit := range result.Hits.Hits {
resultDocs = append(resultDocs, hit.Source)
Expand All @@ -96,63 +110,66 @@ func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]
return resultDocs, nil
}

func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
client := &http.Client{
Timeout: time.Second,
}

func getDocCount(client *http.Client, elasticURL, indexName, username, password string) (int, error) {
url := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
return 0, fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", "application/json")
if username != "" && password != "" {
req.SetBasicAuth(username, password)
}

for i := 0; i < retries; i++ {
ok, err := func() (bool, error) {
resp, err := client.Do(req)
if err != nil {
return false, fmt.Errorf("failed to make request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
resp, err := client.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to make request: %w", err)
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
return false, nil
}
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
return 0, nil
}

if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return false, fmt.Errorf("failed to read response: %w", err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read response: %w", err)
}

var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
return false, fmt.Errorf("failed to decode response: %w", err)
}
var result map[string]any
if err := json.Unmarshal(body, &result); err != nil {
return 0, fmt.Errorf("failed to decode response: %w", err)
}

if count, ok := result["count"].(float64); ok {
if int(count) >= minDocs {
return true, nil
}
} else {
return false, fmt.Errorf("unexpected response structure")
}
count, ok := result["count"].(float64)
if !ok {
return 0, fmt.Errorf("unexpected response structure")
}

return false, nil
}()
return int(count), nil
}

func waitUntilIndexReady(elasticURLs []string, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
client := &http.Client{
Timeout: time.Second,
}

if err != nil {
return err
for range retries {
total := 0
for _, elasticURL := range elasticURLs {
count, err := getDocCount(client, elasticURL, indexName, username, password)
if err != nil {
return err
}
total += count
}
if ok {

if total >= minDocs {
return nil
}
time.Sleep(delay)
Expand Down
21 changes: 16 additions & 5 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,25 @@ func TestE2EStabilityWorkCase(t *testing.T) {
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoint: "http://localhost:19200",
Username: "elastic",
Password: "elastic",
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoints: []string{"http://localhost:19200"},
Username: "elastic",
Password: "elastic",
},
cfgPath: "./file_elasticsearch/config.yml",
},
{
name: "file_elasticsearch_cb",
e2eTest: &file_elasticsearch.Config{
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoints: []string{"http://localhost:19200", "http://localhost:19201"},
Username: "elastic",
Password: "elastic",
},
cfgPath: "./file_elasticsearch/config_cb.yml",
},
{
name: "file_es_split",
e2eTest: &file_es_split.Config{},
Expand Down
13 changes: 13 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,18 @@ Process ES response and report errors, if any.

<br>

**`ban_period`** *`cfg.Duration`* *`default=10s`*

Period for which addresses will be banned in case of unavailability.
If set to 0, circuit breaker is disabled.

<br>

**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*

Interval for checking banned endpoints availability.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
Loading
Loading