Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin/output/http/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
@introduction

### Config params
@config-params|description
@config-params|description
12 changes: 11 additions & 1 deletion plugin/output/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ Content-Type header for HTTP requests.

<br>

**`encoding`** *`EncodingConfig`*

Configure event serialization before sending.
Includes:
1) Type - codec to use for serializing events:
* `json` - serializes the full event as a JSON object (default).
* `raw` - extracts a single field and sends its value as-is.
2) Params - Encoder parameters.

<br>

**`use_gzip`** *`bool`* *`default=false`*

If set, the plugin will use gzip encoding.
Expand Down Expand Up @@ -133,5 +144,4 @@ After a non-retryable write error, fall with a non-zero exit code or not

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
78 changes: 78 additions & 0 deletions plugin/output/http/encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package http

import (
"encoding/json"
"fmt"

"github.com/ozontech/file.d/pipeline"
)

const (
EncoderTypeJSON = "json"
EncoderTypeRaw = "raw"
)

type Encoder interface {
Encode(event *pipeline.Event, buf []byte) []byte
}

type JSONEncoderParams struct{}

type JSONEncoder struct{}

func newJSONEncoder(_ *JSONEncoderParams) *JSONEncoder {
return &JSONEncoder{}
}

func (e *JSONEncoder) Encode(event *pipeline.Event, buf []byte) []byte {
buf, _ = event.Encode(buf)
return buf
}

type RawEncoderParams struct {
Field string `json:"field" default:"message"`
}

type RawEncoder struct {
field string
}

func newRawEncoder(params *RawEncoderParams) *RawEncoder {
field := params.Field
if field == "" {
field = "message"
}
return &RawEncoder{field: field}
}

func (e *RawEncoder) Encode(event *pipeline.Event, buf []byte) []byte {
node := event.Root.Dig(e.field)
if node == nil {
return buf[:0]
}
return node.Encode(buf)
}

type EncodingConfig struct {
Type string `json:"type" default:"json" options:"json|raw"`
Params json.RawMessage `json:"params"`
}

func NewEncoder(cfg EncodingConfig) (Encoder, error) {
switch cfg.Type {
case EncoderTypeJSON, "":
return newJSONEncoder(&JSONEncoderParams{}), nil

case EncoderTypeRaw:
var params RawEncoderParams
if len(cfg.Params) > 0 {
if err := json.Unmarshal(cfg.Params, &params); err != nil {
return nil, fmt.Errorf("raw encoder params: %w", err)
}
}
return newRawEncoder(&params), nil

default:
return nil, fmt.Errorf("unknown encoding type %q; supported: json, raw", cfg.Type)
}
}
22 changes: 20 additions & 2 deletions plugin/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const (
type Plugin struct {
config *Config

client *xhttp.Client
client *xhttp.Client
encoder Encoder

logger *zap.Logger
controller pipeline.OutputPluginController
Expand Down Expand Up @@ -61,6 +62,17 @@ type Config struct {
// > Content-Type header for HTTP requests.
ContentType string `json:"content_type" default:"application/json"` // *

// > @3@4@5@6
// >
// > Configure event serialization before sending.
// > Includes:
// > 1) Type - codec to use for serializing events:
// > * `json` - serializes the full event as a JSON object (default).
// > * `raw` - extracts a single field and sends its value as-is.
// > By default `json` is used.
// > 2) Params - Encoder parameters.
Encoding EncodingConfig `json:"encoding" child:"true"` // *

// > @3@4@5@6
// >
// > If set, the plugin will use gzip encoding.
Expand Down Expand Up @@ -200,6 +212,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.mu = &sync.Mutex{}

var err error
p.encoder, err = NewEncoder(p.config.Encoding)
if err != nil {
p.logger.Fatal("can't create encoder", zap.Error(err))
}

p.prepareClient()

p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_))
Expand Down Expand Up @@ -326,7 +344,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
batch.ForEach(func(event *pipeline.Event) {
eventsCount++
data.begin = append(data.begin, len(data.outBuf))
data.outBuf, _ = event.Encode(data.outBuf)
data.outBuf = p.encoder.Encode(event, data.outBuf)
data.outBuf = append(data.outBuf, '\n')
})
data.begin = append(data.begin, len(data.outBuf))
Expand Down
17 changes: 14 additions & 3 deletions plugin/output/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,26 @@ func TestAppendEvent(t *testing.T) {

p.Start(config, test.NewEmptyOutputPluginParams())

root, _ := insaneJSON.DecodeBytes([]byte(`{"field_a":"AAAA","field_b":"BBBB"}`))
root, _ := insaneJSON.DecodeBytes([]byte(`{"message":"[INFO] some event","field_a":"AAAA","field_b":"BBBB"}`))
defer insaneJSON.Release(root)

data := data{}
event := &pipeline.Event{Root: root}

data.outBuf, _ = event.Encode(data.outBuf)
encoder := newJSONEncoder(&JSONEncoderParams{})
data.outBuf = encoder.Encode(event, data.outBuf)
data.outBuf = append(data.outBuf, '\n')

expected := fmt.Sprintf("%s\n", `{"field_a":"AAAA","field_b":"BBBB"}`)
expected := fmt.Sprintf("%s\n", `{"message":"[INFO] some event","field_a":"AAAA","field_b":"BBBB"}`)
assert.Equal(t, expected, string(data.outBuf), "wrong request content")

data.outBuf = data.outBuf[:0]
var params RawEncoderParams

rawEncoder := newRawEncoder(&params)
data.outBuf = rawEncoder.Encode(event, data.outBuf)
data.outBuf = append(data.outBuf, '\n')

expected = fmt.Sprintf("%s\n", `"[INFO] some event"`)
assert.Equal(t, expected, string(data.outBuf), "wrong request content")
}
Loading