Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,34 @@ static struct flb_aws_header put_log_events_header[] = {
},
};

static int mock_create_log_stream_calls;
static int mock_put_log_events_calls;
static int mock_create_log_stream_after_put_calls;

void cloudwatch_mock_call_count_reset(void)
{
mock_create_log_stream_calls = 0;
mock_put_log_events_calls = 0;
mock_create_log_stream_after_put_calls = 0;
}

int cloudwatch_mock_call_count_get(const char *api)
{
if (strcmp(api, "CreateLogStream") == 0) {
return mock_create_log_stream_calls;
}
else if (strcmp(api, "PutLogEvents") == 0) {
return mock_put_log_events_calls;
}

return 0;
}

int cloudwatch_mock_create_after_put_count_get(void)
{
return mock_create_log_stream_after_put_calls;
}

int plugin_under_test()
{
if (getenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST") != NULL) {
Expand Down Expand Up @@ -137,6 +165,16 @@ struct flb_http_client *mock_http_call(char *error_env_var, char *api)
struct flb_http_client *c = NULL;
char *error = mock_error_response(error_env_var);

if (strcmp(api, "CreateLogStream") == 0) {
mock_create_log_stream_calls++;
if (mock_put_log_events_calls > 0) {
mock_create_log_stream_after_put_calls++;
}
}
else if (strcmp(api, "PutLogEvents") == 0) {
mock_put_log_events_calls++;
}

c = flb_calloc(1, sizeof(struct flb_http_client));
if (!c) {
flb_errno();
Expand Down Expand Up @@ -1573,6 +1611,11 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
data, bytes,
config);
}

if (buf->non_retriable_error == FLB_TRUE) {
return -1;
}

/* send any remaining events */
ret = send_log_events(ctx, buf);
reset_flush_buf(ctx, buf);
Expand All @@ -1599,15 +1642,13 @@ struct log_stream *get_or_create_log_stream(struct flb_cloudwatch *ctx,
now = time(NULL);
mk_list_foreach_safe(head, tmp, &ctx->streams) {
stream = mk_list_entry(head, struct log_stream, _head);
if (strcmp(stream_name, stream->name) == 0 && strcmp(group_name, stream->group) == 0) {
return stream;
if (stream->expiration < now) {
mk_list_del(&stream->_head);
log_stream_destroy(stream);
}
else {
/* check if stream is expired, if so, clean it up */
if (stream->expiration < now) {
mk_list_del(&stream->_head);
log_stream_destroy(stream);
}
else if (strcmp(stream_name, stream->name) == 0 &&
strcmp(group_name, stream->group) == 0) {
return stream;
}
}

Expand Down Expand Up @@ -2012,6 +2053,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,

struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t error;
int num_headers = 1;
int retry = FLB_TRUE;

Expand Down Expand Up @@ -2066,6 +2108,20 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
if (strcmp(error, ERR_CODE_NOT_FOUND) == 0) {
flb_plg_error(ctx->ins, "Log stream %s not found. "
"Rejecting the chunk without retry.",
stream->name);
stream->expiration = 0;
buf->non_retriable_error = FLB_TRUE;
Comment thread
cosmo0920 marked this conversation as resolved.
flb_sds_destroy(error);
flb_http_client_destroy(c);
return -1;
}
flb_sds_destroy(error);
}
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
size_t payload_size);
int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream);
int compare_events(const void *a_arg, const void *b_arg);
void cloudwatch_mock_call_count_reset(void);
int cloudwatch_mock_call_count_get(const char *api);
int cloudwatch_mock_create_after_put_count_get(void);

#endif
4 changes: 4 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
event_chunk->type, config);
if (event_count < 0) {
flb_plg_error(ctx->ins, "Failed to send events");
if (buf->non_retriable_error == FLB_TRUE) {
cw_flush_destroy(buf);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
cw_flush_destroy(buf);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ struct cw_flush {

/* current log stream that we are sending records too */
struct log_stream *current_stream;

/* marks a delivery failure that must not be retried */
int non_retriable_error;
};

struct cw_event {
Expand Down
24 changes: 19 additions & 5 deletions tests/runtime/out_cloudwatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
/* CloudWatch API constants */
#include "../../plugins/out_cloudwatch_logs/cloudwatch_api.h"

#define ERROR_ALREADY_EXISTS "{\"__type\":\"ResourceAlreadyExistsException\"}"
#ifdef FLB_SYSTEM_WINDOWS
#define setenv(name, value, overwrite) _putenv_s(name, value)
#define unsetenv(name) _putenv_s(name, "")
#endif

#define CLOUDWATCH_ERROR_ALREADY_EXISTS "{\"__type\":\"ResourceAlreadyExistsException\"}"
#define CLOUDWATCH_ERROR_NOT_FOUND "{\"__type\":\"ResourceNotFoundException\"}"
/* not a real error code, but tests that the code can respond to any error */
#define ERROR_UNKNOWN "{\"__type\":\"UNKNOWN\"}"
Expand Down Expand Up @@ -109,7 +114,7 @@ void flb_test_cloudwatch_already_exists_create_group(void)

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);
setenv("TEST_CREATE_LOG_GROUP_ERROR", ERROR_ALREADY_EXISTS, 1);
setenv("TEST_CREATE_LOG_GROUP_ERROR", CLOUDWATCH_ERROR_ALREADY_EXISTS, 1);

ctx = flb_create();

Expand Down Expand Up @@ -146,7 +151,7 @@ void flb_test_cloudwatch_already_exists_create_stream(void)

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);
setenv("TEST_CREATE_LOG_STREAM_ERROR", ERROR_ALREADY_EXISTS, 1);
setenv("TEST_CREATE_LOG_STREAM_ERROR", CLOUDWATCH_ERROR_ALREADY_EXISTS, 1);

ctx = flb_create();

Expand Down Expand Up @@ -292,9 +297,10 @@ void flb_test_cloudwatch_error_put_log_events_not_found(void)
int in_ffd;
int out_ffd;

/* ResourceNotFoundException must follow the normal output retry path. */
/* ResourceNotFoundException must reject the chunk without retrying it. */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);
setenv("TEST_PUT_LOG_EVENTS_ERROR", CLOUDWATCH_ERROR_NOT_FOUND, 1);
cloudwatch_mock_call_count_reset();

ctx = flb_create();

Expand All @@ -317,8 +323,16 @@ void flb_test_cloudwatch_error_put_log_events_not_found(void)

flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1);

sleep(2);
TEST_CHECK(cloudwatch_mock_call_count_get("PutLogEvents") == 1);
TEST_CHECK(cloudwatch_mock_create_after_put_count_get() == 0);

flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1);

sleep(2);
flb_stop(ctx);
TEST_CHECK(cloudwatch_mock_call_count_get("PutLogEvents") == 2);
TEST_CHECK(cloudwatch_mock_create_after_put_count_get() == 1);
flb_destroy(ctx);
unsetenv("TEST_PUT_LOG_EVENTS_ERROR");
}
Expand Down Expand Up @@ -369,7 +383,7 @@ void flb_test_cloudwatch_already_exists_create_group_put_retention_policy(void)

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);
setenv("TEST_CREATE_LOG_GROUP_ERROR", ERROR_ALREADY_EXISTS, 1);
setenv("TEST_CREATE_LOG_GROUP_ERROR", CLOUDWATCH_ERROR_ALREADY_EXISTS, 1);

/* PutRetentionPolicy is not called if the group already exists */
setenv("TEST_PUT_RETENTION_POLICY_ERROR", ERROR_UNKNOWN, 1);
Expand Down
Loading