Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/out_kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Fluent Bit Kafka Output plugin
set(src
kafka_config.c
kafka_schema_registry.c
kafka_topic.c
kafka.c)

Expand Down
78 changes: 77 additions & 1 deletion plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
#ifdef FLB_HAVE_AVRO_ENCODER
else if (ctx->format == FLB_KAFKA_FMT_AVRO) {

ret = flb_kafka_schema_registry_resolve(ctx);
if (ret != FLB_OK) {
msgpack_sbuffer_destroy(&mp_sbuf);
return ret;
}

flb_plg_debug(ctx->ins, "avro schema ID:%d:\n", ctx->avro_fields.schema_id);
flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);

Expand Down Expand Up @@ -1497,6 +1503,76 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_out_kafka, avro_fields) + offsetof(struct flb_avro_fields, schema_id),
"Set AVRO schema ID."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_url", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_url),
"Set the Confluent Schema Registry base URL for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.url", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_url),
"Set the Confluent Schema Registry base URL for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_subject", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_subject),
"Set the Confluent Schema Registry subject for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.subject", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_subject),
"Set the Confluent Schema Registry subject for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_version", "latest",
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_version),
"Set the Confluent Schema Registry subject version for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.version", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_version),
"Set the Confluent Schema Registry subject version for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_http_user", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_user),
"Set the Confluent Schema Registry HTTP basic authentication user."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.http.user", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_user),
"Set the Confluent Schema Registry HTTP basic authentication user."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_http_passwd", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_passwd),
"Set the Confluent Schema Registry HTTP basic authentication password."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.http.password", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_passwd),
"Set the Confluent Schema Registry HTTP basic authentication password."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_bearer_token", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_bearer_token),
"Set the Confluent Schema Registry bearer token."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.bearer.token", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_bearer_token),
"Set the Confluent Schema Registry bearer token."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_framing", "cp1",
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_framing),
"Set the Schema Registry serializer framing. Only cp1 is supported."
},
{
FLB_CONFIG_MAP_STR, "serializer.framing", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_framing),
"Set the Schema Registry serializer framing. Only cp1 is supported."
},
#endif
{
FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
Expand Down Expand Up @@ -1556,6 +1632,6 @@ struct flb_output_plugin out_kafka_plugin = {
.cb_flush = cb_kafka_flush,
.cb_exit = cb_kafka_exit,
.config_map = config_map,
.flags = 0,
.flags = FLB_IO_OPT_TLS,
.event_type = FLB_OUTPUT_LOGS
};
10 changes: 10 additions & 0 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
ctx->ins = ins;
ctx->blocked = FLB_FALSE;
mk_list_init(&ctx->topics);
#ifdef FLB_HAVE_AVRO_ENCODER
mk_list_init(&ctx->schema_registry_endpoints);
#endif

ret = flb_output_config_map_set(ins, (void*) ctx);
if (ret == -1) {
Expand Down Expand Up @@ -266,6 +269,12 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
if (tmp) {
ctx->avro_fields.schema_str = flb_sds_create(tmp);
}

ret = flb_kafka_schema_registry_configure(ctx, config);
if (ret == -1) {
flb_out_kafka_destroy(ctx);
return NULL;
}
#endif

/* Config: Topic */
Expand Down Expand Up @@ -341,6 +350,7 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
#ifdef FLB_HAVE_AVRO_ENCODER
// avro
flb_sds_destroy(ctx->avro_fields.schema_str);
flb_kafka_schema_registry_destroy(ctx);
#endif

flb_free(ctx);
Expand Down
36 changes: 36 additions & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#endif

#include <fluent-bit/flb_kafka.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/aws/flb_aws_msk_iam.h>

#define FLB_KAFKA_FMT_JSON 0
Expand Down Expand Up @@ -56,6 +57,19 @@
#define FLB_JSON_DATE_ISO8601_NS 2
#define FLB_JSON_DATE_ISO8601_FMT "%Y-%m-%dT%H:%M:%S"

#ifdef FLB_HAVE_AVRO_ENCODER
struct flb_kafka_schema_registry_endpoint {
flb_sds_t host;
flb_sds_t uri;
int port;
#ifdef FLB_HAVE_TLS
struct flb_tls *tls;
#endif
struct flb_upstream *upstream;
struct mk_list _head;
};
#endif

struct flb_kafka_topic {
int name_len;
char *name;
Expand Down Expand Up @@ -127,6 +141,18 @@ struct flb_out_kafka {
// flb_sds_t avro_schema_str;
// flb_sds_t avro_schema_id;
struct flb_avro_fields avro_fields;

/* Optional Confluent Schema Registry resolver for Avro schemas */
flb_sds_t schema_registry_url;
flb_sds_t schema_registry_subject;
flb_sds_t schema_registry_version;
flb_sds_t schema_registry_http_user;
flb_sds_t schema_registry_http_passwd;
flb_sds_t schema_registry_bearer_token;
flb_sds_t schema_registry_framing;
int schema_registry_endpoint_count;
int schema_registry_endpoint_index;
struct mk_list schema_registry_endpoints;
#endif

#ifdef FLB_HAVE_AWS_MSK_IAM
Expand All @@ -147,4 +173,14 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
struct flb_config *config);
int flb_out_kafka_destroy(struct flb_out_kafka *ctx);

#ifdef FLB_HAVE_AVRO_ENCODER
int flb_kafka_schema_registry_configure(struct flb_out_kafka *ctx,
struct flb_config *config);
int flb_kafka_schema_registry_resolve(struct flb_out_kafka *ctx);
int flb_kafka_schema_registry_parse_response(struct flb_out_kafka *ctx,
const char *payload,
size_t payload_size);
void flb_kafka_schema_registry_destroy(struct flb_out_kafka *ctx);
#endif

#endif
Loading
Loading