diff --git a/examples/kafka_filter/README.md b/examples/kafka_filter/README.md new file mode 100644 index 00000000000..23a30e9d516 --- /dev/null +++ b/examples/kafka_filter/README.md @@ -0,0 +1,201 @@ +# Fluent Bit Kafka Examples + +This directory contains examples for using Fluent Bit with Apache Kafka, including support for AWS MSK (Managed Streaming for Apache Kafka) with IAM authentication. + +## Examples + +### 1. Basic Kafka Example (`kafka.conf`) + +A simple example demonstrating Kafka input and output with a Lua filter. + +**Features:** +- Kafka consumer input +- Lua filter for message transformation +- Kafka producer output + +**Usage:** +```bash +docker-compose up +``` + +### 2. AWS MSK IAM Authentication (`kafka_msk_iam.conf`) + +Comprehensive examples for AWS MSK with IAM authentication, covering various deployment scenarios. + +**Scenarios covered:** +- Standard MSK cluster (auto-detected region) +- MSK via PrivateLink (explicit region) +- MSK Serverless (auto-detected region) +- VPC Endpoint (auto-detected region) + +## AWS MSK IAM Authentication + +### Overview + +AWS MSK supports IAM authentication, which eliminates the need to manage separate credentials for Kafka. Fluent Bit seamlessly integrates with AWS MSK IAM authentication. 
+ +### Configuration + +Enable MSK IAM authentication by setting: +```ini +rdkafka.sasl.mechanism aws_msk_iam +``` + +### Region Detection + +Fluent Bit can automatically detect the AWS region from standard MSK broker hostnames: +- `b-1.example.kafka.us-east-1.amazonaws.com` → region: `us-east-1` +- `boot-abc.kafka-serverless.us-west-2.amazonaws.com` → region: `us-west-2` +- `vpce-123.kafka.eu-west-1.vpce.amazonaws.com` → region: `eu-west-1` + +### Custom DNS / PrivateLink + +When using PrivateLink aliases or custom DNS names that don't contain `.amazonaws.com`, the region cannot be auto-detected, so `aws_region` is **required** and you must set it explicitly: + +```ini +[OUTPUT] + Name kafka + Match * + brokers my-privatelink-alias.internal.example.com:9098 + topics my-topic + rdkafka.sasl.mechanism aws_msk_iam + aws_region us-east-1 +``` + +### AWS Credentials + +MSK IAM authentication uses the standard AWS credentials chain: + +1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) +2. EC2 instance profile / ECS task role (recommended for production) +3. 
AWS credentials file (`~/.aws/credentials`) + +### Required IAM Permissions + +Your IAM role or user needs the following permissions: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:Connect", + "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", + "kafka-cluster:ReadData", + "kafka-cluster:WriteData" + ], + "Resource": [ + "arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/*", + "arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/*", + "arn:aws:kafka:REGION:ACCOUNT:group/CLUSTER_NAME/*" + ] + } + ] +} +``` + +**Note:** Adjust permissions based on your use case: +- Consumers need: `Connect`, `DescribeCluster`, `DescribeTopic`, `DescribeGroup`, `AlterGroup`, `ReadData` +- Producers need: `Connect`, `DescribeTopic`, `WriteData` + +## Configuration Parameters + +### Common Parameters + +| Parameter | Description | Required | +|-----------|-------------|----------| +| `brokers` | Comma-separated list of Kafka brokers | Yes | +| `topics` | Topic name(s) for input or output | Yes | +| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` for MSK IAM auth | For MSK IAM | +| `aws_region` | AWS region (auto-detected if not set) | Only for custom DNS | +| `group_id` | Consumer group ID | For input | + +### Additional librdkafka Parameters + +You can pass any librdkafka configuration using the `rdkafka.` prefix: + +```ini +rdkafka.socket.timeout.ms 60000 +rdkafka.metadata.max.age.ms 180000 +rdkafka.request.timeout.ms 30000 +``` + +For a complete list of parameters, see the [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). + +## Testing + +### Local Kafka (Docker) + +1. Start the Kafka stack: + ```bash + cd examples/kafka_filter + docker-compose up -d + ``` + +2. Run Fluent Bit: + ```bash + fluent-bit -c kafka.conf + ``` + +3. Produce test messages: + ```bash + ./scripts/kafka-produce.sh + ``` + +4. Consume messages: + ```bash + ./scripts/kafka-consume.sh + ``` + +### AWS MSK + +1. 
Update `kafka_msk_iam.conf` with your MSK cluster details +2. Ensure AWS credentials are configured +3. Run Fluent Bit: + ```bash + fluent-bit -c kafka_msk_iam.conf + ``` + +## Troubleshooting + +### Authentication Failures + +**Error:** `failed to setup MSK IAM authentication OAuth callback` + +**Solutions:** +- For custom DNS/PrivateLink: Add `aws_region` parameter +- Verify AWS credentials are available +- Check IAM permissions + +### Region Detection Issues + +**Error:** `failed to auto-detect region from broker address` + +**Solution:** +Explicitly set the region: +```ini +aws_region us-east-1 +``` + +### Connection Timeouts + +**Solution:** +Increase timeout values: +```ini +rdkafka.socket.timeout.ms 60000 +rdkafka.metadata.max.age.ms 180000 +``` + +## Additional Resources + +- [Fluent Bit Kafka Documentation](https://docs.fluentbit.io/) +- [AWS MSK IAM Access Control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) +- [librdkafka Configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) + +## Support + +For issues or questions: +- [Fluent Bit GitHub Issues](https://github.com/fluent/fluent-bit/issues) +- [Fluent Bit Slack Community](https://fluentbit.io/slack) diff --git a/examples/kafka_filter/kafka_msk_iam.conf b/examples/kafka_filter/kafka_msk_iam.conf new file mode 100644 index 00000000000..a2ef253f297 --- /dev/null +++ b/examples/kafka_filter/kafka_msk_iam.conf @@ -0,0 +1,141 @@ +# Fluent Bit configuration example for AWS MSK with IAM authentication +# This example demonstrates how to configure Kafka input/output plugins +# with AWS MSK IAM authentication for different scenarios. 
+ +[SERVICE] + Flush 5 + Grace 30 + Log_Level info + +# ============================================================================== +# Example 1: Standard MSK cluster with auto-detected region +# ============================================================================== +# The region is automatically extracted from the broker hostname +# Works for standard MSK endpoints like: +# b-1.example.kafka.us-east-1.amazonaws.com +# boot-abc123.kafka-serverless.us-east-1.amazonaws.com + +[INPUT] + Name kafka + brokers b-1.example.kafka.us-east-1.amazonaws.com:9098,b-2.example.kafka.us-east-1.amazonaws.com:9098 + topics my-input-topic + group_id my-consumer-group + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + # Region will be auto-detected from broker hostname + # No need to set aws_region explicitly + +[OUTPUT] + Name kafka + Match * + brokers b-1.example.kafka.us-east-1.amazonaws.com:9098,b-2.example.kafka.us-east-1.amazonaws.com:9098 + topics my-output-topic + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + # Region will be auto-detected from broker hostname + +# ============================================================================== +# Example 2: MSK cluster via PrivateLink with explicit region +# ============================================================================== +# When using PrivateLink aliases or custom DNS names that don't contain +# .amazonaws.com, you must explicitly specify the aws_region parameter + +[INPUT] + Name kafka + Tag kafka.privatelink + brokers my-privatelink-alias.internal.example.com:9098 + topics my-input-topic + group_id my-consumer-group + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + # REQUIRED: Explicitly set region for custom DNS names + aws_region us-east-1 + +[OUTPUT] + Name kafka + Match kafka.privatelink + brokers my-privatelink-alias.internal.example.com:9098 + topics my-output-topic + # Enable AWS MSK IAM authentication + 
rdkafka.sasl.mechanism aws_msk_iam + # REQUIRED: Explicitly set region for custom DNS names + aws_region us-east-1 + +# ============================================================================== +# Example 3: MSK Serverless with auto-detected region +# ============================================================================== +# MSK Serverless endpoints are automatically detected + +[INPUT] + Name kafka + Tag kafka.serverless + brokers boot-abc123.c1.kafka-serverless.us-west-2.amazonaws.com:9098 + topics my-serverless-topic + group_id my-serverless-group + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + # Region will be auto-detected from broker hostname + +[OUTPUT] + Name kafka + Match kafka.serverless + brokers boot-abc123.c1.kafka-serverless.us-west-2.amazonaws.com:9098 + topics my-serverless-output + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + +# ============================================================================== +# Example 4: VPC Endpoint with auto-detected region +# ============================================================================== +# VPC endpoints are also supported with auto-detection + +[INPUT] + Name kafka + Tag kafka.vpce + brokers vpce-abc123.kafka.us-east-1.vpce.amazonaws.com:9098 + topics my-vpce-topic + group_id my-vpce-group + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + # Region will be auto-detected from VPC endpoint hostname + +[OUTPUT] + Name kafka + Match kafka.vpce + brokers vpce-abc123.kafka.us-east-1.vpce.amazonaws.com:9098 + topics my-vpce-output + # Enable AWS MSK IAM authentication + rdkafka.sasl.mechanism aws_msk_iam + +# ============================================================================== +# Notes: +# ============================================================================== +# +# 1. 
AWS Credentials: +# MSK IAM authentication uses the standard AWS credentials chain: +# - Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) +# - EC2 instance profile / ECS task role +# - AWS credentials file (~/.aws/credentials) +# +# 2. IAM Permissions Required: +# Your IAM role/user needs the following permissions: +# - kafka-cluster:Connect, kafka-cluster:DescribeTopic (for consumers and producers) +# - kafka-cluster:DescribeCluster, kafka-cluster:DescribeGroup, kafka-cluster:AlterGroup (for consumers) +# - kafka-cluster:ReadData (for consumers) +# - kafka-cluster:WriteData (for producers) +# +# 3. When to use aws_region parameter: +# - REQUIRED for PrivateLink aliases or any custom DNS names +# - OPTIONAL for standard AWS MSK endpoints (auto-detected) +# - OPTIONAL for MSK Serverless endpoints (auto-detected) +# - OPTIONAL for VPC endpoints (auto-detected) +# +# 4. Security Protocol: +# When using aws_msk_iam, the security protocol defaults to SASL_SSL +# unless you set rdkafka.security.protocol explicitly yourself. +# +# 5. Additional rdkafka options: +# You can pass any librdkafka configuration option using the +# rdkafka. prefix, for example: +# - rdkafka.socket.timeout.ms 60000 +# - rdkafka.metadata.max.age.ms 180000 diff --git a/include/fluent-bit/aws/flb_aws_msk_iam.h b/include/fluent-bit/aws/flb_aws_msk_iam.h index df0ea258557..e331a6bac21 100644 --- a/include/fluent-bit/aws/flb_aws_msk_iam.h +++ b/include/fluent-bit/aws/flb_aws_msk_iam.h @@ -28,20 +28,21 @@ struct flb_aws_msk_iam; -struct flb_msk_iam_cb { - void *plugin_ctx; - struct flb_aws_msk_iam *iam; - char *broker_host; /* Store the actual broker hostname */ -}; - /* * Register the oauthbearer refresh callback for MSK IAM authentication. 
+ * Parameters: + * - config: Fluent Bit configuration + * - kconf: rdkafka configuration + * - opaque: Kafka opaque context (will be set with MSK IAM context) + * - brokers: Comma-separated list of broker addresses (used to extract AWS region if region is NULL) + * - region: Optional AWS region (if NULL, will be auto-detected from brokers) * Returns context pointer on success or NULL on failure. */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, - const char *cluster_arn, - struct flb_kafka_opaque *opaque); + struct flb_kafka_opaque *opaque, + const char *brokers, + const char *region); void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx); #endif diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index e07d7970a7c..31001f9806e 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -268,40 +268,43 @@ static int in_kafka_init(struct flb_input_instance *ins, return -1; } -#ifdef FLB_HAVE_AWS_MSK_IAM - /* - * When MSK IAM auth is enabled, default the required - * security settings so users don't need to specify them. 
- */ - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) { - conf = flb_input_get_property("rdkafka.security.protocol", ins); - if (!conf) { - flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); + /* Retrieve SASL mechanism if configured */ + conf = flb_input_get_property("rdkafka.sasl.mechanism", ins); + if (conf) { + ctx->sasl_mechanism = flb_sds_create(conf); + if (!ctx->sasl_mechanism) { + flb_plg_error(ins, "failed to allocate SASL mechanism string"); + flb_free(ctx); + return -1; } - - conf = flb_input_get_property("rdkafka.sasl.mechanism", ins); - if (!conf) { + flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); + +#ifdef FLB_HAVE_AWS_MSK_IAM + /* Check if using aws_msk_iam as SASL mechanism */ + if (strcasecmp(conf, "aws_msk_iam") == 0) { + /* Mark that user explicitly requested AWS MSK IAM */ + ctx->aws_msk_iam = FLB_TRUE; + + /* Set SASL mechanism to OAUTHBEARER for librdkafka */ flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER"); + flb_sds_destroy(ctx->sasl_mechanism); ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER"); + if (!ctx->sasl_mechanism) { + flb_plg_error(ins, "failed to allocate SASL mechanism string"); + flb_free(ctx); + return -1; + } + + /* Ensure security protocol is set */ + conf = flb_input_get_property("rdkafka.security.protocol", ins); + if (!conf) { + flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); + } + + flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism"); } - else { - ctx->sasl_mechanism = flb_sds_create(conf); - flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); - } - } - else { #endif - - /* Retrieve SASL mechanism if configured */ - conf = flb_input_get_property("rdkafka.sasl.mechanism", ins); - if (conf) { - ctx->sasl_mechanism = flb_sds_create(conf); - flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); - } - -#ifdef FLB_HAVE_AWS_MSK_IAM } -#endif kafka_conf = 
flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1); if (!kafka_conf) { @@ -351,26 +354,47 @@ static int in_kafka_init(struct flb_input_instance *ins, flb_kafka_opaque_set(ctx->opaque, ctx, NULL); rd_kafka_conf_set_opaque(kafka_conf, ctx->opaque); + /* + * Enable SASL queue for all OAUTHBEARER configurations. + * This allows librdkafka to handle OAuth token refresh in a background thread, + * which is essential for idle connections or when poll intervals are large. + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. + */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_conf_enable_sasl_queue(kafka_conf, 1); + flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism"); + } + #ifdef FLB_HAVE_AWS_MSK_IAM - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism && + /* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */ + if (ctx->aws_msk_iam && ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { - flb_plg_info(ins, "registering MSK IAM authentication with cluster ARN: %s", - ctx->aws_msk_iam_cluster_arn); - ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, - kafka_conf, - ctx->aws_msk_iam_cluster_arn, - ctx->opaque); - if (!ctx->msk_iam) { - flb_plg_error(ins, "failed to setup MSK IAM authentication"); + /* Register MSK IAM OAuth callback */ + if (ctx->kafka.brokers) { + flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback"); + ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, + kafka_conf, + ctx->opaque, + ctx->kafka.brokers, + ctx->aws_region); + + if (!ctx->msk_iam) { + flb_plg_error(ins, "failed to setup MSK IAM authentication OAuth callback"); + goto init_error; + } + else { + res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config", + "principal=admin", errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, + "failed to set 
sasl.oauthbearer.config: %s", + errstr); + } + } } else { - res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config", - "principal=admin", errstr, sizeof(errstr)); - if (res != RD_KAFKA_CONF_OK) { - flb_plg_error(ins, - "failed to set sasl.oauthbearer.config: %s", - errstr); - } + flb_plg_error(ins, "brokers configuration is required for MSK IAM authentication"); + goto init_error; } } #endif @@ -380,9 +404,36 @@ static int in_kafka_init(struct flb_input_instance *ins, /* Create Kafka consumer handle */ if (!ctx->kafka.rk) { flb_plg_error(ins, "Failed to create new consumer: %s", errstr); + /* rd_kafka_new() did NOT take ownership on failure; kafka_conf is + * still valid and will be destroyed by init_error cleanup path. */ goto init_error; } + /* rd_kafka_new() takes ownership of kafka_conf on success */ + kafka_conf = NULL; + + /* + * Enable SASL background callbacks for all OAUTHBEARER configurations. + * This ensures OAuth tokens are refreshed automatically even when: + * - Poll intervals are large + * - Topics have no messages + * - Collector is paused + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. + */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_error_t *error; + error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); + if (error) { + flb_plg_warn(ins, "failed to enable SASL background callbacks: %s. 
" + "OAuth tokens may not refresh during idle periods.", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + else { + flb_plg_info(ins, "OAUTHBEARER: SASL background callbacks enabled"); + } + } + /* Trigger initial token refresh for OAUTHBEARER */ rd_kafka_poll(ctx->kafka.rk, 0); @@ -449,15 +500,23 @@ static int in_kafka_init(struct flb_input_instance *ins, } if (ctx->kafka.rk) { rd_kafka_consumer_close(ctx->kafka.rk); + /* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */ rd_kafka_destroy(ctx->kafka.rk); } + else if (kafka_conf) { + /* If rd_kafka was never created, we need to destroy conf manually */ + rd_kafka_conf_destroy(kafka_conf); + } if (ctx->opaque) { flb_kafka_opaque_destroy(ctx->opaque); } - else if (kafka_conf) { - /* conf is already destroyed when rd_kafka is initialized */ - rd_kafka_conf_destroy(kafka_conf); + +#ifdef FLB_HAVE_AWS_MSK_IAM + if (ctx->msk_iam) { + flb_aws_msk_iam_destroy(ctx->msk_iam); } +#endif + flb_sds_destroy(ctx->sasl_mechanism); flb_free(ctx); @@ -552,6 +611,15 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the librdkafka options" }, +#ifdef FLB_HAVE_AWS_MSK_IAM + { + FLB_CONFIG_MAP_STR, "aws_region", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_region), + "AWS region for MSK IAM authentication. If not set, region will be " + "auto-detected from broker hostname (only works for standard MSK endpoints). " + "Required when using custom DNS names (e.g., PrivateLink) with MSK IAM." 
+ }, +#endif { FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_IN_KAFKA_BUFFER_MAX_SIZE, 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), @@ -571,19 +639,6 @@ static struct flb_config_map config_map[] = { "Rely on kafka auto-commit and commit messages in batches" }, -#ifdef FLB_HAVE_AWS_MSK_IAM - { - FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam_cluster_arn), - "ARN of the MSK cluster when using AWS IAM authentication" - }, - { - FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false", - 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam), - "Enable AWS MSK IAM authentication" - }, -#endif - /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 096cf1c561b..dd6ce87c632 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -55,12 +55,12 @@ struct flb_in_kafka_config { struct flb_kafka_opaque *opaque; #ifdef FLB_HAVE_AWS_MSK_IAM - flb_sds_t aws_msk_iam_cluster_arn; struct flb_aws_msk_iam *msk_iam; + int aws_msk_iam; /* Flag to indicate user explicitly requested AWS MSK IAM */ + char *aws_region; /* AWS region for MSK IAM (optional, auto-detected if not set) */ #endif /* SASL mechanism configured in rdkafka.sasl.mechanism */ - int aws_msk_iam; flb_sds_t sasl_mechanism; }; diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index dadd4725f74..79059f54186 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -670,6 +670,15 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the kafka group_id." }, +#ifdef FLB_HAVE_AWS_MSK_IAM + { + FLB_CONFIG_MAP_STR, "aws_region", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_region), + "AWS region for MSK IAM authentication. If not set, region will be " + "auto-detected from broker hostname (only works for standard MSK endpoints). " + "Required when using custom DNS names (e.g., PrivateLink) with MSK IAM." 
+ }, +#endif { FLB_CONFIG_MAP_STR, "raw_log_key", NULL, 0, FLB_TRUE, offsetof(struct flb_out_kafka, raw_log_key), @@ -678,19 +687,6 @@ static struct flb_config_map config_map[] = { "that key will be sent to Kafka." }, -#ifdef FLB_HAVE_AWS_MSK_IAM - { - FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", NULL, - 0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam_cluster_arn), - "ARN of the MSK cluster when using AWS IAM authentication" - }, - { - FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false", - 0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam), - "Enable AWS MSK IAM authentication" - }, -#endif - /* EOF */ {0} }; diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b4bb9be6acf..b99b44ac871 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -58,37 +58,33 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, return NULL; } + /* Retrieve SASL mechanism if configured */ + tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins); + if (tmp) { + ctx->sasl_mechanism = flb_sds_create(tmp); + flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); + #ifdef FLB_HAVE_AWS_MSK_IAM - /* - * When MSK IAM auth is enabled, default the required - * security settings so users don't need to specify them. 
- */ - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) { - tmp = flb_output_get_property("rdkafka.security.protocol", ins); - if (!tmp) { - flb_output_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); - } - - tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins); - if (!tmp) { + /* Check if using aws_msk_iam as SASL mechanism */ + if (strcasecmp(tmp, "aws_msk_iam") == 0) { + /* Mark that user explicitly requested AWS MSK IAM */ + ctx->aws_msk_iam = FLB_TRUE; + + /* Set SASL mechanism to OAUTHBEARER for librdkafka */ flb_output_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER"); + flb_sds_destroy(ctx->sasl_mechanism); ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER"); + + /* Ensure security protocol is set */ + tmp = flb_output_get_property("rdkafka.security.protocol", ins); + if (!tmp) { + flb_output_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); + } + + flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism"); } - else { - ctx->sasl_mechanism = flb_sds_create(tmp); - } - } - else { #endif - /* Retrieve SASL mechanism if configured */ - tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins); - if (tmp) { - ctx->sasl_mechanism = flb_sds_create(tmp); - } - -#ifdef FLB_HAVE_AWS_MSK_IAM } -#endif /* rdkafka config context */ ctx->conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 0); @@ -210,18 +206,35 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, flb_kafka_opaque_set(ctx->opaque, ctx, NULL); rd_kafka_conf_set_opaque(ctx->conf, ctx->opaque); + /* + * Enable SASL queue for all OAUTHBEARER configurations. + * This allows librdkafka to handle OAuth token refresh in a background thread, + * which is essential for idle connections where rd_kafka_poll() is not called. + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. 
+ */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_conf_enable_sasl_queue(ctx->conf, 1); + flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism"); + } + #ifdef FLB_HAVE_AWS_MSK_IAM - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism && + /* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */ + if (ctx->aws_msk_iam && ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { - - ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, - ctx->conf, - ctx->aws_msk_iam_cluster_arn, - ctx->opaque); - if (!ctx->msk_iam) { - flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication"); - } - else { + /* Register MSK IAM OAuth callback */ + if (ctx->kafka.brokers) { + flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback"); + ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, + ctx->conf, + ctx->opaque, + ctx->kafka.brokers, + ctx->aws_region); + if (!ctx->msk_iam) { + flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication OAuth callback"); + flb_out_kafka_destroy(ctx); + return NULL; + } + res = rd_kafka_conf_set(ctx->conf, "sasl.oauthbearer.config", "principal=admin", errstr, sizeof(errstr)); if (res != RD_KAFKA_CONF_OK) { @@ -230,19 +243,49 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, errstr); } } + else { + flb_plg_error(ctx->ins, "brokers configuration is required for MSK IAM authentication"); + flb_out_kafka_destroy(ctx); + return NULL; + } } #endif /* Kafka Producer */ ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf, errstr, sizeof(errstr)); + if (!ctx->kafka.rk) { flb_plg_error(ctx->ins, "failed to create producer: %s", errstr); + /* rd_kafka_new() did NOT take ownership on failure; ctx->conf is + * still valid and will be destroyed by flb_out_kafka_destroy(). 
*/ flb_out_kafka_destroy(ctx); return NULL; } + /* rd_kafka_new() takes ownership of ctx->conf on success */ + ctx->conf = NULL; + + /* + * Enable SASL background callbacks for all OAUTHBEARER configurations. + * This ensures OAuth tokens are refreshed automatically even on idle connections. + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. + */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_error_t *error; + error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); + if (error) { + flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s. " + "OAuth tokens may not refresh on idle connections.", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + else { + flb_plg_info(ctx->ins, "OAUTHBEARER: SASL background callbacks enabled"); + } + } + #ifdef FLB_HAVE_AVRO_ENCODER /* Config AVRO */ tmp = flb_output_get_property("schema_str", ins); @@ -301,8 +344,13 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx) flb_kafka_topic_destroy_all(ctx); if (ctx->kafka.rk) { + /* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */ rd_kafka_destroy(ctx->kafka.rk); } + else if (ctx->conf) { + /* If rd_kafka was never created, we need to destroy conf manually */ + rd_kafka_conf_destroy(ctx->conf); + } if (ctx->opaque) { flb_kafka_opaque_destroy(ctx->opaque); diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index e1ebc04e65c..4376a59b760 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -126,12 +126,11 @@ struct flb_out_kafka { #endif #ifdef FLB_HAVE_AWS_MSK_IAM - flb_sds_t aws_msk_iam_cluster_arn; struct flb_aws_msk_iam *msk_iam; + int aws_msk_iam; /* Flag to indicate user explicitly requested AWS MSK IAM */ + char *aws_region; /* AWS region for MSK IAM (optional, auto-detected if not set) */ #endif - int aws_msk_iam; - struct flb_kafka_opaque *opaque; /* SASL mechanism 
configured in rdkafka.sasl.mechanism */ diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index 2722e26d223..9aa1444f1fb 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -130,6 +130,7 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { int ret = -1; flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); + if (try_lock_provider(provider)) { ret = get_creds_ec2(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_credentials_profile.c b/src/aws/flb_aws_credentials_profile.c index 48cb9299572..7ad7099ff45 100644 --- a/src/aws/flb_aws_credentials_profile.c +++ b/src/aws/flb_aws_credentials_profile.c @@ -663,8 +663,7 @@ static int get_shared_credentials(char* credentials_path, if (flb_read_file(credentials_path, &buf, &size) < 0) { if (errno == ENOENT) { - AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist", - credentials_path); + AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path); } else { flb_errno(); AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s", diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 554fac20353..155a41d3998 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -175,7 +175,7 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { struct flb_aws_provider_sts *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the STS provider"); - + if (try_lock_provider(provider)) { ret = sts_assume_role_request(implementation->sts_client, &implementation->creds, implementation->uri, @@ -480,6 +480,7 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { struct flb_aws_provider_eks *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the EKS provider"); + if (try_lock_provider(provider)) { ret = 
assume_with_web_identity(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cf8af7d0cc8..97c91df9f41 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -36,15 +37,22 @@ #include #include #include +#include + +/* + * OAuth token lifetime of 5 minutes (industry standard). + * Matches AWS Go SDK and Kafka Connect implementations. + */ +#define MSK_IAM_TOKEN_LIFETIME_SECONDS 300 -/* Lightweight config - NO persistent AWS provider */ struct flb_aws_msk_iam { - struct flb_config *flb_config; /* For creating AWS provider on-demand */ + struct flb_config *flb_config; flb_sds_t region; - flb_sds_t cluster_arn; + struct flb_tls *cred_tls; + struct flb_aws_provider *provider; + pthread_mutex_t lock; /* Protects credential provider access from concurrent threads */ }; -/* Utility functions - same as before */ static int to_encode(char c) { if ((c >= '0' && c <= '9') || @@ -125,52 +133,82 @@ static int hmac_sha256_sign(unsigned char out[32], return 0; } -static char *extract_region(const char *arn) +/* Extract region from MSK broker address + * Supported formats: + * - MSK Standard: b-1.example.c1.kafka.{region}.amazonaws.com:port + * - MSK Serverless: boot-{id}.c{n}.kafka-serverless.{region}.amazonaws.com:port + * - VPC Endpoint: vpce-{id}.kafka.{region}.vpce.amazonaws.com:port + */ +static flb_sds_t extract_region_from_broker(const char *broker) { const char *p; - const char *r; + const char *start; + const char *end; + const char *port_pos; size_t len; - char *out; - - /* arn:partition:service:region:... 
*/ - p = strchr(arn, ':'); - if (!p) { + flb_sds_t out; + + if (!broker || strlen(broker) == 0) { return NULL; } - p = strchr(p + 1, ':'); - if (!p) { + + /* Remove port if present (e.g., :9098) */ + port_pos = strchr(broker, ':'); + if (port_pos) { + len = port_pos - broker; + } else { + len = strlen(broker); + } + + /* Find .amazonaws.com */ + p = strstr(broker, ".amazonaws.com"); + if (!p || p >= broker + len) { return NULL; } - p = strchr(p + 1, ':'); - if (!p) { + + /* Region is between the last dot before .amazonaws.com and .amazonaws.com + * Handle VPC endpoints (vpce-xxx.kafka.region.vpce.amazonaws.com) + * Example formats: + * Standard: ...kafka.us-east-1.amazonaws.com + * Serverless: ...kafka-serverless.us-east-1.amazonaws.com + * VPC Endpoint: ...kafka.us-east-1.vpce.amazonaws.com + */ + end = p; /* Points to .amazonaws.com */ + + /* Check for VPC endpoint format: .vpce.amazonaws.com */ + if (p - broker >= 5 && strncmp(p - 5, ".vpce", 5) == 0) { + /* For VPC endpoints, region ends at .vpce */ + end = p - 5; + } + + /* Find the start of region by going backwards to find the previous dot */ + start = end; + while (start > broker && *(start - 1) != '.') { + start--; + } + + len = end - start; + + /* Sanity check on region length (AWS regions are typically 9-20 chars) */ + if (len == 0 || len > 32) { return NULL; } - - r = p + 1; - p = strchr(r, ':'); - if (!p) { - return NULL; - } - len = p - r; - out = flb_malloc(len + 1); + + out = flb_sds_create_len(start, len); if (!out) { return NULL; } - memcpy(out, r, len); - out[len] = '\0'; - + return out; } -/* Stateless payload generator - creates AWS provider on demand */ +/* Payload generator - builds MSK IAM authentication payload */ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, - const char *host) + const char *host, + struct flb_aws_credentials *creds) { - struct flb_aws_provider *temp_provider = NULL; - struct flb_aws_credentials *creds = NULL; flb_sds_t payload = NULL; int 
encode_result; - char *p; size_t len; size_t url_len; size_t encoded_len; @@ -205,46 +243,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, /* Validate inputs */ if (!config || !config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set or invalid"); return NULL; } if (!host || strlen(host) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: host is required"); - return NULL; - } - - flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", - host, config->region); - - /* Create AWS provider on-demand */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (!temp_provider) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); - return NULL; - } - - if (temp_provider->provider_vtable->init(temp_provider) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); - flb_aws_provider_destroy(temp_provider); + flb_error("[aws_msk_iam] host is required"); return NULL; } - /* Get credentials */ - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); - flb_aws_provider_destroy(temp_provider); - return NULL; - } - - if (!creds->access_key_id || !creds->secret_access_key) { - flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); - flb_aws_credentials_destroy(creds); - flb_aws_provider_destroy(temp_provider); + if (!creds || !creds->access_key_id || !creds->secret_access_key) { + flb_error("[aws_msk_iam] invalid or incomplete credentials"); return NULL; } @@ -269,19 +278,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* 
CRITICAL: Encode the action parameter */ action_enc = uri_encode_params("kafka-cluster:Connect", 21); if (!action_enc) { goto error; } - /* Build canonical query string with ACTION parameter first (alphabetical order) */ + /* Build canonical query string */ query = flb_sds_create_size(8192); if (!query) { goto error; } - /* note: Action must be FIRST in alphabetical order */ query = flb_sds_printf(&query, "Action=%s&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=%s" "&X-Amz-Date=%s&X-Amz-Expires=900", @@ -290,27 +297,23 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Add session token if present (before SignedHeaders alphabetically) */ + /* Add session token if present */ if (creds->session_token && flb_sds_len(creds->session_token) > 0) { session_token_enc = uri_encode_params(creds->session_token, flb_sds_len(creds->session_token)); if (!session_token_enc) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to encode session token"); goto error; } tmp = flb_sds_printf(&query, "&X-Amz-Security-Token=%s", session_token_enc); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append session token to query"); goto error; } query = tmp; } - /* Add SignedHeaders LAST (alphabetically after Security-Token) */ tmp = flb_sds_printf(&query, "&X-Amz-SignedHeaders=host"); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append SignedHeaders"); goto error; } query = tmp; @@ -321,10 +324,8 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* CRITICAL: MSK IAM canonical request format - use SHA256 of empty string, not UNSIGNED-PAYLOAD */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) "", 0, empty_payload_hash, sizeof(empty_payload_hash)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash empty payload"); goto error; } @@ -338,17 +339,15 @@ static flb_sds_t build_msk_iam_payload(struct 
flb_aws_msk_iam *config, query, host, empty_payload_hex); flb_sds_destroy(empty_payload_hex); - empty_payload_hex = NULL; /* Prevent double-free */ + empty_payload_hex = NULL; if (!canonical) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to build canonical request"); goto error; } - /* Hash canonical request immediately */ + /* Hash canonical request */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) canonical, flb_sds_len(canonical), sha256_buf, sizeof(sha256_buf)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash canonical request"); goto error; } @@ -384,34 +383,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, len = strlen(datestamp); if (hmac_sha256_sign(key_date, (unsigned char *) key, flb_sds_len(key), (unsigned char *) datestamp, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign date"); goto error; } - /* Clean up key immediately after use - prevent double-free */ flb_sds_destroy(key); key = NULL; - len = strlen(config->region); + len = flb_sds_len(config->region); if (hmac_sha256_sign(key_region, key_date, 32, (unsigned char *) config->region, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign region"); goto error; } if (hmac_sha256_sign(key_service, key_region, 32, (unsigned char *) "kafka-cluster", 13) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign service"); goto error; } if (hmac_sha256_sign(key_signing, key_service, 32, (unsigned char *) "aws4_request", 12) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create signing key"); goto error; } if (hmac_sha256_sign(sig, key_signing, 32, (unsigned char *) string_to_sign, flb_sds_len(string_to_sign)) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign request"); goto error; } @@ -420,85 +413,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Append signature 
to query */ tmp = flb_sds_printf(&query, "&X-Amz-Signature=%s", hexsig); if (!tmp) { goto error; } query = tmp; - /* Build the complete presigned URL */ - presigned_url = flb_sds_create_size(16384); - if (!presigned_url) { - goto error; - } - - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); - if (!presigned_url) { - goto error; - } - - /* Base64 URL encode the presigned URL */ - url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ - - payload = flb_sds_create_size(encoded_len); - if (!payload) { - goto error; - } - - encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, - (const unsigned char*) presigned_url, url_len); - if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); - goto error; - } - flb_sds_len_set(payload, actual_encoded_len); - - /* Convert to Base64 URL encoding (replace + with -, / with _, remove padding =) */ - p = payload; - while (*p) { - if (*p == '+') { - *p = '-'; - } - else if (*p == '/') { - *p = '_'; - } - p++; - } - - /* Remove padding */ - len = flb_sds_len(payload); - while (len > 0 && payload[len-1] == '=') { - len--; - } - flb_sds_len_set(payload, len); - payload[len] = '\0'; - - /* Build the complete presigned URL */ - flb_sds_destroy(presigned_url); + /* Build complete presigned URL */ presigned_url = flb_sds_create_size(16384); if (!presigned_url) { goto error; } - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); + presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s&User-Agent=fluent-bit-msk-iam", + host, query); if (!presigned_url) { goto error; } - /* Add User-Agent parameter to the signed URL (like Go implementation) */ - tmp = flb_sds_printf(&presigned_url, "&User-Agent=fluent-bit-msk-iam"); - if (!tmp) { - goto error; - } - presigned_url = tmp; - - /* Base64 URL encode the presigned URL 
(RawURLEncoding - no padding like Go) */ + /* Base64 URL encode */ url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ + encoded_len = ((url_len + 2) / 3) * 4 + 1; - flb_sds_destroy(payload); payload = flb_sds_create_size(encoded_len); if (!payload) { goto error; @@ -507,34 +443,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, (const unsigned char *) presigned_url, url_len); if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); goto error; } - /* Update the SDS length to match actual encoded length */ flb_sds_len_set(payload, actual_encoded_len); - /* Convert to Base64 URL encoding AND remove padding (RawURLEncoding like Go) */ - p = payload; - while (*p) { - if (*p == '+') { - *p = '-'; + /* Convert to Base64 URL encoding and remove padding */ + final_len = flb_sds_len(payload); + for (size_t i = 0; i < final_len; i++) { + if (payload[i] == '+') { + payload[i] = '-'; } - else if (*p == '/') { - *p = '_'; + else if (payload[i] == '/') { + payload[i] = '_'; } - p++; } - - /* Remove ALL padding (RawURLEncoding) */ - final_len = flb_sds_len(payload); while (final_len > 0 && payload[final_len-1] == '=') { final_len--; } flb_sds_len_set(payload, final_len); payload[final_len] = '\0'; - /* Clean up before successful return */ + /* Clean up */ flb_sds_destroy(credential); flb_sds_destroy(credential_enc); flb_sds_destroy(canonical); @@ -547,65 +477,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } return payload; error: - /* Clean up everything - check for NULL to prevent double-free */ - if (credential) { - 
flb_sds_destroy(credential); - } - if (credential_enc) { - flb_sds_destroy(credential_enc); - } - if (canonical) { - flb_sds_destroy(canonical); - } - if (hexhash) { - flb_sds_destroy(hexhash); - } - if (string_to_sign) { - flb_sds_destroy(string_to_sign); - } - if (hexsig) { - flb_sds_destroy(hexsig); - } - if (query) { - flb_sds_destroy(query); - } - if (action_enc) { - flb_sds_destroy(action_enc); - } - if (presigned_url) { - flb_sds_destroy(presigned_url); - } - if (key) { /* Only destroy if not already destroyed */ - flb_sds_destroy(key); - } - if (payload) { - flb_sds_destroy(payload); - } - if (session_token_enc) { - flb_sds_destroy(session_token_enc); - } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } + if (credential) flb_sds_destroy(credential); + if (credential_enc) flb_sds_destroy(credential_enc); + if (canonical) flb_sds_destroy(canonical); + if (hexhash) flb_sds_destroy(hexhash); + if (string_to_sign) flb_sds_destroy(string_to_sign); + if (hexsig) flb_sds_destroy(hexsig); + if (query) flb_sds_destroy(query); + if (action_enc) flb_sds_destroy(action_enc); + if (presigned_url) flb_sds_destroy(presigned_url); + if (key) flb_sds_destroy(key); + if (payload) flb_sds_destroy(payload); + if (session_token_enc) flb_sds_destroy(session_token_enc); + if (empty_payload_hex) flb_sds_destroy(empty_payload_hex); return NULL; } - -/* Stateless callback - creates AWS provider on-demand for each refresh */ +/* OAuth token refresh callback */ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) @@ -614,101 +507,116 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_sds_t payload = NULL; rd_kafka_resp_err_t err; char errstr[512]; - int64_t now; + time_t now; int64_t md_lifetime_ms; - const char *s3_suffix = "-s3"; - size_t arn_len; - size_t suffix_len; struct flb_aws_msk_iam *config; struct flb_aws_credentials *creds = NULL; 
struct flb_kafka_opaque *kafka_opaque; - struct flb_aws_provider *temp_provider = NULL; (void) oauthbearer_config; kafka_opaque = (struct flb_kafka_opaque *) opaque; if (!kafka_opaque || !kafka_opaque->msk_iam_ctx) { - flb_error("[aws_msk_iam] oauthbearer_token_refresh_cb: invalid opaque context"); + flb_error("[aws_msk_iam] invalid opaque context"); rd_kafka_oauthbearer_set_token_failure(rk, "invalid context"); return; } - flb_debug("[aws_msk_iam] running OAuth bearer token refresh callback"); - - /* get the msk_iam config (not persistent context!) */ config = kafka_opaque->msk_iam_ctx; - /* validate region (mandatory) */ if (!config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set"); rd_kafka_oauthbearer_set_token_failure(rk, "region not set"); return; } - /* Determine host endpoint */ - if (config->cluster_arn) { - arn_len = strlen(config->cluster_arn); - suffix_len = strlen(s3_suffix); - if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { - snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host); - } - else { - snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); + /* + * Construct service-level hostname for signing (kafka.{region}.amazonaws.com). + * This approach solves the multi-broker authentication issue since librdkafka's + * OAuth callback doesn't provide per-broker context. Using a consistent service + * hostname works for all brokers and supports PrivateLink/Custom DNS scenarios. 
+ */ + snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); + + flb_debug("[aws_msk_iam] OAuth token refresh callback triggered for host: %s", host); + + /* + * CRITICAL CONCURRENCY FIX: + * Lock the credential provider to prevent race conditions. + * The librdkafka refresh callback executes in its internal thread context, + * while Fluent Bit may access the same provider from other threads. + * Without synchronization, concurrent refresh/get_credentials calls can + * corrupt provider state and cause authentication failures. + */ + if (pthread_mutex_lock(&config->lock) != 0) { + flb_error("[aws_msk_iam] failed to acquire credential provider lock"); + rd_kafka_oauthbearer_set_token_failure(rk, "internal locking error"); + return; + } + + /* Refresh credentials */ + if (config->provider->provider_vtable->refresh(config->provider) < 0) { + if (pthread_mutex_unlock(&config->lock) != 0) { + flb_error("[aws_msk_iam] failed to release credential provider lock"); } + flb_warn("[aws_msk_iam] credential refresh failed, will retry on next callback"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential refresh failed"); + return; } - else { - snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); + + /* Get credentials */ + creds = config->provider->provider_vtable->get_credentials(config->provider); + if (!creds) { + if (pthread_mutex_unlock(&config->lock) != 0) { + flb_error("[aws_msk_iam] failed to release credential provider lock"); + } + flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); + return; } - flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + /* Unlock immediately after getting credentials - no need to hold lock during payload generation */ + if (pthread_mutex_unlock(&config->lock) != 0) { + 
flb_error("[aws_msk_iam] failed to release credential provider lock"); + } - /* Generate payload using stateless function - creates and destroys AWS provider internally */ - payload = build_msk_iam_payload(config, host); + /* Generate payload */ + payload = build_msk_iam_payload(config, host, creds); if (!payload) { - flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); - rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed"); + flb_error("[aws_msk_iam] failed to generate authentication token. " + "Possible causes: 1) Invalid AWS credentials, " + "2) Missing IAM permissions for kafka-cluster:Connect, " + "3) Incorrect region configuration (%s)", config->region); + flb_aws_credentials_destroy(creds); + rd_kafka_oauthbearer_set_token_failure(rk, "authentication token generation failed"); return; } - /* Get credentials for principal (create temporary provider just for this) */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (temp_provider) { - if (temp_provider->provider_vtable->init(temp_provider) == 0) { - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - } - } - + /* + * Set OAuth token with fixed 5-minute lifetime (AWS industry standard). + * librdkafka's background thread will automatically trigger a refresh callback + * at 80% of the token's lifetime (4 minutes) to ensure the token never expires, + * even on completely idle connections. + */ now = time(NULL); - md_lifetime_ms = (now + 900) * 1000; + md_lifetime_ms = ((int64_t)now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; err = rd_kafka_oauthbearer_set_token(rk, payload, md_lifetime_ms, - creds ? 
creds->access_key_id : "unknown", + creds->access_key_id, NULL, 0, errstr, sizeof(errstr)); + flb_aws_credentials_destroy(creds); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); rd_kafka_oauthbearer_set_token_failure(rk, errstr); } else { - flb_info("[aws_msk_iam] OAuth bearer token successfully set"); - } - - /* Clean up everything immediately - no memory leaks possible! */ - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); + flb_debug("[aws_msk_iam] OAuth bearer token refreshed successfully"); } if (payload) { @@ -716,86 +624,170 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, } } -/* Register callback with lightweight config - keeps your current interface */ +/* Register OAuth callback */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, - const char *cluster_arn, - struct flb_kafka_opaque *opaque) + struct flb_kafka_opaque *opaque, + const char *brokers, + const char *region) { struct flb_aws_msk_iam *ctx; - char *region_str; - - flb_info("[aws_msk_iam] registering OAuth callback with cluster ARN: %s", cluster_arn); + flb_sds_t region_str = NULL; + char *first_broker = NULL; + char *comma; - if (!cluster_arn) { - flb_error("[aws_msk_iam] cluster ARN is required"); + /* Validate inputs */ + if (!opaque) { + flb_error("[aws_msk_iam] opaque context is required"); return NULL; } - /* Allocate lightweight config - NO AWS provider! 
*/ + /* Validate brokers configuration is provided */ + if (!brokers || strlen(brokers) == 0) { + flb_error("[aws_msk_iam] brokers configuration is required"); + return NULL; + } + + /* Extract first broker from comma-separated list for region detection only */ + first_broker = flb_strdup(brokers); + if (!first_broker) { + flb_error("[aws_msk_iam] failed to allocate memory for broker parsing"); + return NULL; + } + + comma = strchr(first_broker, ','); + if (comma) { + *comma = '\0'; /* Terminate at first comma */ + } + + /* Determine region: use provided region or extract from brokers */ + if (region && strlen(region) > 0) { + /* User provided explicit region */ + region_str = flb_sds_create(region); + if (!region_str) { + flb_error("[aws_msk_iam] failed to allocate region string"); + flb_free(first_broker); + return NULL; + } + flb_info("[aws_msk_iam] using user-configured region: %s", region_str); + } + else { + /* Attempt to auto-detect region from broker hostname */ + region_str = extract_region_from_broker(first_broker); + if (!region_str || flb_sds_len(region_str) == 0) { + flb_error("[aws_msk_iam] failed to auto-detect region from broker address: %s. 
" + "Please set the 'aws_region' configuration parameter explicitly.", + brokers); + flb_free(first_broker); + if (region_str) { + flb_sds_destroy(region_str); + } + return NULL; + } + + flb_info("[aws_msk_iam] auto-detected region from broker hostname: %s", region_str); + } + + /* Done with first_broker string */ + flb_free(first_broker); + first_broker = NULL; + + /* Create MSK IAM context */ ctx = flb_calloc(1, sizeof(struct flb_aws_msk_iam)); if (!ctx) { flb_errno(); + flb_sds_destroy(region_str); return NULL; } - /* Store the flb_config for on-demand provider creation */ ctx->flb_config = config; - - ctx->cluster_arn = flb_sds_create(cluster_arn); - if (!ctx->cluster_arn) { - flb_error("[aws_msk_iam] failed to create cluster ARN string"); + ctx->region = region_str; + + flb_info("[aws_msk_iam] initialized MSK IAM authentication for region: %s", region_str); + + /* Create TLS instance */ + ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + 0, /* TLS debug off by default */ + NULL, NULL, NULL, NULL, NULL, NULL); + if (!ctx->cred_tls) { + flb_error("[aws_msk_iam] failed to create TLS instance"); + flb_sds_destroy(ctx->region); flb_free(ctx); return NULL; } - /* Extract region */ - region_str = extract_region(cluster_arn); - if (!region_str || strlen(region_str) == 0) { - flb_error("[aws_msk_iam] failed to extract region from cluster ARN: %s", cluster_arn); - flb_sds_destroy(ctx->cluster_arn); + /* Create AWS provider */ + ctx->provider = flb_standard_chain_provider_create(config, + ctx->cred_tls, + ctx->region, + NULL, NULL, + flb_aws_client_generator(), + NULL); + if (!ctx->provider) { + flb_error("[aws_msk_iam] failed to create AWS credentials provider"); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); flb_free(ctx); - if (region_str) flb_free(region_str); return NULL; } - ctx->region = flb_sds_create(region_str); - flb_free(region_str); - - if (!ctx->region) { - flb_error("[aws_msk_iam] failed to create region string"); - 
flb_sds_destroy(ctx->cluster_arn); + /* Initialize provider */ + ctx->provider->provider_vtable->sync(ctx->provider); + if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { + flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); + flb_aws_provider_destroy(ctx->provider); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); flb_free(ctx); return NULL; } + ctx->provider->provider_vtable->async(ctx->provider); - flb_info("[aws_msk_iam] extracted region: %s", ctx->region); + /* Initialize mutex to protect credential provider access from concurrent threads */ + if (pthread_mutex_init(&ctx->lock, NULL) != 0) { + flb_error("[aws_msk_iam] failed to initialize credential provider mutex"); + flb_aws_provider_destroy(ctx->provider); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_free(ctx); + return NULL; + } - /* Set the callback and opaque */ - rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); + /* + * Set MSK IAM context in opaque - now opaque->msk_iam_ctx only holds + * struct flb_aws_msk_iam * throughout its lifetime, eliminating type confusion. + */ flb_kafka_opaque_set(opaque, NULL, ctx); rd_kafka_conf_set_opaque(kconf, opaque); - - flb_info("[aws_msk_iam] OAuth callback registered successfully"); + + /* Register OAuth token refresh callback */ + rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); return ctx; } -/* Simple destroy - just config cleanup, no AWS provider to leak! */ +/* Destroy MSK IAM config */ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) { if (!ctx) { return; } - flb_info("[aws_msk_iam] destroying MSK IAM config"); + if (ctx->provider) { + flb_aws_provider_destroy(ctx->provider); + } - /* NO AWS provider to destroy! 
*/ + if (ctx->cred_tls) { + flb_tls_destroy(ctx->cred_tls); + } + if (ctx->region) { flb_sds_destroy(ctx->region); } - if (ctx->cluster_arn) { - flb_sds_destroy(ctx->cluster_arn); - } + + /* Destroy the credential provider mutex */ + pthread_mutex_destroy(&ctx->lock); + flb_free(ctx); } diff --git a/src/flb_kafka.c b/src/flb_kafka.c index 316c9ba9719..6a76c0dca33 100644 --- a/src/flb_kafka.c +++ b/src/flb_kafka.c @@ -95,7 +95,7 @@ rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka, err: if (kafka_cfg) { - flb_free(kafka_cfg); + rd_kafka_conf_destroy(kafka_cfg); } return NULL; }