diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 9f79fe2208f..6b47d432af6 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -99,6 +99,34 @@ static struct flb_aws_header put_log_events_header[] = { }, }; +static int mock_create_log_stream_calls; +static int mock_put_log_events_calls; +static int mock_create_log_stream_after_put_calls; + +void cloudwatch_mock_call_count_reset(void) +{ + mock_create_log_stream_calls = 0; + mock_put_log_events_calls = 0; + mock_create_log_stream_after_put_calls = 0; +} + +int cloudwatch_mock_call_count_get(const char *api) +{ + if (strcmp(api, "CreateLogStream") == 0) { + return mock_create_log_stream_calls; + } + else if (strcmp(api, "PutLogEvents") == 0) { + return mock_put_log_events_calls; + } + + return 0; +} + +int cloudwatch_mock_create_after_put_count_get(void) +{ + return mock_create_log_stream_after_put_calls; +} + int plugin_under_test() { if (getenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST") != NULL) { @@ -137,6 +165,16 @@ struct flb_http_client *mock_http_call(char *error_env_var, char *api) struct flb_http_client *c = NULL; char *error = mock_error_response(error_env_var); + if (strcmp(api, "CreateLogStream") == 0) { + mock_create_log_stream_calls++; + if (mock_put_log_events_calls > 0) { + mock_create_log_stream_after_put_calls++; + } + } + else if (strcmp(api, "PutLogEvents") == 0) { + mock_put_log_events_calls++; + } + c = flb_calloc(1, sizeof(struct flb_http_client)); if (!c) { flb_errno(); @@ -1573,6 +1611,11 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, data, bytes, config); } + + if (buf->non_retriable_error == FLB_TRUE) { + return -1; + } + /* send any remaining events */ ret = send_log_events(ctx, buf); reset_flush_buf(ctx, buf); @@ -1599,15 +1642,13 @@ struct log_stream *get_or_create_log_stream(struct flb_cloudwatch *ctx, now = time(NULL); mk_list_foreach_safe(head, tmp, &ctx->streams) { stream = mk_list_entry(head, struct log_stream, _head); - if (strcmp(stream_name, stream->name) == 0 && strcmp(group_name, stream->group) == 0) { - return stream; + if (stream->expiration < now) { + mk_list_del(&stream->_head); + log_stream_destroy(stream); } - else { - /* check if stream is expired, if so, clean it up */ - if (stream->expiration < now) { - mk_list_del(&stream->_head); - log_stream_destroy(stream); - } + else if (strcmp(stream_name, stream->name) == 0 && + strcmp(group_name, stream->group) == 0) { + return stream; } } @@ -2012,6 +2053,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct flb_http_client *c = NULL; struct flb_aws_client *cw_client; + flb_sds_t error; int num_headers = 1; int retry = FLB_TRUE; @@ -2066,6 +2108,20 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, /* Check error */ if (c->resp.payload_size > 0) { + error = flb_aws_error(c->resp.payload, c->resp.payload_size); + if (error != NULL) { + if (strcmp(error, ERR_CODE_NOT_FOUND) == 0) { + flb_plg_error(ctx->ins, "Log stream %s not found. " + "Rejecting the chunk without retry.", + stream->name); + stream->expiration = 0; + buf->non_retriable_error = FLB_TRUE; + flb_sds_destroy(error); + flb_http_client_destroy(c); + return -1; + } + flb_sds_destroy(error); + } flb_aws_print_error(c->resp.payload, c->resp.payload_size, "PutLogEvents", ctx->ins); } diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index dff2dbf2599..0003c669abf 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -76,5 +76,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, size_t payload_size); int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream); int compare_events(const void *a_arg, const void *b_arg); +void cloudwatch_mock_call_count_reset(void); +int cloudwatch_mock_call_count_get(const char *api); +int cloudwatch_mock_create_after_put_count_get(void); #endif diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index 52ca9dee758..2f8acf188d6 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -458,6 +458,10 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk, event_chunk->type, config); if (event_count < 0) { flb_plg_error(ctx->ins, "Failed to send events"); + if (buf->non_retriable_error == FLB_TRUE) { + cw_flush_destroy(buf); + FLB_OUTPUT_RETURN(FLB_ERROR); + } cw_flush_destroy(buf); FLB_OUTPUT_RETURN(FLB_RETRY); } diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index cd134f8a580..4011c434b0d 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -106,6 +106,9 @@ struct cw_flush { /* current log stream that we are sending records too */ struct log_stream *current_stream; + + /* marks a delivery failure that must not be retried */ + int non_retriable_error; }; struct cw_event { diff --git a/tests/runtime/out_cloudwatch.c b/tests/runtime/out_cloudwatch.c index c90a3e00776..aa5c65f1946 100644 --- a/tests/runtime/out_cloudwatch.c +++ b/tests/runtime/out_cloudwatch.c @@ -8,7 +8,12 @@ /* CloudWatch API constants */ #include "../../plugins/out_cloudwatch_logs/cloudwatch_api.h" -#define ERROR_ALREADY_EXISTS "{\"__type\":\"ResourceAlreadyExistsException\"}" +#ifdef FLB_SYSTEM_WINDOWS +#define setenv(name, value, overwrite) _putenv_s(name, value) +#define unsetenv(name) _putenv_s(name, "") +#endif + +#define CLOUDWATCH_ERROR_ALREADY_EXISTS "{\"__type\":\"ResourceAlreadyExistsException\"}" #define CLOUDWATCH_ERROR_NOT_FOUND "{\"__type\":\"ResourceNotFoundException\"}" /* not a real error code, but tests that the code can respond to any error */ #define ERROR_UNKNOWN "{\"__type\":\"UNKNOWN\"}" @@ -109,7 +114,7 @@ void flb_test_cloudwatch_already_exists_create_group(void) /* mocks calls- signals that we are in test mode */ setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); - setenv("TEST_CREATE_LOG_GROUP_ERROR", ERROR_ALREADY_EXISTS, 1); + setenv("TEST_CREATE_LOG_GROUP_ERROR", CLOUDWATCH_ERROR_ALREADY_EXISTS, 1); ctx = flb_create(); @@ -146,7 +151,7 @@ void flb_test_cloudwatch_already_exists_create_stream(void) /* mocks calls- signals that we are in test mode */ setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); - setenv("TEST_CREATE_LOG_STREAM_ERROR", ERROR_ALREADY_EXISTS, 1); + setenv("TEST_CREATE_LOG_STREAM_ERROR", CLOUDWATCH_ERROR_ALREADY_EXISTS, 1); ctx = flb_create(); @@ -292,9 +297,10 @@ void flb_test_cloudwatch_error_put_log_events_not_found(void) int in_ffd; int out_ffd; - /* ResourceNotFoundException must follow the normal output retry path. */ + /* ResourceNotFoundException must reject the chunk without retrying it. */ setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); setenv("TEST_PUT_LOG_EVENTS_ERROR", CLOUDWATCH_ERROR_NOT_FOUND, 1); + cloudwatch_mock_call_count_reset(); ctx = flb_create(); @@ -317,8 +323,16 @@ void flb_test_cloudwatch_error_put_log_events_not_found(void) flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1); + sleep(2); + TEST_CHECK(cloudwatch_mock_call_count_get("PutLogEvents") == 1); + TEST_CHECK(cloudwatch_mock_create_after_put_count_get() == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1); + sleep(2); flb_stop(ctx); + TEST_CHECK(cloudwatch_mock_call_count_get("PutLogEvents") == 2); + TEST_CHECK(cloudwatch_mock_create_after_put_count_get() == 1); flb_destroy(ctx); unsetenv("TEST_PUT_LOG_EVENTS_ERROR"); } @@ -369,7 +383,7 @@ void flb_test_cloudwatch_already_exists_create_group_put_retention_policy(void) /* mocks calls- signals that we are in test mode */ setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1); - setenv("TEST_CREATE_LOG_GROUP_ERROR", ERROR_ALREADY_EXISTS, 1); + setenv("TEST_CREATE_LOG_GROUP_ERROR", CLOUDWATCH_ERROR_ALREADY_EXISTS, 1); /* PutRetentionPolicy is not called if the group already exists */ setenv("TEST_PUT_RETENTION_POLICY_ERROR", ERROR_UNKNOWN, 1);