diff --git a/plugins/filter_wasm/filter_wasm.c b/plugins/filter_wasm/filter_wasm.c
index 8ca16646e4b..81377c5987a 100644
--- a/plugins/filter_wasm/filter_wasm.c
+++ b/plugins/filter_wasm/filter_wasm.c
@@ -33,6 +33,7 @@
 #include
 #include
 #include
+#include
 
 #include "filter_wasm.h"
 
@@ -55,6 +56,7 @@ static int cb_wasm_filter(const void *data, size_t bytes,
     char *json_buf = NULL;
     size_t json_size;
     int root_type;
+    int32_t record_type;
     /* Get the persistent WASM instance from the filter context. */
     struct flb_filter_wasm *ctx = filter_context;
     struct flb_wasm *wasm = ctx->wasm;
@@ -83,6 +85,15 @@ static int cb_wasm_filter(const void *data, size_t bytes,
         return FLB_FILTER_NOTOUCH;
     }
 
+
+    /* Preserve group markers (GROUP_START/GROUP_END) through the filter */
+    ret = flb_log_event_decoder_read_groups(&log_decoder, FLB_TRUE);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "failed to enable group marker decoding");
+        flb_log_event_decoder_destroy(&log_decoder);
+        return FLB_FILTER_NOTOUCH;
+    }
+
     ret = flb_log_event_encoder_init(&log_encoder,
                                      FLB_LOG_EVENT_FORMAT_DEFAULT);
 
@@ -98,6 +109,28 @@ static int cb_wasm_filter(const void *data, size_t bytes,
     while ((ret = flb_log_event_decoder_next(
                     &log_decoder,
                     &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+        /* Determine record type (normal vs group markers) */
+        record_type = FLB_LOG_EVENT_NORMAL;
+        flb_log_event_decoder_get_record_type(&log_event, &record_type);
+
+        if (record_type == FLB_LOG_EVENT_GROUP_START ||
+            record_type == FLB_LOG_EVENT_GROUP_END) {
+
+            /* Pass the raw bytes through; do not hide encoder failures */
+            ret = flb_log_event_encoder_emit_raw_record(
+                      &log_encoder,
+                      log_decoder.record_base,
+                      log_decoder.record_length);
+            if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+                flb_plg_error(ctx->ins,
+                              "failed to re-emit group marker record: %d",
+                              ret);
+            }
+
+            continue;
+        }
+
         off = log_decoder.offset;
         alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */
         last_off = off;
diff --git a/tests/runtime/filter_wasm.c b/tests/runtime/filter_wasm.c
index e0e68c371d8..0ec68cdec82 100644
--- a/tests/runtime/filter_wasm.c
+++ b/tests/runtime/filter_wasm.c
@@ -19,8 +19,10 @@
 
 #include
 #include
+#include
 #include
 #include
+#include
 #include
 
 #include "flb_tests_runtime.h"
@@ -36,6 +38,10 @@ pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
 char *output = NULL;
 int num_output = 0;
 
+/* MsgPack Output Capture Helpers */
+static char *mp_output = NULL;
+static size_t mp_output_size = 0;
+
 void set_output(char *val)
 {
     pthread_mutex_lock(&result_mutex);
@@ -78,6 +84,63 @@ static int get_output_num()
     return ret;
 }
 
+/* Append a copy of 'data' to the captured msgpack buffer (mutex-protected) */
+static void set_msgpack_output_copy(void *data, size_t size)
+{
+    char *tmp;
+
+    pthread_mutex_lock(&result_mutex);
+    /* Append data instead of overwriting it */
+    tmp = flb_realloc(mp_output, mp_output_size + size);
+    if (tmp) {
+        mp_output = tmp;
+        memcpy(mp_output + mp_output_size, data, size);
+        mp_output_size += size;
+    }
+    else {
+        flb_errno();
+    }
+
+    pthread_mutex_unlock(&result_mutex);
+}
+
+/* Free the captured msgpack buffer and reset its size to zero */
+static void clear_msgpack_output()
+{
+    pthread_mutex_lock(&result_mutex);
+    if (mp_output) {
+        flb_free(mp_output);
+    }
+    mp_output = NULL;
+    mp_output_size = 0;
+    pthread_mutex_unlock(&result_mutex);
+}
+
+/*
+ * Expose the captured buffer without copying it: the returned pointer is
+ * only valid until the next append or clear_msgpack_output() call.
+ * Returns 0 when data is available, -1 otherwise.
+ */
+static int get_msgpack_output(char **out_buf, size_t *out_size)
+{
+    pthread_mutex_lock(&result_mutex);
+    *out_buf = mp_output;
+    *out_size = mp_output_size;
+    pthread_mutex_unlock(&result_mutex);
+    return (*out_buf != NULL && *out_size > 0) ? 0 : -1;
+}
+
+/* lib output callback: stash a copy of the flushed chunk, then free it */
+static int cb_store_msgpack_output(void *record, size_t size, void *data)
+{
+    (void) data;
+    if (record != NULL && size > 0) {
+        set_msgpack_output_copy(record, size);
+    }
+    flb_free(record);
+    return 0;
+}
+
 static int cb_count_msgpack_events(void *record, size_t size, void *data)
 {
     msgpack_unpacked result;
@@ -140,6 +203,144 @@ void wait_with_timeout(uint32_t timeout_ms, char **out_result)
     }
 }
 
+/* Helpers for MsgPack Object Lookup */
+
+/* Return the value stored under string key 'key' in 'map', or NULL */
+static msgpack_object *mp_map_get(msgpack_object *map, const char *key)
+{
+    size_t i;
+    size_t key_len;
+    msgpack_object_kv *kv;
+
+    if (!map || map->type != MSGPACK_OBJECT_MAP) {
+        return NULL;
+    }
+
+    /* strlen(key) is loop-invariant: compute it once */
+    key_len = strlen(key);
+    kv = map->via.map.ptr;
+    for (i = 0; i < map->via.map.size; i++) {
+        if (kv[i].key.type == MSGPACK_OBJECT_STR) {
+            if (key_len == kv[i].key.via.str.size &&
+                memcmp(kv[i].key.via.str.ptr, key, key_len) == 0) {
+                return &kv[i].val;
+            }
+        }
+    }
+    return NULL;
+}
+
+/* FLB_TRUE when 'o' is a msgpack string equal to the C string 's' */
+static int mp_str_eq(msgpack_object *o, const char *s)
+{
+    if (!o || o->type != MSGPACK_OBJECT_STR) {
+        return FLB_FALSE;
+    }
+    if (o->via.str.size != strlen(s)) {
+        return FLB_FALSE;
+    }
+    return memcmp(o->via.str.ptr, s, o->via.str.size) == 0 ? FLB_TRUE : FLB_FALSE;
+}
+
+/* State needed to POST the OTLP payload to the opentelemetry input */
+struct http_client_ctx {
+    struct flb_upstream *u;
+    struct flb_connection *u_conn;
+    struct flb_config *config;
+    struct mk_event_loop *evl;
+};
+
+#define PORT_OTEL 4318
+#define JSON_CONTENT_TYPE "application/json"
+
+/*
+ * Build an upstream connection to 127.0.0.1:PORT_OTEL on a newly created
+ * event loop. Returns NULL, after releasing partial state, on any failure.
+ */
+struct http_client_ctx* http_client_ctx_create()
+{
+    struct http_client_ctx *ret_ctx = NULL;
+    struct mk_event_loop *evl = NULL;
+
+    ret_ctx = flb_calloc(1, sizeof(struct http_client_ctx));
+    if (!TEST_CHECK(ret_ctx != NULL)) {
+        flb_errno();
+        TEST_MSG("flb_calloc(http_client_ctx) failed");
+        return NULL;
+    }
+
+    evl = mk_event_loop_create(16);
+    if (!TEST_CHECK(evl != NULL)) {
+        TEST_MSG("mk_event_loop failed");
+        flb_free(ret_ctx);
+        return NULL;
+    }
+    ret_ctx->evl = evl;
+    flb_engine_evl_init();
+    flb_engine_evl_set(evl);
+
+    ret_ctx->config = flb_config_init();
+    if (!TEST_CHECK(ret_ctx->config != NULL)) {
+        TEST_MSG("flb_config_init failed");
+        mk_event_loop_destroy(evl);
+        flb_free(ret_ctx);
+        return NULL;
+    }
+
+    ret_ctx->u = flb_upstream_create(ret_ctx->config, "127.0.0.1", PORT_OTEL, 0, NULL);
+    if (!TEST_CHECK(ret_ctx->u != NULL)) {
+        TEST_MSG("flb_upstream_create failed");
+        flb_config_exit(ret_ctx->config);
+        mk_event_loop_destroy(evl);
+        flb_free(ret_ctx);
+        return NULL;
+    }
+
+    ret_ctx->u_conn = flb_upstream_conn_get(ret_ctx->u);
+    if (!TEST_CHECK(ret_ctx->u_conn != NULL)) {
+        /* Bail out instead of dereferencing a NULL connection below */
+        TEST_MSG("flb_upstream_conn_get failed");
+        flb_upstream_destroy(ret_ctx->u);
+        flb_config_exit(ret_ctx->config);
+        mk_event_loop_destroy(evl);
+        flb_free(ret_ctx);
+        return NULL;
+    }
+
+    ret_ctx->u_conn->upstream = ret_ctx->u;
+
+    return ret_ctx;
+}
+
+/* Release everything owned by 'ctx' and the context itself; NULL is a no-op */
+void http_client_ctx_destroy(struct http_client_ctx *ctx)
+{
+    if (!ctx) {
+        return;
+    }
+
+    if (ctx->u_conn) {
+        flb_upstream_conn_release(ctx->u_conn);
+        ctx->u_conn = NULL;
+    }
+
+    if (ctx->u) {
+        flb_upstream_destroy(ctx->u);
+        ctx->u = NULL;
+    }
+
+    if (ctx->config) {
+        flb_config_exit(ctx->config);
+        ctx->config = NULL;
+    }
+
+    if (ctx->evl) {
+        mk_event_loop_destroy(ctx->evl);
+        ctx->evl = NULL;
+    }
+
+    flb_free(ctx);
+}
 
 void flb_test_append_tag(void)
 {
@@ -519,6 +720,239 @@ void flb_test_append_kv_on_msgpack(void)
     flb_destroy(ctx);
 }
 
+/*
+ * An OTLP/JSON log posted to the opentelemetry input must leave the wasm
+ * filter with its GROUP_START / GROUP_END markers and group metadata intact.
+ */
+void flb_test_wasm_preserve_otlp_group_metadata(void)
+{
+    int ret;
+    flb_ctx_t *ctx;
+    int in_ffd;
+    int out_ffd;
+    int filter_ffd;
+
+    struct flb_lib_out_cb cb_data;
+
+    struct flb_http_client *c;
+    struct http_client_ctx *httpc;
+    size_t b_sent;
+
+    char *packed = NULL;
+    size_t packed_size = 0;
+
+    int found_group_start = FLB_FALSE;
+    int found_group_end = FLB_FALSE;
+    int found_normal = FLB_FALSE;
+    int group_ok = FLB_FALSE;
+
+    msgpack_object *resource;
+    msgpack_object *scope;
+    msgpack_object *attrs;
+    msgpack_object *svc;
+    msgpack_object *name;
+    msgpack_object *obj;
+    msgpack_object *head;
+
+    msgpack_unpacked result;
+    msgpack_object *rec;
+    size_t off = 0;
+    int32_t seconds = 0;
+    msgpack_object *ts_obj;
+    const unsigned char *p;
+    uint32_t tmp;
+
+    const char *payload =
+        "{"
+        "\"resourceLogs\":[{"
+        "\"resource\":{"
+        "\"attributes\":[{"
+        "\"key\":\"service.name\","
+        "\"value\":{\"stringValue\":\"filter-service\"}"
+        "}]"
+        "},"
+        "\"scopeLogs\":[{"
+        "\"scope\":{\"name\":\"my.scope\"},"
+        "\"logRecords\":[{"
+        "\"timeUnixNano\":\"1660296023390371588\","
+        "\"body\":{\"stringValue\":\"{\\\"message\\\":\\\"dummy\\\"}\"}"
+        "}]"
+        "}]"
+        "}]"
+        "}";
+
+    clear_msgpack_output();
+
+    ctx = flb_create();
+    flb_service_set(ctx,
+                    "flush", FLUSH_INTERVAL,
+                    "grace", "1",
+                    "http_server", "on",
+                    "http_listen", "127.0.0.1",
+                    "http_port", "2020",
+                    NULL);
+
+    /* OpenTelemetry input */
+    in_ffd = flb_input(ctx, (char *)"opentelemetry", NULL);
+    TEST_CHECK(in_ffd >= 0);
+
+    /* WASM filter */
+    filter_ffd = flb_filter(ctx, (char *)"wasm", NULL);
+    TEST_CHECK(filter_ffd >= 0);
+
+    ret = flb_filter_set(ctx, filter_ffd,
+                         "Match", "v1_logs",
+                         "wasm_path", DPATH_WASM "/say_hello.wasm",
+                         "function_name", "filter_say_hello",
+                         NULL);
+    TEST_CHECK(ret == 0);
+
+    /* Output (lib, raw msgpack) */
+    cb_data.cb = cb_store_msgpack_output;
+    cb_data.data = NULL;
+
+    out_ffd = flb_output(ctx, (char *)"lib", (void *)&cb_data);
+    TEST_CHECK(out_ffd >= 0);
+
+    flb_output_set(ctx, out_ffd,
+                   "match", "*",
+                   NULL);
+
+    ret = flb_start(ctx);
+    TEST_CHECK(ret == 0);
+
+    httpc = http_client_ctx_create();
+    if (!TEST_CHECK(httpc != NULL)) {
+        flb_stop(ctx);
+        flb_destroy(ctx);
+        clear_msgpack_output();
+        return;
+    }
+
+    c = flb_http_client(httpc->u_conn,
+                        FLB_HTTP_POST,
+                        "/v1/logs",
+                        payload,
+                        strlen(payload),
+                        "127.0.0.1",
+                        PORT_OTEL,
+                        NULL,
+                        0);
+    if (!TEST_CHECK(c != NULL)) {
+        http_client_ctx_destroy(httpc);
+        flb_stop(ctx);
+        flb_destroy(ctx);
+        clear_msgpack_output();
+        return;
+    }
+
+    ret = flb_http_add_header(c,
+                              FLB_HTTP_HEADER_CONTENT_TYPE,
+                              strlen(FLB_HTTP_HEADER_CONTENT_TYPE),
+                              JSON_CONTENT_TYPE,
+                              strlen(JSON_CONTENT_TYPE));
+    TEST_CHECK(ret == 0);
+
+    ret = flb_http_do(c, &b_sent);
+    TEST_CHECK(ret == 0);
+    TEST_CHECK(b_sent > 0);
+    TEST_CHECK(c->resp.status == 201);
+
+    flb_time_msleep(1500);
+
+    ret = get_msgpack_output(&packed, &packed_size);
+    TEST_CHECK(ret == 0);
+
+    /* Decode msgpack stream and validate group markers are preserved */
+    if (packed && packed_size > 0) {
+        msgpack_unpacked_init(&result);
+
+        /*
+         * lib output receives a msgpack "chunk": a stream of records, each
+         * encoded as [[ts, meta], body]. msgpack_unpack_next returns one
+         * record at a time from the buffer.
+         */
+        while (msgpack_unpack_next(&result, packed, packed_size, &off) == MSGPACK_UNPACK_SUCCESS) {
+            /* result.data is the record itself: [[ts, meta], body] */
+            rec = &result.data;
+
+            if (rec->type != MSGPACK_OBJECT_ARRAY || rec->via.array.size < 2) {
+                continue;
+            }
+
+            /* Check header [ts, meta] */
+            head = &rec->via.array.ptr[0];
+            if (head->type != MSGPACK_OBJECT_ARRAY || head->via.array.size < 2) {
+                continue;
+            }
+
+            ts_obj = &head->via.array.ptr[0];
+            obj = &rec->via.array.ptr[1];
+
+            /* Decode Timestamp to determine record type
+             * Group Start: -1
+             * Group End: -2
+             */
+            if (ts_obj->type == MSGPACK_OBJECT_EXT &&
+                ts_obj->via.ext.type == 0 &&
+                ts_obj->via.ext.size >= 4) {
+                /*
+                 * flb_time: 8 bytes (4 bytes sec, 4 bytes nsec). Big Endian.
+                 * Widen each byte to uint32_t before shifting: 0xff << 24 on
+                 * a promoted int overflows, and -1 / -2 start with 0xff.
+                 */
+                p = (const unsigned char *) ts_obj->via.ext.ptr;
+                tmp = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
+                      ((uint32_t) p[2] << 8) | (uint32_t) p[3];
+                seconds = (int32_t) tmp;
+            }
+            else if (ts_obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+                seconds = (int32_t)ts_obj->via.i64;
+            }
+            else {
+                /* Assume normal record if it's a positive integer or float */
+                seconds = 0;
+            }
+
+            if (seconds == -1) { /* FLB_LOG_EVENT_GROUP_START */
+                found_group_start = FLB_TRUE;
+
+                resource = mp_map_get(obj, "resource");
+                scope = mp_map_get(obj, "scope");
+                attrs = resource ? mp_map_get(resource, "attributes") : NULL;
+                svc = attrs ? mp_map_get(attrs, "service.name") : NULL;
+                name = scope ? mp_map_get(scope, "name") : NULL;
+
+                if (mp_str_eq(svc, "filter-service") && mp_str_eq(name, "my.scope")) {
+                    group_ok = FLB_TRUE;
+                }
+            }
+            else if (seconds == -2) {
+                /* FLB_LOG_EVENT_GROUP_END */
+                found_group_end = FLB_TRUE;
+            }
+            else {
+                /* Normal Record */
+                found_normal = FLB_TRUE;
+            }
+        }
+        msgpack_unpacked_destroy(&result);
+    }
+
+    TEST_CHECK(found_group_start == FLB_TRUE);
+    TEST_CHECK(found_group_end == FLB_TRUE);
+    TEST_CHECK(found_normal == FLB_TRUE);
+    TEST_CHECK(group_ok == FLB_TRUE);
+
+    /* cleanup */
+    flb_http_client_destroy(c);
+    http_client_ctx_destroy(httpc);
+
+    flb_stop(ctx);
+    flb_destroy(ctx);
+    clear_msgpack_output();
+}
+
 TEST_LIST = {
     {"hello_world", flb_test_helloworld},
     {"append_tag", flb_test_append_tag},
@@ -526,5 +960,7 @@ TEST_LIST = {
     {"array_contains_null", flb_test_array_contains_null},
     {"drop_all_records", flb_test_drop_all_records},
     {"append_kv_on_msgpack_format", flb_test_append_kv_on_msgpack},
+    {"wasm_preserve_otlp_group_metadata",
+     flb_test_wasm_preserve_otlp_group_metadata},
     {NULL, NULL}
 };