Skip to content

Conversation

@kalavt
Copy link

@kalavt kalavt commented Dec 9, 2025

Summary

Add comprehensive AWS MSK IAM authentication support with simplified configuration and fix OAuth token expiration on idle connections. This PR automatically extracts region and cluster type information from broker addresses, provides explicit opt-in for MSK IAM, enhances OAUTHBEARER token refresh for all OAuth methods, and enables automatic background token refresh to prevent authentication failures on idle connections.

Changes

Key Features

  1. Explicit MSK IAM Opt-in

    • MSK IAM is only activated when explicitly requested via rdkafka.sasl.mechanism=aws_msk_iam
    • Uses explicit aws_msk_iam flag to track user intent
    • Ensures compatibility with other OAUTHBEARER methods (OIDC, custom OAuth, etc.)
  2. Simplified Configuration

    • No need for cluster_arn parameter
    • Enable AWS MSK IAM authentication by simply setting rdkafka.sasl.mechanism=aws_msk_iam
    • Automatically converts to OAUTHBEARER internally and registers OAuth callback
  3. Automatic Region Extraction

    • Intelligently extract AWS region information from broker addresses
    • Supports both MSK Standard and Serverless formats
  4. Automatic Cluster Type Detection

    • Automatically identify MSK Standard and MSK Serverless cluster types
    • Selects correct service endpoint based on cluster type
  5. Universal OAUTHBEARER Enhancements

    • Enhanced background token refresh for ALL OAUTHBEARER methods
    • Enabled SASL queue and background callbacks for all OAUTHBEARER configurations
    • Benefits AWS MSK IAM, librdkafka OIDC, custom OAuth implementations, etc.
    • Prevents token expiration on idle connections for both producers and consumers
    • Fixes authentication failures that occurred on idle connections after token expiration
  6. OAuth Token Lifetime Management

    • Maintains 5-minute OAuth token lifetime (AWS industry standard, matches AWS Go SDK)
    • Automatic refresh at 80% of token lifetime (4 minutes)
    • librdkafka's background thread handles refresh independently
    • Works perfectly for completely idle connections without requiring rd_kafka_poll()
    • Fixes authentication failures that occurred on idle connections after 5+ minutes
  7. TLS Support for AWS Credentials

    • Added TLS support for secure AWS credential fetching
    • Supports EC2 metadata, ECS, STS, and credential file sources
    • Ensures secure communication with AWS services
    • Properly manages TLS lifecycle (creation and cleanup)

Technical Details

  1. Explicit MSK IAM Activation:

    // Only activates when user explicitly sets aws_msk_iam
    if (ctx->aws_msk_iam && ctx->sasl_mechanism && 
        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
        // Register MSK IAM OAuth callback
    }
    • Prevents automatic activation for generic OAUTHBEARER users
    • Allows users to use OIDC or custom OAuth on AWS brokers without interference
  2. Configuration Simplification:

    • Users only need to set rdkafka.sasl.mechanism=aws_msk_iam
    • System automatically converts it to OAUTHBEARER and registers OAuth callback
    • Automatically sets rdkafka.security.protocol=SASL_SSL (if not configured)
  3. Region Extraction Logic:

    • Parse region from broker address (e.g., b-1.example.kafka.us-east-1.amazonaws.com)
    • Support MSK Standard format: *.kafka.<region>.amazonaws.com
    • Support MSK Serverless format: *.kafka-serverless.<region>.amazonaws.com
  4. Cluster Type Detection:

    • Check if broker address contains .kafka-serverless. to determine cluster type
    • Automatically select correct service endpoint (kafka or kafka-serverless)
  5. Universal OAUTHBEARER Background Processing:

    // Applied to ALL OAUTHBEARER configurations
    if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
        rd_kafka_conf_enable_sasl_queue(conf, 1);
        rd_kafka_sasl_background_callbacks_enable(rk);
    }
    • Enables automatic token refresh for all OAUTHBEARER methods
    • Handles idle connections, large poll intervals, paused collectors
    • Benefits both consumers (in_kafka) and producers (out_kafka)

Modified Files

AWS MSK IAM Core (2 files)

  • include/fluent-bit/aws/flb_aws_msk_iam.h - Updated function signature (removed cluster_arn parameter)
  • src/aws/flb_aws_msk_iam.c - Refactored region extraction and cluster type detection logic

Kafka Input Plugin (2 files)

  • plugins/in_kafka/in_kafka.h - Added aws_msk_iam flag, removed deprecated fields
  • plugins/in_kafka/in_kafka.c - Added explicit MSK IAM activation, universal OAUTHBEARER support

Kafka Output Plugin (3 files)

  • plugins/out_kafka/kafka_config.h - Added aws_msk_iam flag, removed deprecated fields
  • plugins/out_kafka/kafka_config.c - Added explicit MSK IAM activation, universal OAUTHBEARER support
  • plugins/out_kafka/kafka.c - Removed deprecated configuration mapping

AWS Credentials & TLS Support (4 files)

  • src/aws/flb_aws_credentials_ec2.c - Enhanced TLS support for EC2 metadata credential fetching
  • src/aws/flb_aws_credentials_profile.c - Enhanced TLS support for profile credential fetching
  • src/aws/flb_aws_credentials_sts.c - Enhanced TLS support for STS credential fetching
  • src/flb_kafka.c - Core Kafka integration improvements

Total: 11 files modified

Configuration

Simple AWS MSK IAM Setup:

[INPUT]
    Name kafka
    Brokers b-1.example.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism aws_msk_iam

No cluster_arn or additional AWS-specific parameters needed!

Supported Configurations

This PR ensures compatibility with multiple OAuth scenarios:

1. AWS MSK IAM (Fluent Bit convenience syntax)

[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism aws_msk_iam

2. librdkafka OIDC (unaffected by MSK IAM)

[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism OAUTHBEARER
    rdkafka.sasl.oauthbearer.method oidc
    rdkafka.sasl.oauthbearer.client.id my_client_id
    rdkafka.sasl.oauthbearer.client.secret my_secret
    rdkafka.sasl.oauthbearer.token.endpoint.url https://auth.example.com/token

3. librdkafka AWS method (unaffected by MSK IAM)

[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism OAUTHBEARER
    rdkafka.sasl.oauthbearer.method aws

All configurations benefit from automatic background token refresh!

Design for Extensibility

This PR establishes a clean, extensible pattern for adding cloud provider IAM authentication:

1. Layered Configuration Approach

Layer 1: Fluent Bit Convenience Syntax (High-level abstraction)
├─ rdkafka.sasl.mechanism=aws_msk_iam       → Auto-configured MSK IAM
├─ rdkafka.sasl.mechanism=gcp_iam           → Future: GCP Kafka IAM
└─ rdkafka.sasl.mechanism=azure_eventhubs   → Future: Azure Event Hubs

Layer 2: librdkafka Native (Direct pass-through)
├─ rdkafka.sasl.mechanism=OAUTHBEARER
├─ rdkafka.sasl.oauthbearer.method=oidc
└─ rdkafka.sasl.oauthbearer.method=aws

Layer 3: Custom Extensions (User plugins)
└─ Custom Fluent Bit extensions

2. Explicit Opt-in Pattern

// Extensible pattern for cloud provider authentication
if (strcasecmp(mechanism, "aws_msk_iam") == 0) {
    ctx->cloud_provider = CLOUD_PROVIDER_AWS;
}
// Future additions follow the same pattern:
// else if (strcasecmp(mechanism, "gcp_iam") == 0) {
//     ctx->cloud_provider = CLOUD_PROVIDER_GCP;
// }

3. Benefits of This Design

  • No interference: Each authentication method is explicitly opted-in
  • Clear separation: Cloud-specific logic isolated from generic OAUTHBEARER handling
  • Easy extension: New providers can be added following the same pattern
  • Backward compatible: Existing OAUTHBEARER configurations unaffected
  • Testable: Each auth method can be tested independently

4. Future Extensions
This architecture makes it straightforward to add:

  • Google Cloud Platform Kafka IAM
  • Azure Event Hubs authentication
  • Other cloud provider-specific OAuth implementations

Each can be added with the same explicit opt-in pattern without affecting existing functionality.

OAuth Token Expiration Fix

Problem Statement:

After prolonged idle periods (5+ minutes), Kafka outputs experienced authentication failures:

[error] SASL authentication error: Access denied (after 302ms in state AUTH_REQ)
[error] 3/3 brokers are down

Root Cause:

librdkafka's OAuth token refresh mechanism relies on rd_kafka_poll() being called regularly. For idle connections, rd_kafka_poll() is only called when producing messages. This is documented in librdkafka issue #3871:

"You need to explicitly call poll() once after creating the client to trigger the oauth callback"

Timeline without background callbacks:

T=0:     Connection established, OAuth token set (5-min lifetime)
T=1-5min: No messages to produce → rd_kafka_poll() never called
T=5min:  Token expires ❌
T=10min: New data arrives, rd_kafka_poll() called
         ├─ librdkafka tries to use expired token
         └─> Access Denied ❌

Solution: Background Callbacks

librdkafka v1.9.0+ provides rd_kafka_sasl_background_callbacks_enable() specifically for this use case:

"Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread. This serves as an alternative for applications that do NOT call rd_kafka_poll() at regular intervals"

// Enable automatic token refresh in background thread
rd_kafka_sasl_background_callbacks_enable(rk);

Timeline with background callbacks:

T=0:00  Token generated (expires T=5:00)
        ├─ librdkafka starts background thread
        └─ Token refresh timer active in background

T=4:00  Background thread detects token at 80% lifetime
        ├─ Automatically triggers oauthbearer_token_refresh_cb()
        ├─ New token generated (fresh 5-min lifetime)
        └─> Token refreshed ✅

T=8:00  Background thread refreshes again
T=12:00 Background thread refreshes again
...

Result: Token NEVER expires, even with ZERO traffic ✅

Benefits:

  • ✅ Token refresh occurs automatically every ~4 minutes
  • ✅ Works on completely idle connections (no traffic for hours)
  • ✅ No application involvement needed (rd_kafka_poll() not required)
  • ✅ Built-in librdkafka feature (v1.9.0+, Fluent Bit uses 2.10.1)
  • ✅ Zero authentication failures on idle connections

TLS Support

This PR includes proper TLS support for AWS credential fetching:

ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
                                FLB_TRUE,
                                FLB_LOG_DEBUG,
                                NULL, NULL, NULL, NULL, NULL, NULL);

Features:

  • ✅ Secure communication with AWS credential services
  • ✅ Supports EC2 metadata, ECS, STS endpoints
  • ✅ Proper TLS lifecycle management (creation and cleanup)
  • ✅ Used by AWS credentials provider chain

Usage:

ctx->provider = flb_standard_chain_provider_create(config,
                                                   ctx->cred_tls,  // ← TLS instance
                                                   ctx->region,
                                                   ...);

Testing

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Packaging

  • Run local packaging test showing all targets (including any new ones) build
  • Set ok-package-test label to test for all targets (requires maintainer to do)

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Auto-detect AWS region from MSK broker addresses with optional aws_region override; MSK IAM integrates via OAUTHBEARER with background token refresh and TLS-backed credentials.
  • Bug Fixes

    • Token lifetime standardized to 5 minutes; credential access is thread-safe; improved error handling and resource cleanup during Kafka/MSK IAM init/shutdown.
  • Configuration Changes

    • MSK IAM now driven by rdkafka.sasl.mechanism; legacy aws_msk_iam and aws_msk_iam_cluster_arn options removed.
  • Documentation

    • Added Kafka MSK IAM examples and README.

✏️ Tip: You can customize this high-level summary in your review settings.

@kalavt kalavt requested review from a team, cosmo0920 and edsiper as code owners December 9, 2025 02:51
@coderabbitai
Copy link

coderabbitai bot commented Dec 9, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Walkthrough

MSK IAM auth was reworked: cluster ARN removed in favor of brokers+region; persistent TLS-backed AWS provider with mutexed token refresh added; Kafka plugins now drive MSK IAM via rdkafka.sasl.mechanism; config map entries and conf ownership/cleanup semantics updated.

Changes

Cohort / File(s) Summary
MSK IAM Header
include/fluent-bit/aws/flb_aws_msk_iam.h
Removed public struct flb_msk_iam_cb; changed flb_aws_msk_iam_register_oauth_cb signature to accept (config, kconf, opaque, brokers, region) and updated docblock.
MSK IAM Core
src/aws/flb_aws_msk_iam.c
Major refactor: add TLS (cred_tls), persistent AWS provider, mutex, fixed token lifetime (300s); add extract_region_from_broker; payload builder now requires explicit credentials; registration initializes TLS/provider and accepts brokers+region; updated lifecycle and destroy logic.
Input Kafka Plugin
plugins/in_kafka/in_kafka.c, plugins/in_kafka.h
Read rdkafka.sasl.mechanism; treat aws_msk_iam as explicit request -> switch to OAUTHBEARER, default security.protocol if missing, enable SASL queue, register OAuth callback when brokers provided; add aws_region field; remove aws_msk_iam_cluster_arn config entry; improve conf ownership and cleanup paths.
Output Kafka Plugin
plugins/out_kafka/kafka.c, plugins/out_kafka/kafka_config.c, plugins/out_kafka/kafka_config.h
Add aws_region config option; remove aws_msk_iam_cluster_arn and old aws_msk_iam entries; detect sasl.mechanism early, switch to OAUTHBEARER for aws_msk_iam, enable SASL queue, conditionally register OAuth callback, and adjust rd_kafka_conf ownership/cleanup and background callback enabling.
Kafka Conf Utilities
src/flb_kafka.c
Error-path cleanup now calls rd_kafka_conf_destroy instead of flb_free for kafka conf on failure.
AWS Credentials Minor Edits
src/aws/flb_aws_credentials_ec2.c, src/aws/flb_aws_credentials_profile.c, src/aws/flb_aws_credentials_sts.c
Small whitespace and log-level tweaks (downgrade ENOENT log); no behavioral changes.
Examples / Docs
examples/kafka_filter/README.md, examples/kafka_filter/kafka_msk_iam.conf
Add README and example config demonstrating MSK IAM usage, region auto-detection, PrivateLink, sample configs, testing and troubleshooting guidance.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant FB as Fluent Bit
    participant LR as librdkafka
    participant MSK as flb_aws_msk_iam
    participant AWS as AWS Credentials Provider

    Note over FB: Init reads rdkafka.sasl.mechanism
    FB->>LR: create rd_kafka_conf()
    FB->>FB: detect sasl.mechanism == "aws_msk_iam"
    FB->>MSK: flb_aws_msk_iam_register_oauth_cb(config,kconf,opaque,brokers,region)
    MSK->>MSK: extract_region_from_broker(brokers) / init TLS / create provider / init mutex
    MSK-->>FB: register callback handle

    Note over LR,MSK: Background token refresh
    LR->>MSK: oauthbearer_token_refresh_cb(request)
    MSK->>MSK: lock mutex
    MSK->>AWS: provider->refresh_credentials()
    AWS-->>MSK: credentials
    MSK->>MSK: build_msk_iam_payload(host, credentials)
    MSK-->>LR: rd_kafka_oauthbearer_set_token(token, lifetime=300s)
    MSK->>MSK: unlock mutex
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Suggested reviewers

  • edsiper
  • cosmo0920
  • patrick-stephens
  • fujimotos
  • niedbalski
  • celalettin1286

Poem

🐇 I sniffed the brokers, found the region there,
Cluster ARNs hopped off — lighter is my care.
With TLS and mutex, tokens hop alive,
Background refresh keeps auth in drive.
Hop-hop hooray — MSK IAM now thrives!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 40.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'aws_msk_iam: add AWS MSK IAM authentication support' is clear, specific, and directly summarizes the main change: adding AWS MSK IAM authentication support to Fluent Bit.
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6bb3ab3 and 001e478.

📒 Files selected for processing (1)
  • examples/kafka_filter/README.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • examples/kafka_filter/README.md

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
plugins/out_kafka/kafka_config.c (1)

74-82: Consider checking return values from flb_output_set_property.

The calls to flb_output_set_property() at lines 74 and 81 don't check return values. While unlikely to fail in practice, property setting can fail on allocation errors. For robustness:

-            flb_output_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
+            if (flb_output_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER") < 0) {
+                flb_plg_error(ins, "failed to set OAUTHBEARER SASL mechanism");
+                flb_sds_destroy(ctx->sasl_mechanism);
+                flb_free(ctx);
+                return NULL;
+            }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ded9ae and b486cb7.

📒 Files selected for processing (11)
  • include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
  • plugins/in_kafka/in_kafka.c (4 hunks)
  • plugins/in_kafka/in_kafka.h (1 hunks)
  • plugins/out_kafka/kafka.c (0 hunks)
  • plugins/out_kafka/kafka_config.c (4 hunks)
  • plugins/out_kafka/kafka_config.h (1 hunks)
  • src/aws/flb_aws_credentials_ec2.c (1 hunks)
  • src/aws/flb_aws_credentials_profile.c (1 hunks)
  • src/aws/flb_aws_credentials_sts.c (2 hunks)
  • src/aws/flb_aws_msk_iam.c (12 hunks)
  • src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
  • plugins/out_kafka/kafka.c
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/flb_kafka.c
  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (3)
plugins/in_kafka/in_kafka.c (4)
src/flb_input.c (2)
  • flb_input_get_property (776-780)
  • flb_input_set_property (557-774)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (2)
  • flb_aws_msk_iam_register_oauth_cb (628-761)
  • flb_aws_msk_iam_destroy (764-786)
src/flb_kafka.c (1)
  • flb_kafka_opaque_destroy (233-240)
plugins/out_kafka/kafka_config.c (2)
src/flb_output.c (2)
  • flb_output_get_property (1108-1111)
  • flb_output_set_property (843-1068)
src/aws/flb_aws_msk_iam.c (1)
  • flb_aws_msk_iam_register_oauth_cb (628-761)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
src/aws/flb_aws_msk_iam.c (1)
  • flb_aws_msk_iam_register_oauth_cb (628-761)
🪛 Cppcheck (2.18.0)
src/aws/flb_aws_credentials_sts.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/flb_kafka.c

[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_credentials_ec2.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

plugins/in_kafka/in_kafka.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_credentials_profile.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

plugins/out_kafka/kafka_config.c

[information] 61-61: Include file

(missingIncludeSystem)


[information] 62-62: Include file

(missingIncludeSystem)


[information] 63-63: Include file

(missingIncludeSystem)


[information] 65-65: Include file

(missingIncludeSystem)


[information] 66-66: Include file

(missingIncludeSystem)


[information] 67-67: Include file

(missingIncludeSystem)


[information] 68-68: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_msk_iam.c

[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] 49-49: Include file

(missingIncludeSystem)


[information] 50-50: Include file

(missingIncludeSystem)


[information] 51-51: Include file

(missingIncludeSystem)


[information] 52-52: Include file

(missingIncludeSystem)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] 40-40: Include file

(missingIncludeSystem)


[information] 41-41: Include file

(missingIncludeSystem)


[information] 40-40: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] 52-52: Include file

(missingIncludeSystem)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] 137-137: Include file

(missingIncludeSystem)


[information] 138-138: Include file

(missingIncludeSystem)


[information] 139-139: Include file

(missingIncludeSystem)


[information] 140-140: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

🔇 Additional comments (24)
src/aws/flb_aws_credentials_ec2.c (1)

128-139: Whitespace-only change acknowledged.

This is a minor formatting change adding a blank line after the debug log. No functional impact.

src/aws/flb_aws_credentials_sts.c (1)

173-186: Formatting improvements enhance code consistency.

The added blank lines after debug statements in refresh_fn_sts and refresh_fn_eks align these functions with the existing style in init_fn_sts, init_fn_eks, get_credentials_fn_sts, and get_credentials_fn_eks. This improves readability by separating logging statements from control flow logic.

Also applies to: 478-489

src/aws/flb_aws_credentials_profile.c (1)

664-674: LGTM! Log level adjustment is appropriate.

Downgrading the log level for a missing credentials file (ENOENT) to debug is correct. A missing ~/.aws/credentials file is not an error when other credential sources (EC2/ECS metadata, config file with credential_process, environment variables) may be available. This aligns with the similar handling for the config file at line 623.

src/flb_kafka.c (1)

96-100: LGTM! Critical bug fix for proper resource cleanup.

Using rd_kafka_conf_destroy() is the correct way to clean up a rd_kafka_conf_t* object allocated by rd_kafka_conf_new(). The previous use of flb_free() would have caused memory corruption or leaks since librdkafka's configuration objects have internal structures that require proper destruction.

plugins/in_kafka/in_kafka.h (1)

57-60: LGTM! Clean API simplification.

The explicit aws_msk_iam flag replaces the removed aws_msk_iam_cluster_arn field, aligning with the PR's shift to broker-based region detection. The flag clearly indicates user intent via rdkafka.sasl.mechanism=aws_msk_iam, making the opt-in explicit rather than implicit.

plugins/out_kafka/kafka_config.h (1)

128-137: LGTM! Consistent with in_kafka changes.

The aws_msk_iam flag and sasl_mechanism field additions mirror the in_kafka.h structure, maintaining consistency across Kafka input and output plugins. The explicit flag design is cleaner than the previous ARN-based approach.

include/fluent-bit/aws/flb_aws_msk_iam.h (1)

37-49: LGTM! API simplification with clear documentation.

The updated signature removes cluster_arn in favor of brokers, enabling automatic region extraction from broker addresses. This simplifies user configuration—users no longer need to provide the cluster ARN. The documentation clearly explains each parameter's purpose.

plugins/out_kafka/kafka_config.c (4)

209-218: LGTM! Essential for OAuth token refresh on idle connections.

Enabling the SASL queue before producer creation is correct. This allows librdkafka's background thread to handle OAuth token refresh even when rd_kafka_poll() isn't called frequently, preventing authentication failures on idle connections.


220-250: LGTM! Well-structured MSK IAM registration with proper validation.

Good defensive checks: explicit flag, OAUTHBEARER mechanism, and MSK broker patterns must all be present. The callback registration failure correctly triggers cleanup and return.

One observation: the sasl.oauthbearer.config error at lines 243-247 only logs but doesn't fail. This is likely fine since it's a secondary configuration, but verify this doesn't cause issues with librdkafka's OAUTHBEARER validation.


253-285: LGTM! Correct ownership semantics and background callback handling.

Setting ctx->conf = NULL after successful rd_kafka_new() correctly reflects ownership transfer—librdkafka now owns the configuration. The SASL background callback enabling is done post-creation as required, and the non-fatal warning on failure is appropriate (graceful degradation).


344-351: LGTM! Proper two-path cleanup for configuration ownership.

The conditional cleanup correctly handles both scenarios:

  1. Producer created: rd_kafka_destroy() handles both producer and configuration
  2. Producer creation failed: ctx->conf is still valid and needs explicit rd_kafka_conf_destroy()

This pairs correctly with the ctx->conf = NULL assignment after successful rd_kafka_new().

plugins/in_kafka/in_kafka.c (6)

339-345: LGTM!

Opaque context creation and configuration follows proper error handling patterns.


347-356: LGTM!

Enabling SASL queue for OAUTHBEARER is the correct approach for background token refresh. The comment clearly explains the benefit for all OAUTHBEARER methods.


358-389: LGTM with minor observation.

The MSK IAM OAuth callback registration logic is well-structured. The broker pattern validation (".kafka." or ".kafka-serverless." with ".amazonaws.com") appropriately identifies MSK endpoints.

Note: The sasl.oauthbearer.config setting failure at lines 379-385 only logs an error but doesn't fail initialization. This appears intentional since the principal=admin value is primarily for librdkafka's internal validation and the actual authentication uses the generated token.


391-424: LGTM!

Excellent ownership semantics handling:

  • kafka_conf = NULL after successful rd_kafka_new() correctly prevents double-free
  • Clear comments document the ownership transfer
  • Graceful degradation when SASL background callbacks fail (warn but continue)

The SASL background callback enablement ensures tokens refresh even during idle periods.


486-512: LGTM!

Comprehensive error cleanup path:

  • Correctly handles the mutual exclusivity between kafka.rk (owns conf) and standalone kafka_conf
  • MSK IAM resources cleaned up under proper compile guard
  • Proper cleanup order prevents use-after-free

549-559: LGTM!

Exit cleanup mirrors the error path cleanup correctly. The cleanup order (MSK IAM → opaque → sasl_mechanism) is appropriate.

src/aws/flb_aws_msk_iam.c (7)

42-55: LGTM!

Well-designed structure with proper thread safety considerations. The 5-minute token lifetime aligns with AWS MSK IAM standards, and the mutex protects credential provider access during concurrent refresh callbacks.


214-268: LGTM!

Clean function signature with explicit credential passing (rather than fetching internally). Input validation is thorough with informative error messages.


432-478: LGTM!

The presigned URL construction and Base64 URL encoding are implemented correctly:

  • User-Agent parameter added for identification
  • Base64 URL encoding properly converts +-, /_, and strips padding
  • Memory cleanup is thorough in both success and error paths

555-612: LGTM!

Excellent concurrency handling:

  • Mutex protects credential provider access during refresh/get_credentials
  • Lock held for minimal duration (released immediately after getting credentials)
  • Credentials destroyed after use (security best practice)

The explicit vtable calls (provider_vtable->refresh, provider_vtable->get_credentials) assume the provider is valid, which is guaranteed by the context lifecycle.


763-785: LGTM!

Proper cleanup order respects dependencies:

  1. Provider (uses TLS)
  2. TLS
  3. Region string
  4. Mutex
  5. Context struct

The unconditional pthread_mutex_destroy is safe since a context only reaches this function if initialization succeeded (including mutex init).


519-551: LGTM!

Host buffer sizing is safe - the maximum formatted string length (~63 characters) is well within the 256-byte buffer, especially given the 32-character region length limit enforced in extract_region_from_broker.


728-738: Verify provider initialization sequence order.

The code calls sync() before init(), but Fluent Bit's documented AWS credential provider lifecycle calls init() at startup before periodic sync() calls. Confirm whether this reversed sequence is intentional for MSK IAM OAuth callback initialization or if init() should be called first.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

@cosmo0920
Copy link
Contributor

Still failing our linter:
❌ Commit b486cb7 failed:
Missing prefix in commit subject: 'aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support'

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
src/aws/flb_aws_msk_iam.c (2)

137-212: Verify bounds check for VPC endpoint detection.

Line 180: The condition if (p >= broker + 5 && ...) uses pointer comparison. While likely correct, using offset comparison if (p - broker >= 5 && ...) would be clearer and more portable, explicitly checking there are at least 5 bytes before p before accessing p - 5.

Apply this diff for clarity:

     /* Check for VPC endpoint format: .vpce.amazonaws.com */
-    if (p >= broker + 5 && strncmp(p - 5, ".vpce", 5) == 0) {
+    if (p - broker >= 5 && strncmp(p - 5, ".vpce", 5) == 0) {
         /* For VPC endpoints, region ends at .vpce */
         end = p - 5;
     }

Consider adding unit tests for:

  • VPC endpoint format (vpce-xxx.kafka.region.vpce.amazonaws.com)
  • Brokers with/without ports
  • Edge cases near 32-character region limit

701-738: Verify TLS ownership to prevent potential double-free.

Lines 702-711 create ctx->cred_tls, which is then passed to flb_standard_chain_provider_create at line 714. If the provider stores this pointer internally (e.g., in provider->cred_tls), then the destroy path at lines 771-776 may cause a double-free: flb_aws_provider_destroy(ctx->provider) would free the TLS handle, and then flb_tls_destroy(ctx->cred_tls) would attempt to free it again.

Run the following script to check if the provider stores the TLS pointer:

#!/bin/bash
# Check if flb_standard_chain_provider_create stores the cred_tls pointer
ast-grep --pattern $'flb_standard_chain_provider_create($$$) {
  $$$
  $PROVIDER->cred_tls = $TLS;
  $$$
}'

# Also check the provider destroy function
rg -A 10 "flb_aws_provider_destroy|flb_standard_chain_provider_destroy" --type c
plugins/in_kafka/in_kafka.c (1)

271-297: Add NULL checks after flb_sds_create calls.

Both flb_sds_create calls (lines 274 and 286) can fail and return NULL, but there are no checks. If allocation fails, ctx->sasl_mechanism will be NULL, causing crashes in subsequent strcasecmp calls at lines 279, 353, 361, and 412.

Apply this diff to add proper NULL checks:

     conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
     if (conf) {
         ctx->sasl_mechanism = flb_sds_create(conf);
+        if (!ctx->sasl_mechanism) {
+            flb_plg_error(ins, "failed to allocate SASL mechanism string");
+            flb_free(ctx);
+            return -1;
+        }
         flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
         
 #ifdef FLB_HAVE_AWS_MSK_IAM
         /* Check if using aws_msk_iam as SASL mechanism */
         if (strcasecmp(conf, "aws_msk_iam") == 0) {
             /* Mark that user explicitly requested AWS MSK IAM */
             ctx->aws_msk_iam = FLB_TRUE;
             
             /* Set SASL mechanism to OAUTHBEARER for librdkafka */
             flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
             flb_sds_destroy(ctx->sasl_mechanism);
             ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER");
+            if (!ctx->sasl_mechanism) {
+                flb_plg_error(ins, "failed to allocate SASL mechanism string");
+                flb_free(ctx);
+                return -1;
+            }
🧹 Nitpick comments (4)
src/aws/flb_aws_credentials_sts.c (1)

178-178: Trailing whitespace detected.

Lines 178 and 483 contain trailing whitespace/spaces on otherwise blank lines. While not a functional issue, this may cause linter warnings or be flagged in CI.

-    
+

Also applies to: 483-483

src/aws/flb_aws_credentials_ec2.c (1)

133-133: Trailing whitespace on blank line.

Same pattern as in other credential provider files—line 133 has trailing whitespace. Consider removing for consistency with project style.

plugins/out_kafka/kafka_config.c (1)

220-250: Consider making sasl.oauthbearer.config failure fatal.

The OAuth callback registration correctly validates broker patterns for MSK. However, if rd_kafka_conf_set fails for sasl.oauthbearer.config (lines 241-247), execution continues. Since this configuration is part of the OAUTHBEARER setup, a failure here may cause authentication issues at runtime.

            res = rd_kafka_conf_set(ctx->conf, "sasl.oauthbearer.config",
                                    "principal=admin", errstr, sizeof(errstr));
            if (res != RD_KAFKA_CONF_OK) {
                flb_plg_error(ctx->ins,
                             "failed to set sasl.oauthbearer.config: %s",
                             errstr);
+               flb_out_kafka_destroy(ctx);
+               return NULL;
            }
plugins/in_kafka/in_kafka.c (1)

347-356: Consider checking rd_kafka_conf_enable_sasl_queue return value.

While unlikely to fail, rd_kafka_conf_enable_sasl_queue returns an error code. Adding a check would make error handling more complete.

     if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
-        rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
+        res = rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
+        if (res != RD_KAFKA_CONF_OK) {
+            flb_plg_warn(ins, "Failed to enable SASL queue (non-critical)");
+        }
         flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism");
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b486cb7 and 2264d52.

📒 Files selected for processing (11)
  • include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
  • plugins/in_kafka/in_kafka.c (4 hunks)
  • plugins/in_kafka/in_kafka.h (1 hunks)
  • plugins/out_kafka/kafka.c (0 hunks)
  • plugins/out_kafka/kafka_config.c (4 hunks)
  • plugins/out_kafka/kafka_config.h (1 hunks)
  • src/aws/flb_aws_credentials_ec2.c (1 hunks)
  • src/aws/flb_aws_credentials_profile.c (1 hunks)
  • src/aws/flb_aws_credentials_sts.c (2 hunks)
  • src/aws/flb_aws_msk_iam.c (12 hunks)
  • src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
  • plugins/out_kafka/kafka.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • include/fluent-bit/aws/flb_aws_msk_iam.h
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
  • src/flb_kafka.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (3)
plugins/out_kafka/kafka_config.c (3)
src/flb_output.c (2)
  • flb_output_get_property (1108-1111)
  • flb_output_set_property (843-1068)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (1)
  • flb_aws_msk_iam_register_oauth_cb (628-761)
src/aws/flb_aws_msk_iam.c (1)
src/tls/flb_tls.c (1)
  • flb_tls_create (183-232)
plugins/in_kafka/in_kafka.c (4)
src/flb_input.c (2)
  • flb_input_get_property (776-780)
  • flb_input_set_property (557-774)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (2)
  • flb_aws_msk_iam_register_oauth_cb (628-761)
  • flb_aws_msk_iam_destroy (764-786)
src/flb_kafka.c (1)
  • flb_kafka_opaque_destroy (233-240)
🪛 Cppcheck (2.18.0)
plugins/out_kafka/kafka_config.c

[information] 61-61: Include file

(missingIncludeSystem)


[information] 62-62: Include file

(missingIncludeSystem)


[information] 63-63: Include file

(missingIncludeSystem)


[information] 65-65: Include file

(missingIncludeSystem)


[information] 66-66: Include file

(missingIncludeSystem)


[information] 67-67: Include file

(missingIncludeSystem)


[information] 68-68: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_msk_iam.c

[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] 49-49: Include file

(missingIncludeSystem)


[information] 50-50: Include file

(missingIncludeSystem)


[information] 51-51: Include file

(missingIncludeSystem)


[information] 52-52: Include file

(missingIncludeSystem)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] 40-40: Include file

(missingIncludeSystem)


[information] 41-41: Include file

(missingIncludeSystem)


[information] 40-40: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] 52-52: Include file

(missingIncludeSystem)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] 137-137: Include file

(missingIncludeSystem)


[information] 138-138: Include file

(missingIncludeSystem)


[information] 139-139: Include file

(missingIncludeSystem)


[information] 140-140: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

plugins/in_kafka/in_kafka.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/flb_kafka.c

[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_credentials_ec2.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_credentials_profile.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

src/aws/flb_aws_credentials_sts.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

🔇 Additional comments (16)
src/aws/flb_aws_credentials_profile.c (1)

664-674: LGTM! Appropriate log level adjustment for missing credentials file.

Downgrading the log level to AWS_CREDS_DEBUG for ENOENT is correct—a missing credentials file is an expected scenario when using alternative credential sources (environment variables, EC2 IMDS, EKS, etc.). This aligns with the handling in get_shared_config_credentials at line 623.

plugins/out_kafka/kafka_config.h (1)

128-131: LGTM! Clean struct refactoring for MSK IAM configuration.

Replacing the aws_msk_iam_cluster_arn field with a simple aws_msk_iam flag is a good simplification. Per the PR objectives, region and cluster type are now auto-extracted from broker hostnames, making the explicit cluster ARN unnecessary.

src/flb_kafka.c (1)

96-100: LGTM! Critical fix for proper librdkafka configuration cleanup.

Using rd_kafka_conf_destroy() instead of flb_free() is the correct approach. The rd_kafka_conf_t object allocated by rd_kafka_conf_new() has internal structures that must be properly released by librdkafka's destructor, not by a simple memory free.

plugins/in_kafka/in_kafka.h (1)

57-60: LGTM! Consistent with out_kafka configuration structure.

The aws_msk_iam flag addition mirrors the changes in plugins/out_kafka/kafka_config.h, ensuring both input and output Kafka plugins have aligned MSK IAM configuration approaches.

plugins/out_kafka/kafka_config.c (4)

61-87: LGTM! Well-structured SASL mechanism handling for MSK IAM.

The logic correctly:

  1. Captures the user-configured SASL mechanism
  2. Detects the aws_msk_iam alias and converts it to OAUTHBEARER
  3. Sets appropriate defaults for security.protocol
  4. Properly manages the sasl_mechanism SDS string lifecycle

209-218: LGTM! Universal OAUTHBEARER enhancement.

Enabling the SASL queue for all OAUTHBEARER configurations is a good design choice. This ensures token refresh works correctly on idle connections regardless of the OAuth provider (AWS IAM, OIDC, custom, etc.).


252-286: LGTM! Correct ownership semantics for librdkafka configuration.

The ownership handling is well-documented and correct:

  • On rd_kafka_new() success: ctx->conf = NULL prevents double-free since librdkafka now owns it
  • On failure: ctx->conf remains valid for cleanup in flb_out_kafka_destroy()

The degraded handling for SASL background callback failures (warning instead of fatal) is reasonable—the connection may still function, though with potential token refresh issues on idle connections.


344-351: LGTM! Proper cleanup handling for all failure scenarios.

The destroy logic correctly handles both cases:

  1. When rd_kafka_new() succeeded: rd_kafka_destroy() handles the configuration
  2. When rd_kafka_new() failed: manual rd_kafka_conf_destroy() is needed

Since ctx->conf is set to NULL after successful producer creation (line 266), there's no risk of double-free.

plugins/in_kafka/in_kafka.c (4)

358-389: LGTM: MSK IAM registration logic is sound.

The conditional registration based on explicit user request, SASL mechanism, and broker patterns is appropriate. Error handling properly cleans up on failure.


391-424: LGTM: Ownership semantics and SASL background callbacks.

The ownership transfer of kafka_conf is properly handled: preserved on failure, nulled on success. SASL background callback enablement is correctly implemented with appropriate error handling.


486-507: LGTM: Error cleanup path is comprehensive.

The cleanup logic properly handles all resources: kafka_topics, kafka.rk, kafka_conf (conditional), opaque, and msk_iam. The conditional destruction of kafka_conf based on whether rd_kafka_new succeeded is correct.


549-553: LGTM: Exit cleanup is correct.

MSK IAM cleanup in the exit path is properly implemented and consistent with the init_error cleanup.

src/aws/flb_aws_msk_iam.c (4)

31-55: LGTM: Struct and include updates support new lifecycle management.

The added includes (TLS, pthread) and struct fields (is_serverless, cred_tls, provider, lock) properly support the enhanced credential provider lifecycle and concurrency control.


214-512: LGTM: Payload generation refactored with proper credential handling.

The updated signature accepts credentials directly, separating concerns. Validation, error handling, and cleanup are comprehensive. The User-Agent addition is a useful enhancement.


514-625: LGTM: OAuth callback with proper concurrency control.

The mutex protection for credential provider access is critical and correctly implemented. The lock is held only during credential operations, not payload generation. Error paths properly unlock the mutex, and credentials are destroyed after use.


740-761: LGTM: Mutex initialization and OAuth callback registration.

Mutex initialization includes proper error handling and cleanup. The opaque context setup and OAuth callback registration are correctly implemented.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/aws/flb_aws_msk_iam.c (1)

627-760: Verify TLS ownership to avoid potential double‑free in MSK IAM destroy.

flb_aws_msk_iam_register_oauth_cb() creates ctx->cred_tls and passes it into flb_standard_chain_provider_create(), then flb_aws_msk_iam_destroy() calls:

if (ctx->provider) {
    flb_aws_provider_destroy(ctx->provider);
}

if (ctx->cred_tls) {
    flb_tls_destroy(ctx->cred_tls);
}

Depending on how flb_standard_chain_provider_create() and flb_aws_provider_destroy() are implemented, the provider may already own and destroy cred_tls. If so, calling flb_tls_destroy(ctx->cred_tls) would double‑free the same TLS handle.

This concern mirrors an earlier review comment on the same area; please re‑confirm the ownership contract and adjust either the provider or the MSK IAM destroy path so TLS is released exactly once (e.g., by having the provider own TLS and dropping the explicit flb_tls_destroy, or vice versa).

#!/bin/bash
# Inspect AWS provider/TLS ownership to confirm whether cred_tls is freed by the provider.
rg -n "struct flb_aws_provider" src/aws include -n -C3 || true
rg -n "flb_standard_chain_provider_create" src/aws include -n -C5 || true
rg -n "cred_tls" src/aws include -n -C5 || true

Also applies to: 763-785

plugins/in_kafka/in_kafka.c (1)

271-276: Handle flb_sds_create(conf) OOM before logging/using ctx->sasl_mechanism.

flb_sds_create(conf) can return NULL, but ctx->sasl_mechanism is immediately formatted with %s (Line 275) and later used in comparisons. On OOM this is undefined behavior and can crash the process.

Consider failing init (or at least skipping SASL‑specific logic) on allocation failure:

    conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
    if (conf) {
-        ctx->sasl_mechanism = flb_sds_create(conf);
-        flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
+        ctx->sasl_mechanism = flb_sds_create(conf);
+        if (!ctx->sasl_mechanism) {
+            flb_plg_error(ins, "failed to allocate SASL mechanism string");
+            goto init_error;
+        }
+        flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
🧹 Nitpick comments (3)
plugins/in_kafka/in_kafka.c (1)

365-393: Log when aws_msk_iam was requested but brokers don’t look like MSK.

If ctx->aws_msk_iam is true but ctx->kafka.brokers is unset or doesn’t contain the expected .kafka. / .kafka-serverless. and .amazonaws.com substrings, MSK IAM is silently skipped. That can be confusing for users who set rdkafka.sasl.mechanism=aws_msk_iam but get no IAM callback.

Consider adding an explicit warning in the “else” case to make this visible:

#ifdef FLB_HAVE_AWS_MSK_IAM
-    if (ctx->aws_msk_iam && ctx->sasl_mechanism && 
-        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
-        /* Check if brokers are configured for MSK IAM */
-        if (ctx->kafka.brokers && 
-            (strstr(ctx->kafka.brokers, ".kafka.") || strstr(ctx->kafka.brokers, ".kafka-serverless.")) && 
-            strstr(ctx->kafka.brokers, ".amazonaws.com")) {
+    if (ctx->aws_msk_iam && ctx->sasl_mechanism &&
+        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
+        /* Check if brokers are configured for MSK IAM */
+        if (ctx->kafka.brokers &&
+            (strstr(ctx->kafka.brokers, ".kafka.") ||
+             strstr(ctx->kafka.brokers, ".kafka-serverless.")) &&
+            strstr(ctx->kafka.brokers, ".amazonaws.com")) {
             ...
-        }
+        }
+        else {
+            flb_plg_warn(ins,
+                         "aws_msk_iam requested but brokers do not look like MSK; "
+                         "skipping MSK IAM OAuth callback registration");
+        }
     }
#endif
src/aws/flb_aws_msk_iam.c (2)

42-55: Region extraction helper looks safe; add focused tests for broker variants.

extract_region_from_broker() now:

  • Strips the port,
  • Ensures .amazonaws.com lies within the host portion,
  • Safely detects .vpce.amazonaws.com with the p - broker >= 5 guard, and
  • Bounds region length to (0, 32].

The pointer arithmetic and length checks look correct, but this logic is subtle and critical. It would benefit from unit tests covering at least:

  • Standard MSK: b-1.example.c1.kafka.us-east-1.amazonaws.com[:9098]
  • Serverless: boot-xxx.c1.kafka-serverless.eu-west-1.amazonaws.com[:9098]
  • VPC endpoint: vpce-xxx.kafka.ap-southeast-1.vpce.amazonaws.com[:9098]
  • Malformed inputs: missing region, missing .amazonaws.com, or trailing junk after .amazonaws.com.

Also applies to: 137-212


514-625: OAuth refresh callback concurrency fix is sound; consider minor defensive checks.

The callback now:

  • Validates opaque and msk_iam_ctx,
  • Builds the MSK host based on config->is_serverless,
  • Locks config->lock around provider->refresh() and provider->get_credentials(),
  • Unlocks before building the payload and interacting with librdkafka, and
  • Always destroys creds on all paths.

This is a good fix for concurrent access to the shared provider. As a minor hardening step, you could also guard against a null config->provider (e.g., if future refactors allow partial teardown) before dereferencing it inside the locked section, returning token failure early if it’s missing.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2264d52 and a94d17e.

📒 Files selected for processing (2)
  • plugins/in_kafka/in_kafka.c (4 hunks)
  • src/aws/flb_aws_msk_iam.c (12 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
plugins/in_kafka/in_kafka.c (4)
src/flb_input.c (1)
  • flb_input_get_property (776-780)
src/flb_sds.c (2)
  • flb_sds_create (78-90)
  • flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (1)
  • flb_aws_msk_iam_register_oauth_cb (628-761)
src/flb_kafka.c (1)
  • flb_kafka_opaque_destroy (233-240)
🪛 Cppcheck (2.18.0)
src/aws/flb_aws_msk_iam.c

[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] 31-31: Include file

(missingIncludeSystem)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] 49-49: Include file

(missingIncludeSystem)


[information] 50-50: Include file

(missingIncludeSystem)


[information] 51-51: Include file

(missingIncludeSystem)


[information] 52-52: Include file

(missingIncludeSystem)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] 40-40: Include file

(missingIncludeSystem)


[information] 41-41: Include file

(missingIncludeSystem)


[information] 40-40: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] 52-52: Include file

(missingIncludeSystem)


[information] 53-53: Include file

(missingIncludeSystem)


[information] 54-54: Include file

(missingIncludeSystem)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] 137-137: Include file

(missingIncludeSystem)


[information] 138-138: Include file

(missingIncludeSystem)


[information] 139-139: Include file

(missingIncludeSystem)


[information] 140-140: Include file

(missingIncludeSystem)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

plugins/in_kafka/in_kafka.c

[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)


[information] Limiting analysis of branches. Use --check-level=exhaustive to analyze all branches.

(normalCheckLevelMaxBranches)


[information] Too many #ifdef configurations - cppcheck only checks 12 configurations. Use --force to check all configurations. For more details, use --enable=information.

(toomanyconfigs)

🔇 Additional comments (5)
plugins/in_kafka/in_kafka.c (4)

279-297: aws_msk_iam branch looks correct; minor allocation path nuance only.

The aws_msk_iam handling (flagging explicit use, flipping mechanism to OAUTHBEARER, defaulting security.protocol to SASL_SSL) is consistent and the second flb_sds_create("OAUTHBEARER") is now correctly NULL‑checked. The early return -1 on OAUTHBEARER SDS OOM is safe since no additional resources have been created yet beyond ctx.

No further changes needed here once the initial ctx->sasl_mechanism OOM is guarded.


352-362: SASL queue enablement for OAUTHBEARER is well‑scoped.

Enabling the SASL queue only when ctx->sasl_mechanism == "OAUTHBEARER" confines the behavior to OAuth flows and avoids surprising other mechanisms. The debug log is also useful for troubleshooting.


417-429: SASL background callbacks enablement is robustly guarded.

The background callbacks are only enabled for the OAUTHBEARER mechanism, the error path logs a clear warning and destroys the returned rd_kafka_error_t *, and the success path logs at info. This is a solid integration of librdkafka’s background refresh with good observability.


491-503: Kafka conf / MSK IAM cleanup paths are consistent with ownership semantics.

The init error path correctly distinguishes:

  • rd_kafka_destroy(ctx->kafka.rk) when rk exists (conf owned by rk),
  • rd_kafka_conf_destroy(kafka_conf) when rk was never created, and
  • flb_aws_msk_iam_destroy(ctx->msk_iam) when MSK IAM was partially initialized.

This avoids leaking kafka_conf and MSK IAM resources without risking double‑destroy.

Also applies to: 508-512

src/aws/flb_aws_msk_iam.c (1)

214-512: MSK IAM payload builder refactor is thorough and cleanup‑safe.

The refactored build_msk_iam_payload():

  • Validates config->region, host, and key fields on creds early with clear error logs.
  • Builds SigV4 credential, query, canonical request, string‑to‑sign, and signature with goto error on any failure.
  • Correctly frees all intermediate SDS values and the presigned URL in the error path (including empty_payload_hex and key), and nulls empty_payload_hex on the success path to avoid double destroy.
  • Generates a base64 payload, then converts to URL‑safe base64 and strips padding in place.

This looks correct and memory‑safe with no obvious leaks or double‑frees.

@kalavt kalavt force-pushed the feature/aws-msk-iam-clean branch from a94d17e to 095b814 Compare December 9, 2025 04:15
@kalavt kalavt changed the title aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support aws_msk_iam: add AWS MSK IAM authentication support Dec 9, 2025
@kalavt
Copy link
Author

kalavt commented Dec 9, 2025

Still failing our linter: ❌ Commit b486cb7 failed: Missing prefix in commit subject: 'aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support'

@cosmo0920 should be clean now?

@cosmo0920
Copy link
Contributor

Still failing our linter: ❌ Commit b486cb7 failed: Missing prefix in commit subject: 'aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support'

@cosmo0920 should be clean now?

Yes, it's clean now:
https://github.com/fluent/fluent-bit/actions/runs/20051852936/job/57522155166?pr=11270

Implements the core AWS MSK IAM authentication mechanism including:
- OAuth callback mechanism for token generation and refresh
- Token lifecycle management and expiration handling
- Integration with AWS credential providers
- SASL/OAUTHBEARER protocol support for librdkafka

This provides the foundation for AWS MSK IAM authentication support
in Fluent Bit's Kafka plugins.

Signed-off-by: Arbin <[email protected]>
Enhance EC2 credential provider to better support MSK IAM authentication
by improving credential refresh behavior and lifecycle management.

Signed-off-by: Arbin <[email protected]>
Enhance profile credential provider to better support MSK IAM authentication
by improving credential refresh behavior and lifecycle management.

Signed-off-by: Arbin <[email protected]>
Enhance STS credential provider to better support MSK IAM authentication
by improving credential refresh behavior and lifecycle management.

Signed-off-by: Arbin <[email protected]>
Update Kafka core functionality to support AWS MSK IAM authentication,
including necessary configuration and lifecycle improvements.

Signed-off-by: Arbin <[email protected]>
Enable AWS MSK IAM authentication in the Kafka input plugin:
- Add AWS MSK IAM configuration options
- Integrate with OAuth callback mechanism
- Support automatic credential refresh
- Add TLS configuration for secure connections

Signed-off-by: Arbin <[email protected]>
Enable AWS MSK IAM authentication in the Kafka output plugin:
- Add AWS MSK IAM configuration options
- Integrate with OAuth callback mechanism
- Support automatic credential refresh
- Add TLS configuration for secure connections

Signed-off-by: Arbin <[email protected]>
Add NULL checks after flb_sds_create() when allocating SASL mechanism strings to prevent crashes on allocation failure. This covers both the initial SASL mechanism configuration and the AWS MSK IAM OAUTHBEARER conversion.

Signed-off-by: Arbin <[email protected]>
Replace pointer comparison with offset comparison in VPC endpoint detection to improve safety and clarity. Changes 'p >= broker + 5' to 'p - broker >= 5' to properly check offset within string bounds before accessing p - 5.

Signed-off-by: Arbin <[email protected]>
- Remove is_serverless detection logic
- Use actual broker hostname instead of constructed host
- Fix memory leak in error cleanup path
- Add broker_host field to store actual hostname
- Update function signature to accept optional region parameter

This aligns with official AWS MSK IAM signers behavior where
the signature Host must match the TLS SNI/actual connection host.

Signed-off-by: Arbin <[email protected]>
- Add aws_region configuration field
- Remove hostname pattern check for MSK IAM registration
- Pass aws_region to MSK IAM registration function
- Support PrivateLink and custom DNS scenarios

Signed-off-by: Arbin <[email protected]>
- Add aws_region configuration field
- Remove hostname pattern check for MSK IAM registration
- Pass aws_region to MSK IAM registration function
- Support PrivateLink and custom DNS scenarios

Signed-off-by: Arbin <[email protected]>
- Add comprehensive MSK IAM configuration examples
- Cover Standard MSK, Serverless, PrivateLink scenarios
- Document aws_region parameter usage
- Add troubleshooting guide and IAM permissions
- Update README with detailed usage instructions

Signed-off-by: Arbin <[email protected]>
Remove service_host from struct flb_aws_msk_iam and construct it
dynamically in OAuth callback. This eliminates data redundancy since
service_host can be derived from region.

Also clean up unused struct flb_msk_iam_cb definition and use
flb_sds_len() instead of strlen() for consistency.

Signed-off-by: Arbin <[email protected]>
Fix critical security issue and improve code quality:

1. Fix potential buffer overread in extract_region_from_broker():
   - Changed iteration from 'start = end - 1' to 'start = end'
   - Check boundary before reading: while (start > broker && *(start - 1) != '.')
   - Eliminates undefined behavior when broker string is malformed

2. Avoid implicit NUL-termination in base64 URL encoding:
   - Replace pointer-based iteration with length-based for loop
   - Remove dependency on flb_base64_encode() NUL-termination behavior
   - Remove unused variable declaration

These changes address security concerns and improve code maintainability.

Signed-off-by: Arbin <[email protected]>
@kalavt kalavt force-pushed the feature/aws-msk-iam-clean branch from 5285c26 to 6bb3ab3 Compare December 24, 2025 16:29
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
examples/kafka_filter/README.md (1)

107-114: Consider clarifying aws_region requirement wording.

Line 112 states aws_region is required "Only for custom DNS". While technically accurate, this could be made clearer as "Optional (required only for custom DNS/PrivateLink)" to emphasize it's not needed for standard MSK endpoints where auto-detection works.

🔎 Proposed clarification
-| `aws_region` | AWS region (auto-detected if not set) | Only for custom DNS |
+| `aws_region` | AWS region (auto-detected if not set) | Optional (required for custom DNS) |
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1bde90a and 6bb3ab3.

📒 Files selected for processing (3)
  • examples/kafka_filter/README.md
  • examples/kafka_filter/kafka_msk_iam.conf
  • include/fluent-bit/aws/flb_aws_msk_iam.h
🚧 Files skipped from review as they are similar to previous changes (1)
  • examples/kafka_filter/kafka_msk_iam.conf
🧰 Additional context used
🧬 Code graph analysis (1)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
src/aws/flb_aws_msk_iam.c (1)
  • flb_aws_msk_iam_register_oauth_cb (628-768)
🔇 Additional comments (1)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)

31-45: LGTM! Well-designed API simplification.

The function signature change from cluster ARN to brokers+region parameters is a significant improvement. The documentation clearly explains the auto-detection behavior and parameter requirements. The implementation (from the relevant code snippets) properly validates inputs and handles region auto-detection when region is NULL.

- Add blank lines for better readability
- Fix markdown table formatting
- Correct IAM permission ARNs to include cluster UUID
- Add note about finding cluster UUID via AWS Console/CLI
- Improve section spacing and code block formatting

Signed-off-by: Arbin <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

docs-required ok-package-test Run PR packaging tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants