diff --git a/plugin/output/http/README.idoc.md b/plugin/output/http/README.idoc.md index 16b7b98d4..dacb3bdf1 100644 --- a/plugin/output/http/README.idoc.md +++ b/plugin/output/http/README.idoc.md @@ -2,4 +2,4 @@ @introduction ### Config params -@config-params|description +@config-params|description \ No newline at end of file diff --git a/plugin/output/http/README.md b/plugin/output/http/README.md index c5b659b2d..3c3810aaa 100755 --- a/plugin/output/http/README.md +++ b/plugin/output/http/README.md @@ -17,6 +17,17 @@ Content-Type header for HTTP requests.
+**`encoding`** *`EncodingConfig`* + +Configure event serialization before sending. +Includes: +1) Type - codec to use for serializing events: +* `json` - serializes the full event as a JSON object (default). +* `raw` - extracts a single field and sends its value as-is. +2) Params - Encoder parameters. + +
+ **`use_gzip`** *`bool`* *`default=false`* If set, the plugin will use gzip encoding. @@ -133,5 +144,4 @@ After a non-retryable write error, fall with a non-zero exit code or not
-
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/http/encoding.go b/plugin/output/http/encoding.go new file mode 100644 index 000000000..90578e34c --- /dev/null +++ b/plugin/output/http/encoding.go @@ -0,0 +1,78 @@ +package http + +import ( + "encoding/json" + "fmt" + + "github.com/ozontech/file.d/pipeline" +) + +const ( + EncoderTypeJSON = "json" + EncoderTypeRaw = "raw" +) + +type Encoder interface { + Encode(event *pipeline.Event, buf []byte) []byte +} + +type JSONEncoderParams struct{} + +type JSONEncoder struct{} + +func newJSONEncoder(_ *JSONEncoderParams) *JSONEncoder { + return &JSONEncoder{} +} + +func (e *JSONEncoder) Encode(event *pipeline.Event, buf []byte) []byte { + buf, _ = event.Encode(buf) + return buf +} + +type RawEncoderParams struct { + Field string `json:"field" default:"message"` +} + +type RawEncoder struct { + field string +} + +func newRawEncoder(params *RawEncoderParams) *RawEncoder { + field := params.Field + if field == "" { + field = "message" + } + return &RawEncoder{field: field} +} + +func (e *RawEncoder) Encode(event *pipeline.Event, buf []byte) []byte { + node := event.Root.Dig(e.field) + if node == nil { + return buf[:0] + } + return node.Encode(buf) +} + +type EncodingConfig struct { + Type string `json:"type" default:"json" options:"json|raw"` + Params json.RawMessage `json:"params"` +} + +func NewEncoder(cfg EncodingConfig) (Encoder, error) { + switch cfg.Type { + case EncoderTypeJSON, "": + return newJSONEncoder(&JSONEncoderParams{}), nil + + case EncoderTypeRaw: + var params RawEncoderParams + if len(cfg.Params) > 0 { + if err := json.Unmarshal(cfg.Params, ¶ms); err != nil { + return nil, fmt.Errorf("raw encoder params: %w", err) + } + } + return newRawEncoder(¶ms), nil + + default: + return nil, fmt.Errorf("unknown encoding type %q; supported: json, raw", cfg.Type) + } +} diff --git a/plugin/output/http/http.go b/plugin/output/http/http.go index c3ed464ca..03edf9c51 100644 --- a/plugin/output/http/http.go +++ b/plugin/output/http/http.go @@ -31,7 +31,8 @@ const ( type Plugin struct { config *Config - client *xhttp.Client + client *xhttp.Client + encoder Encoder logger *zap.Logger controller pipeline.OutputPluginController @@ -61,6 +62,17 @@ type Config struct { // > Content-Type header for HTTP requests. ContentType string `json:"content_type" default:"application/json"` // * + // > @3@4@5@6 + // > + // > Configure event serialization before sending. + // > Includes: + // > 1) Type - codec to use for serializing events: + // > * `json` - serializes the full event as a JSON object (default). + // > * `raw` - extracts a single field and sends its value as-is. + // > By default `json` is used. + // > 2) Params - Encoder parameters. + Encoding EncodingConfig `json:"encoding" child:"true"` // * + // > @3@4@5@6 // > // > If set, the plugin will use gzip encoding. @@ -200,6 +212,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.registerMetrics(params.MetricCtl) p.mu = &sync.Mutex{} + var err error + p.encoder, err = NewEncoder(p.config.Encoding) + if err != nil { + p.logger.Fatal("can't create encoder", zap.Error(err)) + } + p.prepareClient() p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_)) @@ -326,7 +344,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err batch.ForEach(func(event *pipeline.Event) { eventsCount++ data.begin = append(data.begin, len(data.outBuf)) - data.outBuf, _ = event.Encode(data.outBuf) + data.outBuf = p.encoder.Encode(event, data.outBuf) data.outBuf = append(data.outBuf, '\n') }) data.begin = append(data.begin, len(data.outBuf)) diff --git a/plugin/output/http/http_test.go b/plugin/output/http/http_test.go index e0e6cb5c2..9dc0fe33a 100644 --- a/plugin/output/http/http_test.go +++ b/plugin/output/http/http_test.go @@ -20,15 +20,26 @@ func TestAppendEvent(t *testing.T) { p.Start(config, test.NewEmptyOutputPluginParams()) - root, _ := insaneJSON.DecodeBytes([]byte(`{"field_a":"AAAA","field_b":"BBBB"}`)) + root, _ := insaneJSON.DecodeBytes([]byte(`{"message":"[INFO] some event","field_a":"AAAA","field_b":"BBBB"}`)) defer insaneJSON.Release(root) data := data{} event := &pipeline.Event{Root: root} - data.outBuf, _ = event.Encode(data.outBuf) + encoder := newJSONEncoder(&JSONEncoderParams{}) + data.outBuf = encoder.Encode(event, data.outBuf) data.outBuf = append(data.outBuf, '\n') - expected := fmt.Sprintf("%s\n", `{"field_a":"AAAA","field_b":"BBBB"}`) + expected := fmt.Sprintf("%s\n", `{"message":"[INFO] some event","field_a":"AAAA","field_b":"BBBB"}`) + assert.Equal(t, expected, string(data.outBuf), "wrong request content") + + data.outBuf = data.outBuf[:0] + var params RawEncoderParams + + rawEncoder := newRawEncoder(¶ms) + data.outBuf = rawEncoder.Encode(event, data.outBuf) + data.outBuf = append(data.outBuf, '\n') + + expected = fmt.Sprintf("%s\n", `"[INFO] some event"`) assert.Equal(t, expected, string(data.outBuf), "wrong request content") }