diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index b812f8a370b..8227f7c37a6 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -33,7 +33,12 @@ #include #include #include +#include +#include +#include #include +#include +#include /* Proxies */ #include "proxy/go/go.h" @@ -75,8 +80,12 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { int ret = FLB_OK; + int event_type; size_t len = 0; + size_t offset = 0; void *data = NULL; + struct cmt *cmt = NULL; + struct ctrace *ctr = NULL; struct flb_plugin_input_proxy_context *ctx = (struct flb_plugin_input_proxy_context *) in_context; #ifdef FLB_HAVE_PROXY_GO @@ -85,7 +94,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins, ret = proxy_go_input_collect(ctx, &data, &len); if (len == 0) { - flb_trace("[GO] No logs are ingested"); + flb_trace("[GO] No data ingested"); return -1; } @@ -94,17 +103,58 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins, return -1; } - flb_input_log_append(ins, NULL, 0, data, len); + event_type = ctx->proxy->def->event_type; + + if (event_type == FLB_INPUT_METRICS) { + ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &offset); + if (ret != CMT_DECODE_MSGPACK_SUCCESS) { + flb_error("[proxy] failed to decode metrics msgpack (error %d)", ret); + proxy_go_input_cleanup(ctx, data); + return -1; + } + ret = flb_input_metrics_append(ins, NULL, 0, cmt); + if (ret != 0) { + flb_error("[proxy] could not append metrics, ret=%d", ret); + cmt_decode_msgpack_destroy(cmt); + proxy_go_input_cleanup(ctx, data); + return -1; + } + cmt_decode_msgpack_destroy(cmt); + } + else if (event_type == FLB_INPUT_TRACES) { + ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &offset); + if (ret != CTR_DECODE_MSGPACK_SUCCESS) { + flb_error("[proxy] failed to decode traces msgpack (error %d)", ret); + proxy_go_input_cleanup(ctx, data); + return -1; + } + ret = flb_input_trace_append(ins, NULL, 0, ctr); + if (ret != 0) { + flb_error("[proxy] could not append traces, ret=%d", ret); + ctr_decode_msgpack_destroy(ctr); + proxy_go_input_cleanup(ctx, data); + return -1; + } + /* flb_input_trace_append takes ownership of ctr and destroys it on success */ + } + else if (event_type == FLB_INPUT_LOGS) { + ret = flb_input_log_append(ins, NULL, 0, data, len); + } + else { + flb_error("[proxy] unsupported event_type %d for input plugin '%s'", + event_type, ins->name); + proxy_go_input_cleanup(ctx, data); + return -1; + } - ret = proxy_go_input_cleanup(ctx, data); - if (ret == -1) { + if (proxy_go_input_cleanup(ctx, data) == -1) { flb_errno(); return -1; } } #endif - return 0; + return ret; } static int flb_proxy_input_cb_init(struct flb_input_instance *ins,