diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 0e6cd9bf080..b878200bfa4 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -84,6 +84,7 @@ DEFINE_OPTION(FLB_FILTER_GEOIP2 "Enable geoip2 filter" DEFINE_OPTION(FLB_FILTER_GREP "Enable grep filter" ON) DEFINE_OPTION(FLB_FILTER_KUBERNETES "Enable kubernetes filter" ON) DEFINE_OPTION(FLB_FILTER_LOG_TO_METRICS "Enable log-derived metrics filter" ON) +DEFINE_OPTION(FLB_FILTER_LOOKUP "Enable lookup filter" ON) DEFINE_OPTION(FLB_FILTER_LUA "Enable Lua scripting filter" ON) DEFINE_OPTION(FLB_FILTER_MODIFY "Enable modify filter" ON) DEFINE_OPTION(FLB_FILTER_MULTILINE "Enable multiline filter" ON) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index baaaabd0326..2cd9fb8a063 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -417,6 +417,7 @@ REGISTER_FILTER_PLUGIN("filter_alter_size") REGISTER_FILTER_PLUGIN("filter_aws") REGISTER_FILTER_PLUGIN("filter_checklist") REGISTER_FILTER_PLUGIN("filter_ecs") +REGISTER_FILTER_PLUGIN("filter_lookup") REGISTER_FILTER_PLUGIN("filter_record_modifier") REGISTER_FILTER_PLUGIN("filter_sysinfo") REGISTER_FILTER_PLUGIN("filter_throttle") diff --git a/plugins/filter_lookup/CMakeLists.txt b/plugins/filter_lookup/CMakeLists.txt new file mode 100644 index 00000000000..4b3ad679204 --- /dev/null +++ b/plugins/filter_lookup/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + lookup.c) + +FLB_PLUGIN(filter_lookup "${src}" "") diff --git a/plugins/filter_lookup/lookup.c b/plugins/filter_lookup/lookup.c new file mode 100644 index 00000000000..8e07fda1462 --- /dev/null +++ b/plugins/filter_lookup/lookup.c @@ -0,0 +1,1027 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +#include +#else +#include +#endif +#include +#include +#include + +#include "lookup.h" + +/* Macro to increment records metrics */ +#ifdef FLB_HAVE_METRICS +#define INCREMENT_SKIPPED_METRIC(ctx, ins) do { \ + uint64_t ts = cfl_time_now(); \ + static char* labels_array[1]; \ + labels_array[0] = (char*)flb_filter_name(ins); \ + cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \ + flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \ +} while(0) + +#define INCREMENT_MATCHED_METRIC(ctx, ins) do { \ + uint64_t ts = cfl_time_now(); \ + static char* labels_array[1]; \ + labels_array[0] = (char*)flb_filter_name(ins); \ + cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \ + flb_metrics_sum(FLB_LOOKUP_METRIC_MATCHED, 1, ins->metrics); \ +} while(0) + +#define INCREMENT_PROCESSED_METRIC(ctx, ins) do { \ + uint64_t ts = cfl_time_now(); \ + static char* labels_array[1]; \ + labels_array[0] = (char*)flb_filter_name(ins); \ + cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \ + flb_metrics_sum(FLB_LOOKUP_METRIC_PROCESSED, 1, ins->metrics); \ +} while(0) +#else +#define INCREMENT_SKIPPED_METRIC(ctx, ins) do { } while(0) +#define INCREMENT_MATCHED_METRIC(ctx, ins) do { } while(0) +#define INCREMENT_PROCESSED_METRIC(ctx, ins) do { } while(0) +#endif + + +struct val_node { + struct mk_list _head; + void *val; +}; + +/* + * Trims leading/trailing 
whitespace and optionally normalizes to lower-case. + */ +static int normalize_and_trim(const char *input, size_t len, int ignore_case, char **output, size_t *out_len) +{ + if (!input || len == 0) { + *output = NULL; + *out_len = 0; + return 0; + } + /* Trim leading whitespace */ + const char *start = input; + size_t n = len; + while (n > 0 && isspace((unsigned char)*start)) { + start++; + n--; + } + /* Trim trailing whitespace */ + const char *end = start + n; + while (n > 0 && isspace((unsigned char)*(end - 1))) { + end--; + n--; + } + if (n == 0) { + *output = NULL; + *out_len = 0; + return 0; + } + if (ignore_case) { + char *buf; + size_t j; + buf = flb_malloc(n + 1); + if (!buf) { + *output = NULL; + *out_len = 0; + return -1; + } + for (j = 0; j < n; j++) { + buf[j] = tolower((unsigned char)start[j]); + } + buf[n] = '\0'; + *output = buf; + *out_len = n; + return 1; + } else { + *output = (char *)start; + *out_len = n; + return 0; + } +} + +/* Dynamic buffer structure for growing strings */ +struct dynamic_buffer { + char *data; + size_t len; + size_t capacity; +}; + +/* Initialize a dynamic buffer */ +static int dynbuf_init(struct dynamic_buffer *buf, size_t initial_capacity) +{ + buf->data = flb_malloc(initial_capacity); + if (!buf->data) { + return -1; + } + buf->len = 0; + buf->capacity = initial_capacity; + buf->data[0] = '\0'; + return 0; +} + +/* Append a character to dynamic buffer, growing if necessary */ +static int dynbuf_append_char(struct dynamic_buffer *buf, char c) +{ + /* Ensure we have space for the character plus null terminator */ + if (buf->len + 1 >= buf->capacity) { + size_t new_capacity = buf->capacity * 2; + char *new_data = flb_realloc(buf->data, new_capacity); + if (!new_data) { + return -1; + } + buf->data = new_data; + buf->capacity = new_capacity; + } + buf->data[buf->len++] = c; + buf->data[buf->len] = '\0'; + return 0; +} + +/* Free dynamic buffer */ +static void dynbuf_destroy(struct dynamic_buffer *buf) +{ + if (buf && 
buf->data) { + flb_free(buf->data); + buf->data = NULL; + buf->len = 0; + buf->capacity = 0; + } +} + +/* Read a line of arbitrary length from file using dynamic allocation */ +static char *read_line_dynamic(FILE *fp, size_t *line_length) +{ + size_t capacity = 256; /* Initial capacity */ + size_t len = 0; + char *line = flb_malloc(capacity); + int c; + + if (!line) { + return NULL; + } + + while ((c = fgetc(fp)) != EOF) { + /* Check if we need to grow the buffer */ + if (len + 1 >= capacity) { + size_t new_capacity = capacity * 2; + char *new_line = flb_realloc(line, new_capacity); + if (!new_line) { + flb_free(line); + return NULL; + } + line = new_line; + capacity = new_capacity; + } + + /* Add character to buffer */ + line[len++] = c; + + /* Check for end of line */ + if (c == '\n') { + break; + } + } + + /* If we read nothing and hit EOF, return NULL */ + if (len == 0 && c == EOF) { + flb_free(line); + return NULL; + } + + /* Null terminate the string */ + if (len >= capacity) { + char *new_line = flb_realloc(line, len + 1); + if (!new_line) { + flb_free(line); + return NULL; + } + line = new_line; + } + line[len] = '\0'; + + /* Remove trailing \r\n characters */ + while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) { + line[--len] = '\0'; + } + + if (line_length) { + *line_length = len; + } + + return line; +} + +static int load_csv(struct lookup_ctx *ctx) +{ + FILE *fp; + int line_num = 1; + char *header_line; + char *line; + size_t line_length; + + fp = fopen(ctx->file, "r"); + if (!fp) { + flb_plg_error(ctx->ins, "cannot open CSV file '%s': %s", ctx->file, strerror(errno)); + return -1; + } + /* Initialize value list if not already */ + mk_list_init(&ctx->val_list); + + /* Skip header using dynamic line reading */ + header_line = read_line_dynamic(fp, NULL); + if (!header_line) { + flb_plg_error(ctx->ins, "empty CSV file: %s", ctx->file); + fclose(fp); + return -1; + } + flb_free(header_line); /* Free the header line as we don't need it */ 
+ + while ((line = read_line_dynamic(fp, &line_length)) != NULL) { + char *p; + struct dynamic_buffer key_buf, val_buf; + int in_quotes; + int field; /* 0=key, 1=val */ + char *key_ptr; + size_t key_len; + int key_ptr_allocated; + char *val_ptr; + size_t val_len; + int val_ptr_allocated; + char *val_heap; + int ret; + struct val_node *node; + + if (line_length == 0) { + flb_free(line); + line_num++; + continue; + } + + /* Handle quotes in CSV files using dynamic buffers */ + p = line; + in_quotes = 0; + field = 0; + + /* Initialize dynamic buffers */ + if (dynbuf_init(&key_buf, 256) != 0) { + flb_plg_debug(ctx->ins, "Failed to initialize key buffer for line %d", line_num); + flb_free(line); + line_num++; + continue; + } + if (dynbuf_init(&val_buf, 256) != 0) { + flb_plg_debug(ctx->ins, "Failed to initialize value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + flb_free(line); + line_num++; + continue; + } + + /* Parse key from first column (and handle quotes) */ + while (*p && (field == 0)) { + if (!in_quotes && *p == '"') { + in_quotes = 1; + p++; + continue; + } + if (in_quotes) { + if (*p == '"') { + if (*(p+1) == '"') { + /* Escaped quote */ + if (dynbuf_append_char(&key_buf, '"') != 0) { + flb_plg_debug(ctx->ins, "Buffer allocation failed for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } + p += 2; + continue; + } else { + in_quotes = 0; + p++; + continue; + } + } + if (dynbuf_append_char(&key_buf, *p) != 0) { + flb_plg_debug(ctx->ins, "Buffer allocation failed for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } + p++; + continue; + } + if (*p == ',') { + field = 1; + p++; + break; + } + if (dynbuf_append_char(&key_buf, *p) != 0) { + flb_plg_debug(ctx->ins, "Buffer allocation failed for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + 
line_num++; + goto next_line; + } + p++; + } + + /* Parse value from second column (handle quotes) */ + in_quotes = 0; + while (*p && (field == 1)) { + if (!in_quotes && *p == '"') { + in_quotes = 1; + p++; + continue; + } + if (in_quotes) { + if (*p == '"') { + if (*(p+1) == '"') { + /* Escaped quote */ + if (dynbuf_append_char(&val_buf, '"') != 0) { + flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } + p += 2; + continue; + } else { + in_quotes = 0; + p++; + continue; + } + } + if (dynbuf_append_char(&val_buf, *p) != 0) { + flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } + p++; + continue; + } + if (*p == ',') { + /* Ignore extra fields */ + break; + } + if (dynbuf_append_char(&val_buf, *p) != 0) { + flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } + p++; + } + + /* Check for unmatched quote: if in_quotes is set, log warning and skip line */ + if (in_quotes) { + flb_plg_warn(ctx->ins, "Unmatched quote in line %d, skipping", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + } + + /* Normalize and trim key */ + key_ptr = NULL; + key_len = 0; + key_ptr_allocated = normalize_and_trim(key_buf.data, key_buf.len, ctx->ignore_case, &key_ptr, &key_len); + if (key_ptr_allocated < 0) { + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + } + /* Normalize and trim value */ + val_ptr = NULL; + val_len = 0; + val_ptr_allocated = normalize_and_trim(val_buf.data, val_buf.len, 0, &val_ptr, &val_len); + if (val_ptr_allocated < 0) { + if 
(key_ptr_allocated) flb_free(key_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + } + if (key_len == 0 || val_len == 0) { + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + } + /* Explicitly duplicate value buffer for hash table safety, allocate +1 for null terminator */ + val_heap = flb_malloc(val_len + 1); + if (!val_heap) { + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + } + memcpy(val_heap, val_ptr, val_len); + val_heap[val_len] = '\0'; + ret = flb_hash_table_add(ctx->ht, key_ptr, key_len, val_heap, val_len); + if (ret < 0) { + flb_free(val_heap); + flb_plg_warn(ctx->ins, "Failed to add key '%.*s' (duplicate or error), skipping", (int)key_len, key_ptr); + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + } + /* Track allocated value for later cleanup */ + node = flb_malloc(sizeof(struct val_node)); + if (node) { + node->val = val_heap; + mk_list_add(&node->_head, &ctx->val_list); + } else { + /* If malloc fails, value will leak, but plugin will still function */ + flb_plg_warn(ctx->ins, "Failed to allocate val_node for value cleanup, value will leak"); + } + /* Do not free val_heap; hash table owns it now */ + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + continue; + + next_line: + /* Label for error handling - cleanup already done in error paths */ + continue; + } + fclose(fp); + return 0; +} + +static int cb_lookup_init(struct flb_filter_instance *ins, + struct 
flb_config *config, + void *data) +{ + int ret; + /* + * Allocate and initialize the filter context for this plugin instance. + * This context will hold configuration, hash table, and state. + */ + struct lookup_ctx *ctx; + ctx = flb_calloc(1, sizeof(struct lookup_ctx)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + +#ifdef FLB_HAVE_METRICS + /* Initialize CMT metrics */ + { + static char* labels_name[] = {"name"}; + ctx->cmt_processed = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_processed_records_total", + "Total number of processed records", + 1, labels_name); + + ctx->cmt_matched = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_matched_records_total", + "Total number of matched records", + 1, labels_name); + + ctx->cmt_skipped = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_skipped_records_total", + "Total number of skipped records due to errors", + 1, labels_name); + } + + /* Add to old metrics system */ + flb_metrics_add(FLB_LOOKUP_METRIC_PROCESSED, "processed_records_total", ins->metrics); + flb_metrics_add(FLB_LOOKUP_METRIC_MATCHED, "matched_records_total", ins->metrics); + flb_metrics_add(FLB_LOOKUP_METRIC_SKIPPED, "skipped_records_total", ins->metrics); +#endif + + /* + * Populate context fields from config_map. This sets file, lookup_key, + * result_key, and ignore_case from the configuration. + */ + ret = flb_filter_config_map_set(ins, ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* + * Validate required configuration options. All three must be set for + * the filter to operate. 
+ */ + if (!ctx->file || !ctx->lookup_key || !ctx->result_key) { + flb_plg_error(ins, "missing required config: file, lookup_key, result_key"); + goto error; + } + + /* Check file existence and readability */ +#ifdef _WIN32 + if (_access(ctx->file, 04) != 0) { /* 04 = R_OK on Windows */ +#else + if (access(ctx->file, R_OK) != 0) { +#endif + flb_plg_error(ins, "CSV file '%s' does not exist or is not readable: %s", ctx->file, strerror(errno)); + goto error; + } + + /* + * Create hash table for lookups. This will store key-value pairs loaded + * from the CSV file for fast lookup during filtering. + */ + ctx->ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1024, -1); + if (!ctx->ht) { + flb_plg_error(ins, "could not create hash table"); + goto error; + } + + /* Initialize record accessor for lookup_key */ + ctx->ra_lookup_key = flb_ra_create(ctx->lookup_key, FLB_TRUE); + if (!ctx->ra_lookup_key) { + flb_plg_error(ins, "invalid lookup_key pattern: %s", ctx->lookup_key); + goto error; + } + + /* Load CSV data into hash table. */ + ret = load_csv(ctx); + if (ret < 0) { + goto error; + } + flb_plg_info(ins, "Loaded %zu entries from CSV file '%s'", (size_t)ctx->ht->total_count, ctx->file); + flb_plg_info(ins, "Lookup filter initialized: lookup_key='%s', result_key='%s', ignore_case=%s", + ctx->lookup_key, ctx->result_key, ctx->ignore_case ? "true" : "false"); + + /* Store context for use in filter and exit callbacks. 
*/ + flb_filter_set_context(ins, ctx); + return 0; + +error: + if (ctx->ra_lookup_key) { + flb_ra_destroy(ctx->ra_lookup_key); + } + if (ctx->ht) { + flb_hash_table_destroy(ctx->ht); + } + flb_free(ctx); + return -1; +} + +static int emit_original_record( + struct flb_log_event_encoder *log_encoder, + struct flb_log_event *log_event, + struct flb_filter_instance *ins, + struct lookup_ctx *ctx, + int rec_num) +{ + int ret = flb_log_event_encoder_begin_record(log_encoder); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(log_encoder, &log_event->timestamp); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS && log_event->metadata) { + ret = flb_log_event_encoder_set_metadata_from_msgpack_object(log_encoder, log_event->metadata); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_msgpack_object(log_encoder, log_event->body); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(log_encoder); + } else { + flb_log_event_encoder_rollback_record(log_encoder); + flb_plg_warn(ins, "Record %d: failed to encode original record, skipping", rec_num); + if (ctx) { + INCREMENT_SKIPPED_METRIC(ctx, ins); + } + } + return ret; +} + +static int cb_lookup_filter(const void *data, size_t bytes, + const char *tag, int tag_len, + void **out_buf, size_t *out_bytes, + struct flb_filter_instance *ins, + struct flb_input_instance *in_ins, + void *context, + struct flb_config *config) +{ + /* + * Main filter callback: processes each log event in the input batch. + * For each record, attempts to look up a value in the hash table using + * the configured key. If found, adds result_key to the record; otherwise, + * emits the original record unchanged. 
+ */ + struct lookup_ctx *ctx = context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event_encoder log_encoder; + struct flb_log_event log_event; + int ret; + int rec_num = 0; + void *found_val = NULL; + size_t found_len = 0; + char *lookup_val_str = NULL; + size_t lookup_val_len = 0; + int lookup_val_allocated = 0; + int any_modified = 0; /* Track if any records were modified */ + + /* Ensure context is valid */ + if (!ctx) { + flb_plg_error(ins, "lookup filter context is NULL"); + return FLB_FILTER_NOTOUCH; + } + + /* Initialize log event decoder for input records */ + ret = flb_log_event_decoder_init(&log_decoder, (char *)data, bytes); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ins, "Log event decoder initialization error : %d", ret); + return FLB_FILTER_NOTOUCH; + } + + /* Initialize log event encoder for output records */ + ret = flb_log_event_encoder_init(&log_encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ins, "Log event encoder initialization error : %d", ret); + flb_log_event_decoder_destroy(&log_decoder); + return FLB_FILTER_NOTOUCH; + } + + /* Process each log event in the input batch */ + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + char *dynamic_val_buf; /* Track dynamic buffer for numeric conversions */ + int required_size; + int printed; + int ht_get_ret; + struct flb_ra_value *rval; + + rec_num++; + INCREMENT_PROCESSED_METRIC(ctx, ins); + lookup_val_str = NULL; + lookup_val_len = 0; + lookup_val_allocated = 0; + dynamic_val_buf = NULL; + + /* Helper macro to clean up dynamic buffer and allocated lookup strings */ + #define CLEANUP_DYNAMIC_BUFFERS() do { \ + if (dynamic_val_buf) { \ + flb_free(dynamic_val_buf); \ + dynamic_val_buf = NULL; \ + } \ + if (lookup_val_allocated && lookup_val_str) { \ + flb_free(lookup_val_str); \ + lookup_val_str = NULL; \ + } \ + } while(0) + + /* If body is not a map, emit original 
record and log debug */ + if (!log_event.body || log_event.body->type != MSGPACK_OBJECT_MAP) { + flb_plg_debug(ins, "Record %d: body is not a map (type=%d), emitting original", rec_num, log_event.body ? log_event.body->type : -1); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Use record accessor to get the lookup value */ + rval = flb_ra_get_value_object(ctx->ra_lookup_key, *log_event.body); + if (!rval) { + /* Key not found, emit original record */ + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Extract string value from record accessor result */ + if (rval->type == FLB_RA_STRING) { + lookup_val_allocated = normalize_and_trim((char *)rval->o.via.str.ptr, rval->o.via.str.size, ctx->ignore_case, &lookup_val_str, &lookup_val_len); + if (lookup_val_allocated < 0) { + flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (string), skipping", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + lookup_val_str = NULL; + lookup_val_len = 0; + } + } + else { + /* Non-string value: convert to string using two-pass dynamic allocation */ + required_size = 0; + + /* First pass: determine required buffer size */ + switch (rval->type) { + case FLB_RA_BOOL: + required_size = snprintf(NULL, 0, "%s", rval->o.via.boolean ? 
"true" : "false"); + break; + case FLB_RA_INT: + required_size = snprintf(NULL, 0, "%" PRId64, rval->o.via.i64); + break; + case FLB_RA_FLOAT: + required_size = snprintf(NULL, 0, "%f", rval->o.via.f64); + break; + case FLB_RA_NULL: + required_size = snprintf(NULL, 0, "null"); + break; + case 5: + case 6: + flb_plg_debug(ins, "Record %d: complex type (ARRAY/MAP) from record accessor, skipping conversion", rec_num); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + default: + flb_plg_debug(ins, "Record %d: unsupported type %d, skipping conversion", rec_num, rval->type); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + if (required_size < 0) { + flb_plg_debug(ins, "Record %d: snprintf sizing failed for type %d, skipping conversion", rec_num, rval->type); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Allocate buffer with required size plus null terminator */ + dynamic_val_buf = flb_malloc(required_size + 1); + if (!dynamic_val_buf) { + flb_plg_warn(ins, "Record %d: malloc failed for dynamic value buffer (size %zu), skipping", rec_num, (size_t)(required_size + 1)); + INCREMENT_SKIPPED_METRIC(ctx, ins); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Second pass: write to allocated buffer */ + printed = 0; + switch (rval->type) { + case FLB_RA_BOOL: + printed = snprintf(dynamic_val_buf, required_size + 1, "%s", rval->o.via.boolean ? 
"true" : "false"); + break; + case FLB_RA_INT: + printed = snprintf(dynamic_val_buf, required_size + 1, "%" PRId64, rval->o.via.i64); + break; + case FLB_RA_FLOAT: + printed = snprintf(dynamic_val_buf, required_size + 1, "%f", rval->o.via.f64); + break; + case FLB_RA_NULL: + printed = snprintf(dynamic_val_buf, required_size + 1, "null"); + break; + } + + if (printed < 0 || printed != required_size) { + flb_plg_debug(ins, "Record %d: snprintf formatting failed (expected %d, got %d), skipping conversion", rec_num, required_size, printed); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Use the dynamically allocated buffer for normalization */ + lookup_val_allocated = normalize_and_trim(dynamic_val_buf, printed, ctx->ignore_case, &lookup_val_str, &lookup_val_len); + if (lookup_val_allocated < 0) { + flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (non-string), skipping", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + flb_plg_debug(ins, "Record %d: lookup value for key '%s' is non-string, converted to '%s'", rec_num, ctx->lookup_key, lookup_val_str ? lookup_val_str : "NULL"); + + /* + * If normalize_and_trim allocated a new buffer (lookup_val_allocated > 0), + * we can free the dynamic buffer now. Otherwise, lookup_val_str points + * into dynamic_val_buf and we must delay freeing it. + */ + if (lookup_val_allocated > 0) { + flb_free(dynamic_val_buf); + dynamic_val_buf = NULL; + } + /* Note: dynamic_val_buf will be freed later if still allocated */ + } + + /* If lookup value is missing or empty, emit the original record unchanged. 
*/ + if (!lookup_val_str || lookup_val_len == 0) { + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* + * Attempt to find the lookup value in the hash table. + * If not found, emit the original record unchanged. + */ + ht_get_ret = flb_hash_table_get(ctx->ht, lookup_val_str, lookup_val_len, &found_val, &found_len); + + if (ht_get_ret < 0 || !found_val || found_len == 0) { + /* Not found, emit original record */ + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Match found - increment counter */ + INCREMENT_MATCHED_METRIC(ctx, ins); + any_modified = 1; /* Mark that we have modified records */ + + flb_plg_trace(ins, "Record %d: Found match for '%.*s' -> '%.*s'", + rec_num, (int)lookup_val_len, lookup_val_str, (int)found_len, (char*)found_val); + + /* Free normalization buffer if allocated (after using it in trace) */ + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + + /* Begin new record */ + ret = flb_log_event_encoder_begin_record(&log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to begin new record, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + ret = flb_log_event_encoder_set_timestamp(&log_encoder, &log_event.timestamp); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to set timestamp, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + if (log_event.metadata) { + ret = flb_log_event_encoder_set_metadata_from_msgpack_object(&log_encoder, log_event.metadata); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { 
+                flb_plg_warn(ins, "Record %d: failed to set metadata, emitting original", rec_num);
+                INCREMENT_SKIPPED_METRIC(ctx, ins);
+                flb_log_event_encoder_rollback_record(&log_encoder);
+                emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num);
+                continue;
+            }
+        }
+
+        /* Copy all keys except result_key (to avoid collision) */
+        if (log_event.body && log_event.body->type == MSGPACK_OBJECT_MAP) {
+            int i;
+            int enc_failed = 0;
+            for (i = 0; i < log_event.body->via.map.size; i++) {
+                msgpack_object_kv *kv = &log_event.body->via.map.ptr[i];
+                if (kv->key.type == MSGPACK_OBJECT_STR &&
+                    kv->key.via.str.size == strlen(ctx->result_key) &&
+                    strncmp(kv->key.via.str.ptr, ctx->result_key, kv->key.via.str.size) == 0) {
+                    continue;
+                }
+                ret = flb_log_event_encoder_append_body_values(&log_encoder,
+                    FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv->key),
+                    FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv->val));
+                if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+                    flb_plg_warn(ins, "Record %d: failed to append key/value, emitting original", rec_num);
+                    INCREMENT_SKIPPED_METRIC(ctx, ins);
+                    flb_log_event_encoder_rollback_record(&log_encoder);
+                    emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num);
+                    /* A plain 'continue' here would resume this inner for loop
+                     * and keep appending fields to the rolled-back record;
+                     * flag the failure and skip to the next log event instead. */
+                    enc_failed = 1;
+                    break;
+                }
+            }
+            if (enc_failed) {
+                continue;
+            }
+        }
+
+        /* Add result_key */
+        ret = flb_log_event_encoder_append_body_string(&log_encoder, ctx->result_key, strlen(ctx->result_key));
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            flb_plg_warn(ins, "Record %d: failed to append result_key, emitting original", rec_num);
+            INCREMENT_SKIPPED_METRIC(ctx, ins);
+            flb_log_event_encoder_rollback_record(&log_encoder);
+            emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num);
+            continue;
+        }
+
+        ret = flb_log_event_encoder_append_body_string(&log_encoder, (char *)found_val, found_len);
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            flb_plg_warn(ins, "Record %d: failed to append found_val, emitting original", rec_num);
+            INCREMENT_SKIPPED_METRIC(ctx, ins);
+            flb_log_event_encoder_rollback_record(&log_encoder);
+            emit_original_record(&log_encoder,
&log_event, ins, ctx, rec_num); + continue; + } + + ret = flb_log_event_encoder_commit_record(&log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to commit record, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + } + + #undef CLEANUP_DYNAMIC_BUFFERS + + /* + * If any records were modified, return the new buffer. + * Otherwise, indicate no change to avoid unnecessary buffer copy. + */ + if (any_modified) { + *out_buf = log_encoder.output_buffer; + *out_bytes = log_encoder.output_length; + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + ret = FLB_FILTER_MODIFIED; + } else { + ret = FLB_FILTER_NOTOUCH; + } + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return ret; +} + +static int cb_lookup_exit(void *data, struct flb_config *config) +{ + struct lookup_ctx *ctx = data; + if (!ctx) return 0; + + /* Free all allocated values tracked in val_list */ + struct mk_list *tmp; + struct mk_list *head; + struct val_node *node; + mk_list_foreach_safe(head, tmp, &ctx->val_list) { + node = mk_list_entry(head, struct val_node, _head); + flb_free(node->val); + mk_list_del(head); + flb_free(node); + } + if (ctx->ra_lookup_key) flb_ra_destroy(ctx->ra_lookup_key); + if (ctx->ht) flb_hash_table_destroy(ctx->ht); + flb_free(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { FLB_CONFIG_MAP_STR, "file", NULL, 0, FLB_TRUE, offsetof(struct lookup_ctx, file), "CSV file to lookup values from." }, + { FLB_CONFIG_MAP_STR, "lookup_key", NULL, 0, FLB_TRUE, offsetof(struct lookup_ctx, lookup_key), "Name of the key to lookup in input record." }, + { FLB_CONFIG_MAP_STR, "result_key", NULL, 0, FLB_TRUE, offsetof(struct lookup_ctx, result_key), "Name of the key to add to output record if found." 
}, + { FLB_CONFIG_MAP_BOOL, "ignore_case", "false", 0, FLB_TRUE, offsetof(struct lookup_ctx, ignore_case), "Ignore case when matching lookup values (default: false)." }, + {0} +}; + +struct flb_filter_plugin filter_lookup_plugin = { + .name = "lookup", + .description = "Lookup values from CSV file and add to records", + .cb_init = cb_lookup_init, + .cb_filter = cb_lookup_filter, + .cb_exit = cb_lookup_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/filter_lookup/lookup.h b/plugins/filter_lookup/lookup.h new file mode 100644 index 00000000000..30b9cbd14f7 --- /dev/null +++ b/plugins/filter_lookup/lookup.h @@ -0,0 +1,52 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_FILTER_LOOKUP_H +#define FLB_FILTER_LOOKUP_H + +#include +#include +#include +#include +#include +#include + +/* Metric constants */ +#define FLB_LOOKUP_METRIC_PROCESSED 200 +#define FLB_LOOKUP_METRIC_MATCHED 201 +#define FLB_LOOKUP_METRIC_SKIPPED 202 + +struct lookup_ctx { + struct flb_filter_instance *ins; + char *file; + char *lookup_key; + char *result_key; + struct flb_hash_table *ht; + struct flb_record_accessor *ra_lookup_key; + int ignore_case; + struct mk_list val_list; + +#ifdef FLB_HAVE_METRICS + struct cmt_counter *cmt_processed; + struct cmt_counter *cmt_matched; + struct cmt_counter *cmt_skipped; +#endif +}; + +extern struct flb_filter_plugin filter_lookup_plugin; + +#endif /* FLB_FILTER_LOOKUP_H */ \ No newline at end of file diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 57fb5ede13a..b92e2be2655 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -180,6 +180,7 @@ if(FLB_IN_LIB AND FLB_OUT_LIB) FLB_RT_TEST(FLB_FILTER_NEST "filter_nest.c") FLB_RT_TEST(FLB_FILTER_REWRITE_TAG "filter_rewrite_tag.c") FLB_RT_TEST(FLB_FILTER_KUBERNETES "filter_kubernetes.c") + FLB_RT_TEST(FLB_FILTER_LOOKUP "filter_lookup.c") FLB_RT_TEST(FLB_FILTER_PARSER "filter_parser.c") FLB_RT_TEST(FLB_FILTER_MODIFY "filter_modify.c") FLB_RT_TEST(FLB_FILTER_LUA "filter_lua.c") diff --git a/tests/runtime/filter_lookup.c b/tests/runtime/filter_lookup.c new file mode 100644 index 00000000000..a1f4808316e --- /dev/null +++ b/tests/runtime/filter_lookup.c @@ -0,0 +1,1044 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +#include +#else +#include +#endif +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" + +#define TMP_CSV_PATH "lookup_test.csv" + +struct test_ctx { + flb_ctx_t *flb; + int i_ffd; + int f_ffd; + int o_ffd; +}; + +static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data) +{ + int i_ffd; + int o_ffd; + int f_ffd; + struct test_ctx *ctx = NULL; + + ctx = flb_malloc(sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("malloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "error", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "lib", NULL); + TEST_CHECK(i_ffd >= 0); + flb_input_set(ctx->flb, i_ffd, "tag", "test", NULL); + ctx->i_ffd = i_ffd; + + /* Filter */ + f_ffd = flb_filter(ctx->flb, (char *) "lookup", NULL); + TEST_CHECK(f_ffd >= 0); + ctx->f_ffd = f_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + ctx->o_ffd = o_ffd; + TEST_CHECK(o_ffd >= 0); + flb_output_set(ctx->flb, o_ffd, + "match", "test", + NULL); + + return ctx; +} + +static void test_ctx_destroy(struct test_ctx *ctx) +{ + TEST_CHECK(ctx != NULL); + + flb_time_msleep(1000); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +void delete_csv_file() +{ +#ifdef _WIN32 + _unlink(TMP_CSV_PATH); +#else + unlink(TMP_CSV_PATH); +#endif +} + 
+int create_csv_file(char *csv_content) +{ + FILE *fp = NULL; + fp = fopen(TMP_CSV_PATH, "w"); + if (fp == NULL) { + TEST_MSG("fopen error\n"); + return -1; + } + fprintf(fp, "%s", csv_content); + fflush(fp); + fclose(fp); + return 0; +} + +/* Callback to check expected results */ +static int cb_check_result_json(void *record, size_t size, void *data) +{ + char *p; + char *expected; + char *result; + + expected = (char *) data; + result = (char *) record; + + p = strstr(result, expected); + TEST_CHECK(p != NULL); + + if (p == NULL) { + flb_error("Expected to find: '%s' in result '%s'", + expected, result); + } + + flb_free(record); + return 0; +} + +/* Test basic lookup functionality */ +void flb_test_lookup_basic(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user1,John Doe\n" + "user2,Jane Smith\n" + "user3,Bob Wilson\n"; + char *input = "[0, {\"user_id\": \"user1\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"John Doe\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with ignore_case option */ +void flb_test_lookup_ignore_case(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "USER1,John Doe\n" + 
"user2,Jane Smith\n"; + char *input = "[0, {\"user_id\": \"user1\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"John Doe\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + "ignore_case", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with CSV containing quotes and special characters */ +void flb_test_lookup_csv_quotes(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "\"quoted,key\",\"Value with \"\"quotes\"\" and, commas\"\n" + "simple_key,Simple Value\n"; + char *input = "[0, {\"lookup_field\": \"quoted,key\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"result_field\":\"Value with \\\"quotes\\\" and, commas\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "lookup_field", + "result_key", "result_field", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, 
strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with numeric values */ +void flb_test_lookup_numeric_values(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "123,Numeric Key\n" + "456,Another Number\n"; + char *input = "[0, {\"numeric_field\": 123}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"description\":\"Numeric Key\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "numeric_field", + "result_key", "description", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with very large numbers (testing the two-pass snprintf fix) */ +void flb_test_lookup_large_numbers(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char large_number_str[64]; + snprintf(large_number_str, sizeof(large_number_str), "%lld", (long long)LLONG_MAX); + + char csv_content[256]; + snprintf(csv_content, sizeof(csv_content), + "key,value\n" + "%s,Very Large Number\n" + "456,Small Number\n", large_number_str); + + char input[128]; + snprintf(input, sizeof(input), "[0, {\"big_number\": %lld}]", (long long)LLONG_MAX); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"number_desc\":\"Very Large Number\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) 
{ + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "big_number", + "result_key", "number_desc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with boolean values */ +void flb_test_lookup_boolean_values(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "true,Boolean True\n" + "false,Boolean False\n"; + char *input = "[0, {\"bool_field\": true}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"bool_desc\":\"Boolean True\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "bool_field", + "result_key", "bool_desc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with no match (should emit original record) */ +void flb_test_lookup_no_match(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user1,John 
Doe\n" + "user2,Jane Smith\n"; + char *input = "[0, {\"user_id\": \"user999\", \"other_field\": \"test\"}]"; + + /* Should NOT contain the result_key since no match was found */ + cb_data.cb = cb_check_result_json; + cb_data.data = "\"other_field\":\"test\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test dynamic line reading with very long CSV lines */ +void flb_test_lookup_long_csv_lines(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *input = "[0, {\"key_field\": \"long_key\"}]"; + + /* Test that long CSV values (>4096 chars) can be read correctly. + * Just verify that the lookup worked by checking for value_field key. */ + cb_data.cb = cb_check_result_json; + cb_data.data = "\"value_field\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Create CSV file with very long lines */ + FILE *fp = fopen(TMP_CSV_PATH, "w"); + TEST_CHECK(fp != NULL); + + fprintf(fp, "key,value\n"); + fprintf(fp, "long_key,"); + + /* Write a very long value (> 4096 chars) */ + { + int i; + for (i = 0; i < 100; i++) { + fprintf(fp, "This is a very long value that exceeds the original 4096 character buffer limit to test dynamic line reading functionality. 
"); + } + } + fprintf(fp, "\n"); + fprintf(fp, "short_key,Short Value\n"); + fclose(fp); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "key_field", + "result_key", "value_field", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test with whitespace trimming */ +void flb_test_lookup_whitespace_trim(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + " trimmed_key , Trimmed Value \n" + "normal_key,Normal Value\n"; + char *input = "[0, {\"lookup_field\": \" trimmed_key \"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"result_field\":\"Trimmed Value\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "lookup_field", + "result_key", "result_field", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Mock the dynamic buffer functions and structure for testing */ +struct dynamic_buffer { + char *data; + size_t len; + size_t capacity; +}; + +static int dynbuf_init(struct dynamic_buffer *buf, size_t initial_capacity) { + buf->data = 
malloc(initial_capacity); + if (!buf->data) return -1; + buf->len = 0; + buf->capacity = initial_capacity; + buf->data[0] = '\0'; + return 0; +} + +static int dynbuf_append_char(struct dynamic_buffer *buf, char c) { + if (buf->len + 1 >= buf->capacity) { + size_t new_capacity = buf->capacity * 2; + char *new_data = realloc(buf->data, new_capacity); + if (!new_data) return -1; + buf->data = new_data; + buf->capacity = new_capacity; + } + buf->data[buf->len++] = c; + buf->data[buf->len] = '\0'; + return 0; +} + +static void dynbuf_destroy(struct dynamic_buffer *buf) { + if (buf && buf->data) { + free(buf->data); + buf->data = NULL; + buf->len = 0; + buf->capacity = 0; + } +} + +/* Test dynamic buffer functionality */ +void flb_test_dynamic_buffer(void) +{ + /* This is an internal unit test that doesn't require Fluent Bit setup */ + + struct dynamic_buffer buf; + + /* Test initialization */ + int ret = dynbuf_init(&buf, 4); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.capacity == 4); + TEST_CHECK(buf.len == 0); + + /* Test appending characters that will cause growth */ + const char *test_str = "This is a test string that is longer than the initial capacity"; + { + size_t i; + for (i = 0; test_str[i]; i++) { + ret = dynbuf_append_char(&buf, test_str[i]); + TEST_CHECK(ret == 0); + } + } + + TEST_CHECK(strcmp(buf.data, test_str) == 0); + TEST_CHECK(buf.len == strlen(test_str)); + TEST_CHECK(buf.capacity >= buf.len + 1); + + dynbuf_destroy(&buf); +} + +/* Test nested record accessor patterns ($a.b.c) */ +void flb_test_lookup_nested_keys(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user123,John Doe\n" + "admin456,Jane Smith\n"; + char *input = "[0, {\"user\": {\"profile\": {\"id\": \"user123\"}}, \"other_field\": \"test\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"John Doe\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + 
TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "$user['profile']['id']", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test with large CSV file (performance/load testing) */ +void flb_test_lookup_large_csv(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *input = "[0, {\"user_id\": \"user5000\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"User 5000\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Create CSV file with 10,000 entries for performance testing */ + FILE *fp = fopen(TMP_CSV_PATH, "w"); + TEST_CHECK(fp != NULL); + + fprintf(fp, "key,value\n"); + + /* Write 10,000 test entries */ + { + int i; + for (i = 1; i <= 10000; i++) { + fprintf(fp, "user%d,User %d\n", i, i); + } + } + fclose(fp); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Test lookup performance with large dataset */ + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(2000); /* Give more time for large CSV processing */ + + 
delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test nested record accessor with array indexing ($users[0].id) */ +void flb_test_lookup_nested_array_keys(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "array_user1,First User\n" + "array_user2,Second User\n"; + char *input = "[0, {\"users\": [{\"id\": \"array_user1\"}, {\"id\": \"array_user2\"}], \"metadata\": \"test\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_desc\":\"First User\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "$users[0]['id']", + "result_key", "user_desc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Custom callback to capture metrics and verify counts */ +static int cb_check_metrics(void *record, size_t size, void *data) +{ + /* Just free the record - we'll check metrics through the filter instance */ + flb_free(record); + return 0; +} + +/* Helper function to get metric value from filter instance */ +static uint64_t get_filter_metric(struct test_ctx *ctx, int metric_id) +{ + struct flb_filter_instance *f_ins; + struct mk_list *head; + struct flb_metric *metric; + + mk_list_foreach(head, &ctx->flb->config->filters) { + f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (f_ins->id == ctx->f_ffd && f_ins->metrics) { + metric = flb_metrics_get_id(metric_id, f_ins->metrics); + if (metric) { + 
return metric->val; + } + } + } + return 0; +} + +/* Test metrics with matched records */ +void flb_test_lookup_metrics_matched(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user1,John Doe\n" + "user2,Jane Smith\n" + "user3,Bob Wilson\n"; + char *input1 = "[0, {\"user_id\": \"user1\"}]"; + char *input2 = "[0, {\"user_id\": \"user2\"}]"; + char *input3 = "[0, {\"user_id\": \"unknown\"}]"; // No match + + cb_data.cb = cb_check_metrics; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Process three records: 2 matches + 1 no-match */ + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input1, strlen(input1)); + TEST_CHECK(bytes == strlen(input1)); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input2, strlen(input2)); + TEST_CHECK(bytes == strlen(input2)); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input3, strlen(input3)); + TEST_CHECK(bytes == strlen(input3)); + + flb_time_msleep(2000); + + /* Check metrics: should have 3 processed, 2 matched, 0 skipped */ + uint64_t processed = get_filter_metric(ctx, 200); // FLB_LOOKUP_METRIC_PROCESSED + uint64_t matched = get_filter_metric(ctx, 201); // FLB_LOOKUP_METRIC_MATCHED + uint64_t skipped = get_filter_metric(ctx, 202); // FLB_LOOKUP_METRIC_SKIPPED + + TEST_CHECK(processed == 3); + TEST_CHECK(matched == 2); + TEST_CHECK(skipped == 0); + + if (processed != 3) { + TEST_MSG("Expected processed=3, got %llu", (unsigned long long)processed); + } + 
if (matched != 2) { + TEST_MSG("Expected matched=2, got %llu", (unsigned long long)matched); + } + if (skipped != 0) { + TEST_MSG("Expected skipped=0, got %llu", (unsigned long long)skipped); + } + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test metrics with large volume to verify counter accuracy */ +void flb_test_lookup_metrics_processed(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "match_key,Matched Value\n"; + + cb_data.cb = cb_check_metrics; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "test_key", + "result_key", "test_result", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Send 20 matching records and 10 non-matching records */ + const int matching_count = 20; + const int non_matching_count = 10; + { + int i; + for (i = 0; i < matching_count; i++) { + char input[256]; + snprintf(input, sizeof(input), "[0, {\"test_key\": \"match_key\", \"seq\": %d}]", i); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + } + } + + { + int i; + for (i = 0; i < non_matching_count; i++) { + char input[256]; + snprintf(input, sizeof(input), "[0, {\"test_key\": \"no_match_%d\", \"seq\": %d}]", i, i); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + } + } + + flb_time_msleep(3000); /* Give more time for processing large volume */ + + /* Verify metrics accuracy */ + uint64_t processed = get_filter_metric(ctx, 200); + uint64_t matched = 
get_filter_metric(ctx, 201); + uint64_t skipped = get_filter_metric(ctx, 202); + + TEST_CHECK(processed == matching_count + non_matching_count); + TEST_CHECK(matched == matching_count); + TEST_CHECK(skipped == 0); + + if (processed != matching_count + non_matching_count) { + TEST_MSG("Expected processed=%d, got %llu", matching_count + non_matching_count, (unsigned long long)processed); + } + if (matched != matching_count) { + TEST_MSG("Expected matched=%d, got %llu", matching_count, (unsigned long long)matched); + } + if (skipped != 0) { + TEST_MSG("Expected skipped=0, got %llu", (unsigned long long)skipped); + } + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +TEST_LIST = { + {"basic_lookup", flb_test_lookup_basic}, + {"ignore_case", flb_test_lookup_ignore_case}, + {"csv_quotes", flb_test_lookup_csv_quotes}, + {"numeric_values", flb_test_lookup_numeric_values}, + {"large_numbers", flb_test_lookup_large_numbers}, + {"boolean_values", flb_test_lookup_boolean_values}, + {"no_match", flb_test_lookup_no_match}, + {"long_csv_lines", flb_test_lookup_long_csv_lines}, + {"whitespace_trim", flb_test_lookup_whitespace_trim}, + {"dynamic_buffer", flb_test_dynamic_buffer}, + {"nested_keys", flb_test_lookup_nested_keys}, + {"large_csv", flb_test_lookup_large_csv}, + {"nested_array_keys", flb_test_lookup_nested_array_keys}, + {"metrics_matched", flb_test_lookup_metrics_matched}, + {"metrics_processed", flb_test_lookup_metrics_processed}, + {NULL, NULL} +};