Contributor

@cosmo0920 cosmo0920 commented Dec 22, 2025

Currently, filter_wasm only handles the ordinary metadata of logs. This needs to be fixed so that OTLP-style group metadata is handled as well.
This patch starts handling OTLP-style group metadata.

Closes #11302.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
service:
  http_port: 2020

pipeline:
  inputs:
    - name: opentelemetry

  filters:
    - name: wasm
      match: 'v1_logs'
      function_name: 'filter_say_hello'
      wasm_path: /path/to/GitHub/fluent-bit/tests/runtime/data/wasm/say_hello.wasm

  outputs:
    - name: stdout
      match: '*'
$ curl --header "Content-Type: application/json" --request POST --data '{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"filter-service"}}]},"scopeLogs":[{"scope":{"name":"my.scope"},"logRecords":[{"timeUnixNano":"1660296023390371588","body":{"stringValue":"{\"message\":\"dummy\"}"}}]}]}]}'   http://0.0.0.0:4318/v1/logs
  • Debug log output from testing the change
Fluent Bit v4.2.2
* Copyright (C) 2015-2025 The Fluent Bit Authors
* Fluent Bit is a CNCF graduated project under the Fluent organization
* https://fluentbit.io

______ _                  _    ______ _ _             ___   _____ 
|  ___| |                | |   | ___ (_) |           /   | / __  \
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __/ /| | `' / /'
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / / /_| |   / /  
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /\___  |_./ /___
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/     |_(_)_____/
                                                                  
             Fluent Bit v4.2 – Direct Routes Ahead
         Celebrating 10 Years of Open, Fluent Innovation!

[2025/12/23 15:05:04.552611000] [ info] Configuration:
[2025/12/23 15:05:04.552617000] [ info]  flush time     | 1.000000 seconds
[2025/12/23 15:05:04.552621000] [ info]  grace          | 5 seconds
[2025/12/23 15:05:04.552623000] [ info]  daemon         | 0
[2025/12/23 15:05:04.552625000] [ info] ___________
[2025/12/23 15:05:04.552627000] [ info]  inputs:
[2025/12/23 15:05:04.552629000] [ info]      opentelemetry
[2025/12/23 15:05:04.552631000] [ info] ___________
[2025/12/23 15:05:04.552633000] [ info]  filters:
[2025/12/23 15:05:04.552635000] [ info]      wasm.0
[2025/12/23 15:05:04.552637000] [ info] ___________
[2025/12/23 15:05:04.552638000] [ info]  outputs:
[2025/12/23 15:05:04.552640000] [ info]      stdout.0
[2025/12/23 15:05:04.552641000] [ info] ___________
[2025/12/23 15:05:04.552643000] [ info]  collectors:
[2025/12/23 15:05:04.552795000] [ info] [fluent bit] version=4.2.2, commit=d4bb848ef8, pid=36720
[2025/12/23 15:05:04.552804000] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2025/12/23 15:05:04.552857000] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2025/12/23 15:05:04.552874000] [ info] [simd    ] NEON
[2025/12/23 15:05:04.552877000] [ info] [cmetrics] version=1.0.5
[2025/12/23 15:05:04.552892000] [ info] [ctraces ] version=0.6.6
[2025/12/23 15:05:04.552979000] [ info] [input:opentelemetry:opentelemetry.0] initializing
[2025/12/23 15:05:04.552984000] [ info] [input:opentelemetry:opentelemetry.0] storage_strategy='memory' (memory only)
[2025/12/23 15:05:04.552991000] [debug] [opentelemetry:opentelemetry.0] created event channels: read=25 write=26
[2025/12/23 15:05:04.553099000] [debug] [downstream] listening on 0.0.0.0:4318
[2025/12/23 15:05:04.553105000] [ info] [input:opentelemetry:opentelemetry.0] listening on 0.0.0.0:4318
[2025/12/23 15:05:04.566496000] [debug] [stdout:stdout.0] created event channels: read=29 write=30
[2025/12/23 15:05:04.566610000] [ info] [output:stdout:stdout.0] worker #0 started
[2025/12/23 15:05:04.566626000] [ info] [sp] stream processor started
[2025/12/23 15:05:04.566662000] [ info] [engine] Shutdown Grace Period=5, Shutdown Input Grace Period=2
Hello from WASM!
[2025/12/23 15:06:00.571033000] [debug] [task] created task=0x1065ed6e0 id=0 OK
[2025/12/23 15:06:00.571075000] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
GROUP METADATA : 

{"schema"=>"otlp", "resource_id"=>0, "scope_id"=>0}

GROUP ATTRIBUTES : 

{"resource"=>{"attributes"=>{"service.name"=>"filter-service"}}, "scope"=>{"name"=>"my.scope"}}

[0] v1_logs: [[1660296023.1698112429, {"otlp"=>{}}], {"log"=>"{"message":"dummy"}"}]
[2025/12/23 15:06:00.571143000] [debug] [out flush] cb_destroy coro_id=0
[2025/12/23 15:06:00.571168000] [debug] [task] destroy task=0x1065ed6e0 (task_id=0)
^C[2025/12/23 15:06:03] [engine] caught signal (SIGINT)
[2025/12/23 15:06:03.372291000] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2025/12/23 15:06:03.372403000] [ info] [output:stdout:stdout.0] thread worker #0 stopped
  • Attached Valgrind output that shows no leaks or memory corruption was found

leaks command result:

Process 36828 is not debuggable. Due to security restrictions, leaks can only show or save contents of readonly memory of restricted processes.

Process:         fluent-bit [36828]
Path:            /Users/USER/*/fluent-bit
Load Address:    0x102afc000
Identifier:      fluent-bit
Version:         0
Code Type:       ARM64
Platform:        macOS
Parent Process:  leaks [36827]
Target Type:     live task

Date/Time:       2025-12-23 15:06:40.395 +0900
Launch Time:     2025-12-23 15:06:31.268 +0900
OS Version:      macOS 26.0.1 (25A362)
Report Version:  7
Analysis Tool:   /usr/bin/leaks

Physical footprint:         8432K
Physical footprint (peak):  9040K
Idle exit:                  untracked
----

leaks Report Version: 4.0, multi-line stacks
Process 36828: 1277 nodes malloced for 216 KB
Process 36828: 0 leaks for 0 total leaked bytes.

[2025/12/23 15:06:41] [engine] caught signal (SIGCONT)
[2025/12/23 15:06:41] Fluent Bit Dump

===== Input =====

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

Release Notes

  • Bug Fixes
    • Group markers from OpenTelemetry logs are now correctly preserved when passing through WASM filters, ensuring group boundaries and associated metadata remain intact throughout the processing pipeline.


coderabbitai bot commented Dec 22, 2025

Walkthrough

This change resolves OpenTelemetry metadata loss in the WASM filter by enabling group marker decoding in the log event decoder and implementing logic to preserve group start/end markers through raw byte emission, bypassing normal JSON/msgpack processing. A comprehensive end-to-end OTLP test validates group metadata preservation.

Changes

| Cohort / File(s) | Change Summary |
| --- | --- |
| Core WASM Filter Logic: plugins/filter_wasm/filter_wasm.c | Added stdint.h include and record_type (int32_t) variable. Enabled group marker decoding on log event decoder with error handling (destroy decoder, return FLB_FILTER_NOTOUCH on failure). In main decode loop, determines record type per entry; group start/end markers are emitted as raw bytes via encoder and bypass JSON/msgpack processing. Non-group records follow normal path. Early error path avoids destroying persistent WASM instance on group marker encounter. |
| OTLP Group Metadata Test Suite: tests/runtime/filter_wasm.c | Added http_client_ctx structure with create/destroy helpers for lightweight HTTP client context. Introduced MsgPack capture utilities (static buffer mp_output/mp_output_size, append/clear/read helpers, output callback). Added MsgPack lookup utilities mp_map_get and mp_str_eq for nested structure navigation. New test flb_test_wasm_preserve_otlp_group_metadata configures OTLP HTTP endpoint, OpenTelemetry input, WASM filter, and lib output; sends v1/logs payload and validates OTLP group markers (start/end) and matching service.name and scope.name are preserved. Exposed test in TEST_LIST. Added includes: flb_http_client.h, inttypes.h. |
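
The group marker pass-through summarized above can be sketched in isolation. What follows is a minimal, self-contained model in C, not the plugin's actual code: `demo_record`, `demo_transform`, and `demo_filter_chunk` are hypothetical names, records are plain structs rather than msgpack buffers, and the uppercase transform merely stands in for the WASM call. The marker values -1 and -2 follow the constants cited later in this review.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record types; the values mirror the group marker
 * constants mentioned in the review (-1 for start, -2 for end). */
#define DEMO_RECORD_NORMAL       0
#define DEMO_RECORD_GROUP_START -1
#define DEMO_RECORD_GROUP_END   -2

struct demo_record {
    int32_t type;       /* one of the DEMO_RECORD_* values */
    char    body[64];   /* stand-in for the encoded payload */
};

/* Stand-in for the WASM call: uppercases the body in place. */
static void demo_transform(struct demo_record *rec)
{
    size_t i;

    for (i = 0; rec->body[i] != '\0'; i++) {
        if (rec->body[i] >= 'a' && rec->body[i] <= 'z') {
            rec->body[i] = (char) (rec->body[i] - 'a' + 'A');
        }
    }
}

/* Copies `count` records from `in` to `out`. Group markers are copied
 * byte for byte and skip the transform; normal records are transformed.
 * Returns the number of records written. */
static size_t demo_filter_chunk(const struct demo_record *in, size_t count,
                                struct demo_record *out)
{
    size_t i;
    size_t written = 0;

    for (i = 0; i < count; i++) {
        int32_t record_type = in[i].type;

        if (record_type == DEMO_RECORD_GROUP_START ||
            record_type == DEMO_RECORD_GROUP_END) {
            /* Raw copy: the marker never reaches the transform. */
            memcpy(&out[written], &in[i], sizeof(in[i]));
            written++;
            continue;
        }

        out[written] = in[i];
        demo_transform(&out[written]);
        written++;
    }

    return written;
}
```

Feeding a start marker, one normal record, and an end marker through `demo_filter_chunk` leaves both markers untouched and rewrites only the record between them, which is the ordering the output plugin needs to reassemble the group.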

Sequence Diagram(s)

sequenceDiagram
    participant Client as OTLP Client
    participant OTLPIn as OTLP Input
    participant Decoder as Log Event Decoder
    participant Filter as WASM Filter
    participant Encoder as Output Encoder
    participant Output as Output Plugin

    Note over Decoder: Group marker decoding enabled
    
    Client->>OTLPIn: POST /v1/logs (with group metadata)
    OTLPIn->>Decoder: Decode log records
    Decoder-->>Decoder: Enable group marker decoding
    
    rect rgb(200, 220, 255)
        Note over Decoder,Filter: New: Group Marker Handling Path
        Decoder->>Decoder: Detect group start marker
        Decoder->>Filter: Pass group marker to filter
        Filter->>Encoder: Emit marker as raw bytes
        Encoder->>Output: Preserve group boundary
    end
    
    rect rgb(220, 220, 220)
        Note over Decoder,Filter: Existing: Normal Record Path
        Decoder->>Filter: Decode normal log record
        Filter->>Filter: Apply WASM logic
        Filter->>Encoder: Emit processed record
        Encoder->>Output: Send to output plugin
    end
    
    rect rgb(200, 220, 255)
        Note over Decoder,Filter: New: Group Marker Handling Path
        Decoder->>Decoder: Detect group end marker
        Decoder->>Filter: Pass group marker to filter
        Filter->>Encoder: Emit marker as raw bytes
        Encoder->>Output: Preserve group boundary
    end
    
    Output->>Client: OTLP metadata and records preserved

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • edsiper
  • fujimotos
  • koleini

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 12.50% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and directly summarizes the main change: handling group metadata in the filter_wasm component, which aligns with the primary objective of preserving OpenTelemetry group attributes. |
| Linked Issues check | ✅ Passed | The PR addresses issue #11302 by implementing group metadata preservation in the wasm filter, adding support for decoding and forwarding OTLP group markers while maintaining backward compatibility with regular record processing. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to handling group metadata in the wasm filter; modifications include decoder setup, group marker handling in the main loop, and comprehensive test coverage without unrelated alterations. |

@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-group-metadata-on-filter_wasm branch from 7405c2c to bb51057 Compare December 22, 2025 12:22
@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-group-metadata-on-filter_wasm branch from bb51057 to 0bc837f Compare December 22, 2025 15:10
@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-group-metadata-on-filter_wasm branch from 0bc837f to 45926d2 Compare December 22, 2025 15:21
@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-group-metadata-on-filter_wasm branch from 45926d2 to 8d56a96 Compare December 22, 2025 15:40
@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-group-metadata-on-filter_wasm branch from 8d56a96 to 5be26d7 Compare December 22, 2025 15:51
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
tests/runtime/filter_wasm.c (1)

87-101: Consider logging realloc failure.

The realloc failure is silently ignored on line 94. While the code safely handles it by continuing with the old buffer, logging a warning would help diagnose issues during test failures.

🔎 Proposed enhancement
     tmp = flb_realloc(mp_output, mp_output_size + size);
     if (tmp) {
         mp_output = tmp;
         memcpy(mp_output + mp_output_size, data, size);
         mp_output_size += size;
     }
+    else {
+        flb_warn("Failed to realloc msgpack output buffer");
+    }
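
A standalone version of this append-with-warning pattern might look as follows. This is only a sketch using the standard C allocator and `fprintf` in place of `flb_realloc` and `flb_warn`; `demo_buf`, `demo_buf_append`, and `demo_buf_clear` are hypothetical names and do not appear in the test file.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable capture buffer. */
struct demo_buf {
    char  *data;
    size_t size;
};

/* Appends `size` bytes to the buffer. On allocation failure the old
 * contents stay valid, a warning is printed, and -1 is returned. */
static int demo_buf_append(struct demo_buf *buf, const void *data, size_t size)
{
    char *tmp;

    if (size == 0) {
        return 0;
    }

    tmp = realloc(buf->data, buf->size + size);
    if (tmp == NULL) {
        /* realloc leaves the original block untouched on failure. */
        fprintf(stderr, "[warn] failed to grow capture buffer by %zu bytes\n",
                size);
        return -1;
    }

    buf->data = tmp;
    memcpy(buf->data + buf->size, data, size);
    buf->size += size;
    return 0;
}

/* Releases the buffer and resets it to the empty state. */
static void demo_buf_clear(struct demo_buf *buf)
{
    free(buf->data);
    buf->data = NULL;
    buf->size = 0;
}
```

Returning a status code in addition to logging lets a test assertion fail at the point of the allocation error instead of later, when the captured output turns out to be truncated.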
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b05bc20 and d4bb848.

📒 Files selected for processing (2)
  • plugins/filter_wasm/filter_wasm.c
  • tests/runtime/filter_wasm.c
🧰 Additional context used
🧠 Learnings (10)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-09-14T09:46:09.531Z
Learnt from: aminvakil
Repo: fluent/fluent-bit PR: 10844
File: conf/fluent-bit:13-15
Timestamp: 2025-09-14T09:46:09.531Z
Learning: For fluent-bit Debian packaging, /opt/fluent-bit/bin/ is the appropriate installation path since the package may be installed from non-official Debian sources, making /opt compliant with FHS for optional software packages.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • tests/runtime/filter_wasm.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • tests/runtime/filter_wasm.c
🔇 Additional comments (10)
plugins/filter_wasm/filter_wasm.c (4)

36-36: LGTM!

Standard header inclusion for int32_t type used in the record_type variable.


59-59: LGTM!

The record_type variable is properly declared to store the log event type (normal vs. group markers).


88-95: Good error handling for group marker enablement.

The code properly enables group marker decoding and handles failure gracefully by cleaning up resources and returning FLB_FILTER_NOTOUCH. The comment clearly documents the intent.


112-127: Excellent implementation of group marker preservation.

The logic correctly identifies group start/end markers and emits them as raw bytes, bypassing the WASM filter processing. This ensures OpenTelemetry group boundaries are preserved intact. The safe default initialization of record_type and the use of continue to skip normal processing are both well-designed.

tests/runtime/filter_wasm.c (6)

22-22: LGTM!

The added includes support the new HTTP client infrastructure and integer handling needed for the OTLP test.

Also applies to: 25-25


195-226: Well-designed msgpack navigation helpers.

These utility functions properly handle NULL checks, type validation, and provide a clean API for navigating nested msgpack structures in the test assertions.
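
As an illustration of the string comparison such a helper has to perform, here is a minimal sketch. It assumes only that a msgpack string arrives as a pointer plus a length and is not NUL-terminated; `demo_str_eq` is a hypothetical name, and the real `mp_str_eq` in the test may take a msgpack object instead.

```c
#include <stddef.h>
#include <string.h>

/* Compares a length-delimited string (ptr, len) against a C string.
 * Returns 1 on an exact match and 0 otherwise, including NULL inputs. */
static int demo_str_eq(const char *ptr, size_t len, const char *expected)
{
    size_t expected_len;

    if (ptr == NULL || expected == NULL) {
        return 0;
    }

    expected_len = strlen(expected);

    /* Check the length first so memcmp never reads past either string. */
    return len == expected_len && memcmp(ptr, expected, len) == 0;
}
```

Comparing lengths before bytes matters here because a plain `strcmp` would read past the end of the msgpack payload.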


228-283: Excellent error handling in HTTP client setup.

The http_client_ctx_create function demonstrates thorough error handling with proper cleanup at each failure point, preventing resource leaks in the test infrastructure.


285-312: LGTM!

The cleanup logic follows the correct destruction order (reverse of creation) and includes defensive NULL checks and pointer clearing.
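
The create/destroy pairing praised in the two comments above can be modeled with a small sketch. The `demo_ctx` structure and its two heap members are hypothetical stand-ins for whatever `http_client_ctx` actually holds; only the shape of the pattern is taken from the review.

```c
#include <stdlib.h>

/* Hypothetical context with two resources created in order. */
struct demo_ctx {
    char *connection;   /* created first */
    char *client;       /* created second */
};

/* Destroys in reverse creation order, tolerates NULL and partially
 * built contexts, and clears the caller's pointer. */
static void demo_ctx_destroy(struct demo_ctx **ctx_ptr)
{
    struct demo_ctx *ctx;

    if (ctx_ptr == NULL || *ctx_ptr == NULL) {
        return;
    }
    ctx = *ctx_ptr;

    free(ctx->client);        /* free(NULL) is a no-op */
    ctx->client = NULL;
    free(ctx->connection);
    ctx->connection = NULL;
    free(ctx);
    *ctx_ptr = NULL;
}

/* Builds the context step by step; every failure point releases
 * whatever was already allocated before returning NULL. */
static struct demo_ctx *demo_ctx_create(void)
{
    struct demo_ctx *ctx = calloc(1, sizeof(*ctx));

    if (ctx == NULL) {
        return NULL;
    }

    ctx->connection = malloc(16);
    if (ctx->connection == NULL) {
        demo_ctx_destroy(&ctx);
        return NULL;
    }

    ctx->client = malloc(16);
    if (ctx->client == NULL) {
        demo_ctx_destroy(&ctx);
        return NULL;
    }

    return ctx;
}
```

Because the context is zero-initialized, the destroy helper can double as the error path of the create helper, so there is a single place that knows the teardown order.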


910-911: LGTM!

The test is properly registered with a descriptive name in the test suite.


692-901: Test thoroughly validates OpenTelemetry log event group preservation through WASM filter.

The test demonstrates a complete end-to-end flow: OTLP v1/logs HTTP ingestion → WASM filter application → lib output capture with raw msgpack deserialization. It validates that group start/end markers and record metadata (service.name, scope.name) are properly preserved through the filter pipeline. The timestamp decoding logic correctly handles the msgpack ext type format with big-endian byte order for the 4 MSBs (seconds) as a uint32, and properly interprets the special group marker constants (FLB_LOG_EVENT_GROUP_START = -1, FLB_LOG_EVENT_GROUP_END = -2) defined in flb_log_event.h.
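
The timestamp check described above can be sketched without any msgpack library. This is a hedged illustration, assuming the ext payload starts with four big-endian bytes of seconds and that a group marker is stored in that seconds field as -1 or -2; `demo_read_be32`, `demo_ext_seconds`, and `demo_is_group_marker` are hypothetical names, not functions from the test.

```c
#include <stdint.h>
#include <string.h>

/* Local copies of the marker values cited in the review. */
#define DEMO_GROUP_START -1
#define DEMO_GROUP_END   -2

/* Reads a 32-bit big-endian unsigned integer from `p`. */
static uint32_t demo_read_be32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) |
           ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8)  |
            (uint32_t) p[3];
}

/* Interprets the first four payload bytes as signed seconds. */
static int32_t demo_ext_seconds(const unsigned char *payload)
{
    uint32_t raw = demo_read_be32(payload);
    int32_t  sec;

    /* memcpy reinterprets the bits without an out-of-range conversion. */
    memcpy(&sec, &raw, sizeof(sec));
    return sec;
}

/* Returns 1 when the seconds field holds a group start or end marker. */
static int demo_is_group_marker(const unsigned char *payload)
{
    int32_t sec = demo_ext_seconds(payload);

    return sec == DEMO_GROUP_START || sec == DEMO_GROUP_END;
}
```

Under these assumptions, a payload beginning with `ff ff ff ff` decodes to -1 and one beginning with `ff ff ff fe` decodes to -2, while the seconds value from the curl example, 1660296023, is `62 f6 1b 57` and is not treated as a marker.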


Development

Successfully merging this pull request may close these issues.

Opentelemetry metadata and attributes are lost when wasm filter is used

2 participants