-
Notifications
You must be signed in to change notification settings - Fork 1.8k
storage: config: engine: Introduce dead letter queue #11000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
…nvalid chunks for later verifications Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
WalkthroughIntroduces DLQ (dead-letter queue) support: new config keys/fields, storage APIs to quarantine chunks, engine integration to trigger quarantine on non-retriable output failures, and filesystem-backed implementation to copy rejected chunks into a dedicated stream. Adds unit tests and builds them. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Engine as Engine (handle_output_event)
participant Task as Output Task
participant Storage as flb_storage_quarantine_chunk
participant CIO as CIO FS Backend
participant Rej as Rejected Stream
rect rgb(235,245,255)
note over Engine: Output failure or no-retry path
Engine->>Task: Inspect result / retry state
alt DLQ enabled and chunk available
Engine->>Storage: quarantine_chunk(ctx, ch, tag, status, out_name)
Storage->>CIO: Ensure source chunk is up
Storage->>Rej: get_or_create_rejected_stream()
Storage->>Rej: Open DLQ chunk (unique name)
Storage->>Rej: Write source payload
Storage->>Rej: Sync and close
Storage-->>Engine: return 0/-1
else DLQ not available
Engine-->>Engine: Skip quarantine
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Signed-off-by: Hiroshi Hatake <[email protected]>
Signed-off-by: Hiroshi Hatake <[email protected]>
e91922d
to
4530987
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
static int find_latest_flb_unix(const char *dir, char *out, size_t out_sz) | ||
{ | ||
DIR *d = opendir(dir); | ||
struct dirent *e; | ||
time_t best_t = 0; | ||
char best_path[1024] = {0}; | ||
struct stat st; | ||
char full[1024]; | ||
|
||
if (!d) return -1; | ||
|
||
while ((e = readdir(d)) != NULL) { | ||
size_t len = strlen(e->d_name); | ||
if (len < 5) { | ||
continue; | ||
} | ||
if (strcmp(e->d_name + (len - 4), ".flb") != 0) { | ||
continue; | ||
} | ||
|
||
join_path(full, sizeof(full), dir, e->d_name); | ||
if (stat(full, &st) == 0) { | ||
if (st.st_mtime >= best_t) { | ||
best_t = st.st_mtime; | ||
strncpy(best_path, full, sizeof(best_path)-1); | ||
} | ||
} | ||
} | ||
closedir(d); | ||
|
||
if (best_path[0] == '\0') { | ||
return -1; | ||
} | ||
strncpy(out, best_path, out_sz - 1); | ||
out[out_sz-1] = '\0'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include dirent declarations for POSIX directory scan helpers
The new internal test uses DIR
, struct dirent
, opendir
, readdir
, and closedir
inside the Unix implementation of find_latest_flb
but the file never includes <dirent.h>
. GCC/Clang will fail to build the test on Linux because the types and prototypes are undefined. Add the missing header (and, if needed, the Windows equivalent) so the test suite still compiles.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/flb_storage.c (1)
750-785
: Guard rejected-stream lazy creation against races; consider pre-creating at initThis function lazily creates and caches ctx->storage_rejected_stream with no synchronization. Concurrent calls (from multiple output workers) can race, leading to duplicate create attempts or a torn write to the cached pointer.
- Prefer pre-creating the rejected stream during flb_storage_create when storage_keep_rejected is enabled.
- If you keep lazy creation, add a lock or other synchronization around get/create/cache.
Also, sanitize the configured name: storage_rejected_path may contain path separators; pass a single safe stream name to ChunkIO.
If ChunkIO stream creation is guaranteed to be thread-safe and idempotent, please confirm; otherwise, adopt one of the above.
Example minimal sanitization within this function:
- struct cio_stream *st; - const char *name; + struct cio_stream *st; + const char *raw; + char name[256]; + size_t i; @@ - name = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; + raw = ctx->storage_rejected_path ? ctx->storage_rejected_path : "rejected"; + snprintf(name, sizeof(name), "%s", raw); + name[sizeof(name) - 1] = '\0'; + for (i = 0; name[i] != '\0'; i++) { + if (name[i] == '/' || name[i] == '\\') { + name[i] = '_'; + } + }tests/internal/storage_dlq.c (2)
262-274
: Use join_path for portability (Windows path separators)Hardcoding "/" may fail on Windows. Reuse join_path.
- snprintf(path, sizeof(path), "%s/%s", root, stream_name); - path[sizeof(path)-1] = '\0'; + join_path(path, sizeof(path), root, stream_name);
458-463
: Avoid non-portable basename; implement a portable helperbasename() isn’t available on Windows and requires libgen.h on Unix. Use a small portable helper.
Apply this helper near other utilities:
+static const char *base_name_portable(const char *p) +{ + const char *s1 = strrchr(p, '/'); +#ifdef _WIN32 + const char *s2 = strrchr(p, '\\'); + const char *b = (s1 && s2) ? (s1 > s2 ? s1 : s2) : (s1 ? s1 : s2); +#else + const char *b = s1; +#endif + return b ? b + 1 : p; +}Then replace usage:
- /* get just the filename (basename) */ - strncpy(latest_copy, latest, sizeof(latest_copy)-1); - latest_copy[sizeof(latest_copy)-1] = '\0'; - base = basename(latest_copy); + /* get just the filename (basename) */ + base = base_name_portable(latest);And remove the now-unused latest_copy declaration:
- char latest_copy[1024];
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
include/fluent-bit/flb_config.h
(2 hunks)include/fluent-bit/flb_storage.h
(2 hunks)src/flb_config.c
(3 hunks)src/flb_engine.c
(5 hunks)src/flb_storage.c
(1 hunks)tests/internal/CMakeLists.txt
(1 hunks)tests/internal/storage_dlq.c
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
src/flb_storage.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)
include/fluent-bit/flb_storage.h (1)
src/flb_storage.c (1)
flb_storage_quarantine_chunk
(787-859)
src/flb_engine.c (5)
src/flb_input_chunk.c (1)
flb_input_chunk_get_tag
(2174-2200)src/flb_sds.c (2)
flb_sds_create_len
(58-76)flb_sds_destroy
(389-399)src/flb_input.c (1)
flb_input_name
(790-797)src/flb_output.c (1)
flb_output_name
(1087-1094)src/flb_storage.c (1)
flb_storage_quarantine_chunk
(787-859)
src/flb_config.c (1)
include/fluent-bit/flb_mem.h (1)
flb_free
(126-128)
tests/internal/storage_dlq.c (2)
src/flb_config.c (2)
flb_config_exit
(448-628)flb_config_init
(232-446)src/flb_storage.c (1)
flb_storage_quarantine_chunk
(787-859)
🔇 Additional comments (2)
src/flb_engine.c (1)
234-276
: Solid defensive gating on DLQ handling.Short-circuiting when DLQ is disabled or the task lacks a chunk keeps us off the hot path unless quarantine is actually configured, which is exactly what we need here.
tests/internal/storage_dlq.c (1)
386-397
: Confirm cio_qsort comparator usagePassing NULL as comparator assumes ChunkIO provides a sensible default. If not guaranteed, pass a comparator or drop the call.
int flb_storage_quarantine_chunk(struct flb_config *ctx, | ||
struct cio_chunk *src, | ||
const char *tag, | ||
int status_code, | ||
const char *out_name) | ||
{ | ||
#ifdef CIO_HAVE_BACKEND_FILESYSTEM | ||
struct cio_stream *dlq; | ||
void *buf = NULL; | ||
size_t size = 0; | ||
int err = 0; | ||
char name[256]; | ||
struct cio_chunk *dst; | ||
|
||
if (!ctx || !src) { | ||
return -1; | ||
} | ||
dlq = get_or_create_rejected_stream(ctx); | ||
if (!dlq) { | ||
return -1; | ||
} | ||
|
||
if (cio_chunk_is_up(src) != CIO_TRUE) { | ||
if (cio_chunk_up_force(src) != CIO_OK) { | ||
flb_warn("[storage] cannot bring chunk up to copy into DLQ"); | ||
return -1; | ||
} | ||
} | ||
|
||
if (cio_chunk_get_content_copy(src, &buf, &size) != CIO_OK || size == 0) { | ||
flb_warn("[storage] cannot read content for DLQ copy (size=%zu)", size); | ||
return -1; | ||
} | ||
|
||
/* Compose a simple, unique-ish file name */ | ||
snprintf(name, sizeof(name), | ||
"%s_%d_%s_%p.flb", | ||
tag ? tag : "no-tag", | ||
status_code, | ||
out_name ? out_name : "out", | ||
(void *) src); | ||
|
||
/* Create + write the DLQ copy */ | ||
dst = cio_chunk_open(ctx->cio, dlq, name, CIO_OPEN, size, &err); | ||
if (!dst) { | ||
flb_warn("[storage] DLQ open failed (err=%d)", err); | ||
flb_free(buf); | ||
return -1; | ||
} | ||
if (cio_chunk_write(dst, buf, size) != CIO_OK || | ||
cio_chunk_sync(dst) != CIO_OK) { | ||
flb_warn("[storage] DLQ write/sync failed"); | ||
cio_chunk_close(dst, CIO_TRUE); | ||
flb_free(buf); | ||
return -1; | ||
} | ||
|
||
cio_chunk_close(dst, CIO_FALSE); | ||
flb_free(buf); | ||
|
||
flb_info("[storage] quarantined rejected chunk into DLQ stream (bytes=%zu)", size); | ||
|
||
return 0; | ||
#else | ||
FLB_UNUSED(ctx); | ||
FLB_UNUSED(src); | ||
FLB_UNUSED(tag); | ||
FLB_UNUSED(status_code); | ||
FLB_UNUSED(out_name); | ||
|
||
return -1; | ||
#endif | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
Restore source chunk’s up/down state after copying; sanitize DLQ filename components
-
You force the source chunk “up” but never restore it if it was originally “down”, which can increase memory residency. Capture was_up and bring it back down after copying.
-
tag and out_name may contain filesystem-hostile characters (e.g., '/', '\', ':'). Sanitize before composing name to avoid invalid paths or traversal.
Confirm the appropriate API to bring a chunk back down (e.g., cio_chunk_down or equivalent) is available.
Suggested changes:
- if (cio_chunk_is_up(src) != CIO_TRUE) {
+ int was_up = (cio_chunk_is_up(src) == CIO_TRUE);
+ if (!was_up) {
if (cio_chunk_up_force(src) != CIO_OK) {
flb_warn("[storage] cannot bring chunk up to copy into DLQ");
return -1;
}
}
@@
- snprintf(name, sizeof(name),
- "%s_%d_%s_%p.flb",
- tag ? tag : "no-tag",
- status_code,
- out_name ? out_name : "out",
- (void *) src);
+ /* sanitize tag/out_name to avoid path separators */
+ const char *raw_tag = tag ? tag : "no-tag";
+ const char *raw_out = out_name ? out_name : "out";
+ char safe_tag[96], safe_out[64];
+ size_t i;
+ snprintf(safe_tag, sizeof(safe_tag), "%s", raw_tag);
+ safe_tag[sizeof(safe_tag)-1] = '\0';
+ for (i = 0; safe_tag[i] != '\0'; i++) {
+ if (safe_tag[i] == '/' || safe_tag[i] == '\\' || safe_tag[i] == ':') {
+ safe_tag[i] = '_';
+ }
+ }
+ snprintf(safe_out, sizeof(safe_out), "%s", raw_out);
+ safe_out[sizeof(safe_out)-1] = '\0';
+ for (i = 0; safe_out[i] != '\0'; i++) {
+ if (safe_out[i] == '/' || safe_out[i] == '\\' || safe_out[i] == ':') {
+ safe_out[i] = '_';
+ }
+ }
+ snprintf(name, sizeof(name), "%s_%d_%s_%p.flb",
+ safe_tag, status_code, safe_out, (void *) src);
@@
- flb_free(buf);
+ flb_free(buf);
+
+ if (!was_up) {
+ /* restore original state if API available */
+ (void) cio_chunk_down(src);
+ }
🏁 Script executed:
#!/bin/bash
rg -nP 'cio_chunk_down\(' -C2
rg -nP 'chunk_down\(' -C2
rg -nP 'cio_chunk_is_up\(' -C2
rg -nP 'cio_chunk_up_force\(' -C2
Length of output: 35734
Restore original chunk state after DLQ copy; sanitize filename components
- Record
was_up = (cio_chunk_is_up(src) == CIO_TRUE)
, callcio_chunk_up_force(src)
if needed, then after writing invokecio_chunk_down(src)
when!was_up
. - Copy
tag
/out_name
into fixed-size buffers and replace any/
,\
or:
chars with_
before composingname
.
🤖 Prompt for AI Agents
In src/flb_storage.c around lines 787 to 859, the DLQ copy logic leaves the
original chunk state changed and uses raw tag/out_name when building the file
name; record whether the source chunk was up (was_up = cio_chunk_is_up(src) ==
CIO_TRUE), only call cio_chunk_up_force(src) if required, and after the DLQ
write restore the original state by calling cio_chunk_down(src) when was_up is
false; additionally copy tag and out_name into fixed-size local buffers, replace
any '/' '\' or ':' characters with '_' to sanitize them before using snprintf to
compose name so the resulting filename is safe.
#include <stdio.h> | ||
#include <string.h> | ||
#include <errno.h> | ||
#include <assert.h> | ||
#include <sys/stat.h> | ||
|
||
#include <fluent-bit/flb_compat.h> | ||
#include <fluent-bit/flb_info.h> | ||
#include <fluent-bit/flb_mem.h> | ||
#include <fluent-bit/flb_config.h> | ||
#include <fluent-bit/flb_storage.h> | ||
#include <fluent-bit/flb_str.h> | ||
#include <fluent-bit/flb_utils.h> | ||
|
||
#include <chunkio/chunkio.h> | ||
|
||
#include "flb_tests_internal.h" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add missing OS headers for dir traversal and PID; avoid build breaks
find_latest_flb_unix uses dirent/stat; tmpdir_for uses getpid. Include the proper headers on non-Windows.
Apply:
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <sys/stat.h>
+#ifndef _WIN32
+#include <dirent.h>
+#include <unistd.h>
+#endif
🤖 Prompt for AI Agents
In tests/internal/storage_dlq.c around lines 2 to 19, the file uses dirent/stat
functions in find_latest_flb_unix and getpid in tmpdir_for but doesn't include
the required OS headers; add the missing headers by including <dirent.h> and
<unistd.h> (wrapped in a non-Windows guard, e.g., #ifndef _WIN32 ... #endif) so
dir traversal and getpid compile on Unix-like systems.
This is still WIP PR and working for introducing DLQ.
This is because the current Fluent Bit mechanism does not offer for preserving invalid chunks for requesting via network.
So, if users encounter such behavior, the chunks will be just deleted and there is no clues to solve it.
Related to #9363.
Enter
[N/A]
in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test
label to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Tests