cosmo0920
Contributor

@cosmo0920 cosmo0920 commented Oct 8, 2025

This PR is still a work in progress; it introduces a dead-letter queue (DLQ).
The motivation is that Fluent Bit currently offers no mechanism for preserving invalid chunks that are rejected when sent over the network.
So if users run into this behavior, the chunks are simply deleted and there are no clues left for troubleshooting.

Related to #9363.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Introduced a Dead-Letter Queue (DLQ) to capture records from non-retriable output failures when enabled.
    • Added service configuration options to control DLQ behavior (enable/disable and destination path).
    • Failed records are quarantined to filesystem-backed storage for later inspection, with informative filenames including context.
  • Tests

    • Added unit tests covering DLQ write and read flows, including verification of payload content.
    • Added tests for DLQ-disabled behavior to ensure no artifacts are created.

coderabbitai bot commented Oct 8, 2025

Walkthrough

Introduces DLQ (dead-letter queue) support: new config keys/fields, storage APIs to quarantine chunks, engine integration to trigger quarantine on non-retriable output failures, and filesystem-backed implementation to copy rejected chunks into a dedicated stream. Adds unit tests and builds them.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Config: fields and keys | include/fluent-bit/flb_config.h, src/flb_config.c | Adds DLQ-related config fields (storage_keep_rejected, storage_rejected_path, storage_rejected_stream) and public keys (storage.keep.rejected, storage.rejected.path); initializes and frees new fields; exposes options in service config table. |
| Storage public API | include/fluent-bit/flb_storage.h | Adds forward declaration for struct flb_input_instance and new public functions: flb_storage_create, flb_storage_input_create, flb_storage_quarantine_chunk(...). |
| Storage implementation (DLQ) | src/flb_storage.c | Implements DLQ quarantine: creates/gets a “rejected” stream (filesystem only) and copies chunk data into a uniquely named DLQ chunk; guarded by CIO_HAVE_BACKEND_FILESYSTEM. |
| Engine integration | src/flb_engine.c | Adds handle_dlq_if_available(...) and invokes it on output failure paths lacking retries or when retry scheduling/creation fails. |
| Tests: build | tests/internal/CMakeLists.txt | Registers new unit test source storage_dlq.c. |
| Tests: DLQ scenarios | tests/internal/storage_dlq.c | Adds unit tests for DLQ enabled/disabled scenarios, helpers to create temp storage, write/read chunks, and validate DLQ contents on filesystem. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Engine as Engine (handle_output_event)
  participant Task as Output Task
  participant Storage as flb_storage_quarantine_chunk
  participant CIO as CIO FS Backend
  participant Rej as Rejected Stream

  rect rgb(235,245,255)
  note over Engine: Output failure or no-retry path
  Engine->>Task: Inspect result / retry state
  alt DLQ enabled and chunk available
    Engine->>Storage: quarantine_chunk(ctx, ch, tag, status, out_name)
    Storage->>CIO: Ensure source chunk is up
    Storage->>Rej: get_or_create_rejected_stream()
    Storage->>Rej: Open DLQ chunk (unique name)
    Storage->>Rej: Write source payload
    Storage->>Rej: Sync and close
    Storage-->>Engine: return 0/-1
  else DLQ not available
    Engine-->>Engine: Skip quarantine
  end
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • edsiper
  • koleini
  • nokute78
  • PettitWesley

Poem

A packet hopped and lost its way,
I found a burrow where it could stay—DLQ bay.
With tidy paws I file each byte,
In “rejected” meadows, safe at night.
When outputs fail and retriers rue,
I stash the crumbs—log bunny’s coup. 🐇📦

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 10.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title succinctly describes the addition of a dead letter queue mechanism, accurately reflecting the primary change introduced by the pull request. |

@cosmo0920 cosmo0920 requested a review from koleini as a code owner October 9, 2025 10:59
@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.


Comment on lines +165 to +199
static int find_latest_flb_unix(const char *dir, char *out, size_t out_sz)
{
    DIR *d = opendir(dir);
    struct dirent *e;
    time_t best_t = 0;
    char best_path[1024] = {0};
    struct stat st;
    char full[1024];

    if (!d) return -1;

    while ((e = readdir(d)) != NULL) {
        size_t len = strlen(e->d_name);
        if (len < 5) {
            continue;
        }
        if (strcmp(e->d_name + (len - 4), ".flb") != 0) {
            continue;
        }

        join_path(full, sizeof(full), dir, e->d_name);
        if (stat(full, &st) == 0) {
            if (st.st_mtime >= best_t) {
                best_t = st.st_mtime;
                strncpy(best_path, full, sizeof(best_path)-1);
            }
        }
    }
    closedir(d);

    if (best_path[0] == '\0') {
        return -1;
    }
    strncpy(out, best_path, out_sz - 1);
    out[out_sz-1] = '\0';

[P1] Include dirent declarations for POSIX directory scan helpers

The new internal test uses DIR, struct dirent, opendir, readdir, and closedir inside the Unix implementation of find_latest_flb but the file never includes <dirent.h>. GCC/Clang will fail to build the test on Linux because the types and prototypes are undefined. Add the missing header (and, if needed, the Windows equivalent) so the test suite still compiles.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
src/flb_storage.c (1)

750-785: Guard rejected-stream lazy creation against races; consider pre-creating at init

This function lazily creates and caches ctx->storage_rejected_stream with no synchronization. Concurrent calls (from multiple output workers) can race, leading to duplicate create attempts or a torn write to the cached pointer.

  • Prefer pre-creating the rejected stream during flb_storage_create when storage_keep_rejected is enabled.
  • If you keep lazy creation, add a lock or other synchronization around get/create/cache.

Also, sanitize the configured name: storage_rejected_path may contain path separators; pass a single safe stream name to ChunkIO.

If ChunkIO stream creation is guaranteed to be thread-safe and idempotent, please confirm; otherwise, adopt one of the above.
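
The locking option can be sketched in isolation. The following is a minimal illustration, assuming a POSIX threads environment; `demo_ctx`, `demo_stream`, and every function name here are hypothetical stand-ins, not Fluent Bit or ChunkIO types, and the `malloc` call merely takes the place of whatever call actually creates the stream.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-ins; these are not the Fluent Bit or ChunkIO types. */
struct demo_stream {
    const char *name;
};

struct demo_ctx {
    pthread_mutex_t lock;                /* protects rejected_stream */
    struct demo_stream *rejected_stream; /* cached after first creation */
    int create_calls;                    /* counts real creations (illustration only) */
};

static void demo_ctx_init(struct demo_ctx *ctx)
{
    pthread_mutex_init(&ctx->lock, NULL);
    ctx->rejected_stream = NULL;
    ctx->create_calls = 0;
}

static void demo_ctx_destroy(struct demo_ctx *ctx)
{
    free(ctx->rejected_stream);
    pthread_mutex_destroy(&ctx->lock);
}

/* Return the cached stream, creating it at most once across threads. */
static struct demo_stream *demo_get_or_create_rejected(struct demo_ctx *ctx)
{
    struct demo_stream *st;

    pthread_mutex_lock(&ctx->lock);
    st = ctx->rejected_stream;
    if (st == NULL) {
        /* Placeholder for the real stream creation call. */
        st = malloc(sizeof(*st));
        if (st != NULL) {
            st->name = "rejected";
            ctx->rejected_stream = st;
            ctx->create_calls++;
        }
    }
    pthread_mutex_unlock(&ctx->lock);

    return st;
}
```

Because the check, the creation, and the cache update all happen under one lock, two workers calling `demo_get_or_create_rejected()` at the same time receive the same pointer. Pre-creating the stream at init time would avoid the lock on the failure path entirely.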

Example minimal sanitization within this function:

-    struct cio_stream *st;
-    const char *name;
+    struct cio_stream *st;
+    const char *raw;
+    char name[256];
+    size_t i;
@@
-    name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";
+    raw = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected";
+    snprintf(name, sizeof(name), "%s", raw);
+    name[sizeof(name) - 1] = '\0';
+    for (i = 0; name[i] != '\0'; i++) {
+        if (name[i] == '/' || name[i] == '\\') {
+            name[i] = '_';
+        }
+    }
tests/internal/storage_dlq.c (2)

262-274: Use join_path for portability (Windows path separators)

Hardcoding "/" may fail on Windows. Reuse join_path.

-    snprintf(path, sizeof(path), "%s/%s", root, stream_name);
-    path[sizeof(path)-1] = '\0';
+    join_path(path, sizeof(path), root, stream_name);
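
The body of the test file's `join_path` helper is not shown in this thread. As a rough sketch of what a separator-aware join could look like, the hypothetical `demo_join_path` below picks the separator per platform and reports truncation instead of silently cutting the path; the name, return convention, and `DEMO_PATH_SEP` macro are assumptions for illustration only.

```c
#include <stdio.h>
#include <string.h>

#ifdef _WIN32
#define DEMO_PATH_SEP '\\'
#else
#define DEMO_PATH_SEP '/'
#endif

/*
 * Hypothetical helper: join dir and name with the platform separator.
 * Returns 0 on success, -1 on bad arguments or if the result does not fit.
 */
static int demo_join_path(char *out, size_t out_sz,
                          const char *dir, const char *name)
{
    size_t dlen;
    int need_sep;
    int n;

    if (out == NULL || out_sz == 0 || dir == NULL || name == NULL) {
        return -1;
    }

    dlen = strlen(dir);
    /* Avoid a doubled separator when dir already ends with one. */
    need_sep = (dlen > 0 &&
                dir[dlen - 1] != '/' &&
                dir[dlen - 1] != DEMO_PATH_SEP);

    if (need_sep) {
        n = snprintf(out, out_sz, "%s%c%s", dir, DEMO_PATH_SEP, name);
    }
    else {
        n = snprintf(out, out_sz, "%s%s", dir, name);
    }

    /* snprintf returns the length it needed; treat truncation as an error. */
    if (n < 0 || (size_t) n >= out_sz) {
        return -1;
    }
    return 0;
}
```

Returning an error on truncation matters in tests like these, because a silently shortened path would make a later `stat()` or directory scan fail for a reason unrelated to the DLQ logic.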

458-463: Avoid non-portable basename; implement a portable helper

basename() isn’t available on Windows and requires libgen.h on Unix. Use a small portable helper.

Apply this helper near other utilities:

+static const char *base_name_portable(const char *p)
+{
+    const char *s1 = strrchr(p, '/');
+#ifdef _WIN32
+    const char *s2 = strrchr(p, '\\');
+    const char *b = (s1 && s2) ? (s1 > s2 ? s1 : s2) : (s1 ? s1 : s2);
+#else
+    const char *b = s1;
+#endif
+    return b ? b + 1 : p;
+}

Then replace usage:

-    /* get just the filename (basename) */
-    strncpy(latest_copy, latest, sizeof(latest_copy)-1);
-    latest_copy[sizeof(latest_copy)-1] = '\0';
-    base = basename(latest_copy);
+    /* get just the filename (basename) */
+    base = base_name_portable(latest);

And remove the now-unused latest_copy declaration:

-    char latest_copy[1024];
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 912b7d7 and 4530987.

📒 Files selected for processing (7)
  • include/fluent-bit/flb_config.h (2 hunks)
  • include/fluent-bit/flb_storage.h (2 hunks)
  • src/flb_config.c (3 hunks)
  • src/flb_engine.c (5 hunks)
  • src/flb_storage.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/storage_dlq.c (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
src/flb_storage.c (1)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
include/fluent-bit/flb_storage.h (1)
src/flb_storage.c (1)
  • flb_storage_quarantine_chunk (787-859)
src/flb_engine.c (5)
src/flb_input_chunk.c (1)
  • flb_input_chunk_get_tag (2174-2200)
src/flb_sds.c (2)
  • flb_sds_create_len (58-76)
  • flb_sds_destroy (389-399)
src/flb_input.c (1)
  • flb_input_name (790-797)
src/flb_output.c (1)
  • flb_output_name (1087-1094)
src/flb_storage.c (1)
  • flb_storage_quarantine_chunk (787-859)
src/flb_config.c (1)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
tests/internal/storage_dlq.c (2)
src/flb_config.c (2)
  • flb_config_exit (448-628)
  • flb_config_init (232-446)
src/flb_storage.c (1)
  • flb_storage_quarantine_chunk (787-859)
🔇 Additional comments (2)
src/flb_engine.c (1)

234-276: Solid defensive gating on DLQ handling.

Short-circuiting when DLQ is disabled or the task lacks a chunk keeps us off the hot path unless quarantine is actually configured, which is exactly what we need here.

tests/internal/storage_dlq.c (1)

386-397: Confirm cio_qsort comparator usage

Passing NULL as comparator assumes ChunkIO provides a sensible default. If not guaranteed, pass a comparator or drop the call.

Comment on lines +787 to +859
int flb_storage_quarantine_chunk(struct flb_config *ctx,
                                 struct cio_chunk *src,
                                 const char *tag,
                                 int status_code,
                                 const char *out_name)
{
#ifdef CIO_HAVE_BACKEND_FILESYSTEM
    struct cio_stream *dlq;
    void *buf = NULL;
    size_t size = 0;
    int err = 0;
    char name[256];
    struct cio_chunk *dst;

    if (!ctx || !src) {
        return -1;
    }
    dlq = get_or_create_rejected_stream(ctx);
    if (!dlq) {
        return -1;
    }

    if (cio_chunk_is_up(src) != CIO_TRUE) {
        if (cio_chunk_up_force(src) != CIO_OK) {
            flb_warn("[storage] cannot bring chunk up to copy into DLQ");
            return -1;
        }
    }

    if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) {
        flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size);
        return -1;
    }

    /* Compose a simple, unique-ish file name */
    snprintf(name, sizeof(name),
             "%s_%d_%s_%p.flb",
             tag ? tag : "no-tag",
             status_code,
             out_name ? out_name : "out",
             (void *) src);

    /* Create + write the DLQ copy */
    dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err);
    if (!dst) {
        flb_warn("[storage] DLQ open failed (err=%d)", err);
        flb_free(buf);
        return -1;
    }
    if (cio_chunk_write(dst, buf, size) != CIO_OK ||
        cio_chunk_sync(dst) != CIO_OK) {
        flb_warn("[storage] DLQ write/sync failed");
        cio_chunk_close(dst, CIO_TRUE);
        flb_free(buf);
        return -1;
    }

    cio_chunk_close(dst, CIO_FALSE);
    flb_free(buf);

    flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size);

    return 0;
#else
    FLB_UNUSED(ctx);
    FLB_UNUSED(src);
    FLB_UNUSED(tag);
    FLB_UNUSED(status_code);
    FLB_UNUSED(out_name);

    return -1;
#endif
}
🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Restore source chunk’s up/down state after copying; sanitize DLQ filename components

  • You force the source chunk “up” but never restore it if it was originally “down”, which can increase memory residency. Capture was_up and bring it back down after copying.

  • tag and out_name may contain filesystem-hostile characters (e.g., '/', '\', ':'). Sanitize before composing name to avoid invalid paths or traversal.

Confirm the appropriate API to bring a chunk back down (e.g., cio_chunk_down or equivalent) is available.

Suggested changes:

-    if (cio_chunk_is_up(src) != CIO_TRUE) {
+    int was_up = (cio_chunk_is_up(src) == CIO_TRUE);
+    if (!was_up) {
         if (cio_chunk_up_force(src) != CIO_OK) {
             flb_warn("[storage] cannot bring chunk up to copy into DLQ");
             return -1;
         }
     }
@@
-    snprintf(name, sizeof(name),
-             "%s_%d_%s_%p.flb",
-             tag ? tag : "no-tag",
-             status_code,
-             out_name ? out_name : "out",
-             (void *) src);
+    /* sanitize tag/out_name to avoid path separators */
+    const char *raw_tag = tag ? tag : "no-tag";
+    const char *raw_out = out_name ? out_name : "out";
+    char safe_tag[96], safe_out[64];
+    size_t i;
+    snprintf(safe_tag, sizeof(safe_tag), "%s", raw_tag);
+    safe_tag[sizeof(safe_tag)-1] = '\0';
+    for (i = 0; safe_tag[i] != '\0'; i++) {
+        if (safe_tag[i] == '/' || safe_tag[i] == '\\' || safe_tag[i] == ':') {
+            safe_tag[i] = '_';
+        }
+    }
+    snprintf(safe_out, sizeof(safe_out), "%s", raw_out);
+    safe_out[sizeof(safe_out)-1] = '\0';
+    for (i = 0; safe_out[i] != '\0'; i++) {
+        if (safe_out[i] == '/' || safe_out[i] == '\\' || safe_out[i] == ':') {
+            safe_out[i] = '_';
+        }
+    }
+    snprintf(name, sizeof(name), "%s_%d_%s_%p.flb",
+             safe_tag, status_code, safe_out, (void *) src);
@@
-    flb_free(buf);
+    flb_free(buf);
+
+    if (!was_up) {
+        /* restore original state if API available */
+        (void) cio_chunk_down(src);
+    }

🏁 Script executed:

#!/bin/bash
rg -nP 'cio_chunk_down\(' -C2
rg -nP 'chunk_down\(' -C2
rg -nP 'cio_chunk_is_up\(' -C2
rg -nP 'cio_chunk_up_force\(' -C2

Length of output: 35734


Restore original chunk state after DLQ copy; sanitize filename components

  • Record was_up = (cio_chunk_is_up(src) == CIO_TRUE), call cio_chunk_up_force(src) if needed, then after writing invoke cio_chunk_down(src) when !was_up.
  • Copy tag/out_name into fixed-size buffers and replace any /, \ or : chars with _ before composing name.
🤖 Prompt for AI Agents
In src/flb_storage.c around lines 787 to 859, the DLQ copy logic leaves the
original chunk state changed and uses raw tag/out_name when building the file
name; record whether the source chunk was up (was_up = cio_chunk_is_up(src) ==
CIO_TRUE), only call cio_chunk_up_force(src) if required, and after the DLQ
write restore the original state by calling cio_chunk_down(src) when was_up is
false; additionally copy tag and out_name into fixed-size local buffers, replace
any '/' '\' or ':' characters with '_' to sanitize them before using snprintf to
compose name so the resulting filename is safe.
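
The sanitization part of this suggestion repeats the same loop for `tag` and `out_name`. One way to avoid the duplication, sketched here under the assumption that a small static helper is acceptable in `src/flb_storage.c`, is to factor the loop out. `demo_sanitize_component` and `demo_dlq_name` are hypothetical names, not functions from the PR; the buffer sizes mirror the ones in the suggested diff.

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: copy src into dst, replacing '/', '\\' and ':'
 * with '_'. Uses `fallback` when src is NULL. Always NUL-terminates;
 * inputs longer than dst are truncated.
 */
static void demo_sanitize_component(char *dst, size_t dst_sz,
                                    const char *src, const char *fallback)
{
    size_t i;

    if (dst == NULL || dst_sz == 0) {
        return;
    }
    if (src == NULL) {
        src = fallback ? fallback : "";
    }

    for (i = 0; i + 1 < dst_sz && src[i] != '\0'; i++) {
        char c = src[i];
        dst[i] = (c == '/' || c == '\\' || c == ':') ? '_' : c;
    }
    dst[i] = '\0';
}

/*
 * Compose "<tag>_<status>_<output>_<id>.flb" from sanitized parts.
 * Returns 0 on success, -1 if the name does not fit in out.
 */
static int demo_dlq_name(char *out, size_t out_sz, const char *tag,
                         int status_code, const char *out_name, void *id)
{
    char safe_tag[96];
    char safe_out[64];
    int n;

    demo_sanitize_component(safe_tag, sizeof(safe_tag), tag, "no-tag");
    demo_sanitize_component(safe_out, sizeof(safe_out), out_name, "out");

    n = snprintf(out, out_sz, "%s_%d_%s_%p.flb",
                 safe_tag, status_code, safe_out, id);

    /* Report truncation rather than writing a clipped file name. */
    return (n < 0 || (size_t) n >= out_sz) ? -1 : 0;
}
```

With a helper like this, the call site reduces to one `demo_dlq_name(name, sizeof(name), tag, status_code, out_name, (void *) src)` call followed by an error check. Note that the exact text produced by `%p` is implementation-defined, so the pointer only makes the name unique within one process.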

Comment on lines +2 to +19
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <sys/stat.h>

#include <fluent-bit/flb_compat.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_utils.h>

#include <chunkio/chunkio.h>

#include "flb_tests_internal.h"

⚠️ Potential issue | 🟠 Major

Add missing OS headers for dir traversal and PID; avoid build breaks

find_latest_flb_unix uses dirent/stat; tmpdir_for uses getpid. Include the proper headers on non-Windows.

Apply:

 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
 #include <assert.h>
 #include <sys/stat.h>
+#ifndef _WIN32
+#include <dirent.h>
+#include <unistd.h>
+#endif
🤖 Prompt for AI Agents
In tests/internal/storage_dlq.c around lines 2 to 19, the file uses dirent/stat
functions in find_latest_flb_unix and getpid in tmpdir_for but doesn't include
the required OS headers; add the missing headers by including <dirent.h> and
<unistd.h> (wrapped in a non-Windows guard, e.g., #ifndef _WIN32 ... #endif) so
dir traversal and getpid compile on Unix-like systems.
