
Conversation

kalavt commented Dec 22, 2025

S3 is a good place for cold log storage, especially for Athena queries over a data lake (Parquet). The current implementation has several limitations:

  • File uploads are capped at 50 MB, while Parquet output tends to produce much smaller files (1-10 MB).
  • This leads to S3 small-file fragmentation, which is inefficient for both cost and query performance.
  • No multipart upload support.
  • No official Docker image build.
  • The compression option is semantically the wrong knob; it should be a format option (JSON, Parquet, Avro, etc.).

Hence this PR proposes enhancements to address these points; an example configuration is sketched below.
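As a sketch of the intended usage, a configuration along these lines would select the new behavior. The bucket, region, and size/compression values are placeholders; only format is introduced by this PR, the other keys are existing out_s3 options.

    [OUTPUT]
        name              s3
        match             *
        bucket            my-log-bucket
        region            us-east-1
        format            parquet        # proposed option: json (default) or parquet
        compression       zstd           # optional gzip/zstd on top of Parquet
        total_file_size   100M           # batch threshold before conversion/upload
        use_put_object    Off            # large converted files go through multipart upload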


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0; by submitting this pull request, I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Added Parquet output format for S3 with automated batching, conversion and optimized upload paths.
    • Docker build now accepts a configurable Dockerfile selection for different build variants.
  • Build/Infrastructure

    • Introduced staged "full" and "standard" image variants (production and debug) and multi-architecture build support.
    • Added a staging build matrix to produce variant-specific images and versioning.


coderabbitai bot commented Dec 22, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds configurable Dockerfile input to the reusable build workflow, introduces a new multi-stage multi-arch Dockerfile.full, and implements Parquet output support in the S3 plugin via a new parquet batch manager and JSON→Parquet conversion/upload paths.

Changes

  • Workflow config (.github/workflows/call-build-images.yaml, .github/workflows/staging-build.yaml): Add a reusable workflow input dockerfile and make the staging build a matrix with standard/full variants; propagate the dockerfile param to build calls and normalize quoting/formatting.
  • Docker build (dockerfiles/Dockerfile.full): New multi-stage Dockerfile implementing multi-arch builds (QEMU), builder stages, a distroless production image, a debug image, JSON schema generation, and RELEASE_VERSION ARG usage.
  • S3 plugin header (plugins/out_s3/s3.h): Extend struct flb_s3 with format, parquet_batches, and parquet_batch_lock; add FLB_S3_FORMAT_JSON/FLB_S3_FORMAT_PARQUET constants and declarations for parquet_batch_init, parquet_batch_destroy, and s3_put_object.
  • S3 plugin core (plugins/out_s3/s3.c): Make s3_put_object public (remove static); add format parsing and handling, integrate parquet-specific logic (batching, legacy compression handling), and wire the parquet batch lifecycle into init/flush/exit paths.
  • S3 Parquet module (plugins/out_s3/s3_parquet.c): New module implementing the parquet batch manager: batch lifecycle (init/destroy), per-tag batch tracking, thresholds, JSON→Parquet conversion, optional gzip/zstd compression, and dual upload paths (PutObject and multipart), with logging and cleanup.

Sequence Diagram(s)

sequenceDiagram
    participant Producer as FluentBit Core
    participant Plugin as S3 Plugin (out_s3)
    participant BatchMgr as Parquet Batch Manager
    participant Conv as JSON→Parquet Converter
    participant S3 as S3 Uploader

    Note over Producer,Plugin: Flush event (format=parquet)
    Producer->>Plugin: cb_s3_flush(data, tag)
    Plugin->>BatchMgr: append JSON to per-tag batch
    BatchMgr->>BatchMgr: evaluate thresholds (size/count/timeout)
    alt Batch not ready
        BatchMgr-->>Plugin: buffer retained
    else Batch ready
        BatchMgr->>Conv: convert buffered JSON -> Parquet file
        Conv->>Conv: optional compression (gzip/zstd)
        alt small file
            Conv->>S3: call s3_put_object (PutObject)
        else large file
            Conv->>S3: initiate multipart upload and stream parts
        end
        S3-->>BatchMgr: upload result
        BatchMgr->>BatchMgr: remove batch & cleanup
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Areas needing careful review:
    • plugins/out_s3/s3_parquet.c — conversion correctness, temp file management, error paths, memory/resource cleanup.
    • Integration points in plugins/out_s3/s3.c — thread-safety, batch lifecycle wiring, and behavior changes when format=parquet.
    • Header/API changes in plugins/out_s3/s3.h — public API and struct layout modifications.
    • dockerfiles/Dockerfile.full — multi-arch build correctness and produced artifacts.
    • Workflow changes — ensure dockerfile input propagation and matrix variants function as intended.

Suggested reviewers

  • niedbalski
  • patrick-stephens
  • celalettin1286
  • PettitWesley
  • edsiper

Poem

🐰 I hopped through builds and batches bright,
Dockerfiles stacked to multiarc height,
JSON gathered, Parquet formed with care,
Uploaded in parts through the crisp cold air,
Happy builds — carrots for all, I declare! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 58.33%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
  • Title check ❓ Inconclusive: The title 'WIP: Feature/parquet multipart upload' is partially related to the changeset but uses vague terminology and doesn't clearly convey the main change. Resolution: replace the 'WIP: Feature/' prefix with a clear, descriptive title that explains the primary change (e.g., 'Add Parquet multipart upload support to S3 output plugin').
✅ Passed checks (1 passed)
  • Description Check ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment


chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +3859 to +3881
batch->chunk = s3_store_file_get(ctx, event_chunk->tag,
flb_sds_len(event_chunk->tag));
}

ret = s3_store_buffer_put(ctx, batch->chunk, event_chunk->tag,


P1: First parquet batch can never flush without follow-up events

When starting a new parquet batch, the code looks up an existing store file but leaves batch->chunk NULL if none exists while s3_store_buffer_put silently creates a new file. The subsequent conversion check skips whenever batch->chunk is NULL, so a tag whose very first chunk exceeds total_file_size or times out will never be converted/uploaded until another event arrives or shutdown forces a drain. Persist the newly created store file back into batch->chunk (or re-fetch after buffering) before calling parquet_batch_should_convert so the first batch can flush on size/timeout without waiting for more traffic.


Contributor

Could you take a look at this, @kalavt?

coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (4)
dockerfiles/Dockerfile.full (2)

198-199: Arrow/Parquet package versions are hardcoded.

The packages libarrow-glib2200 and libparquet-glib2200 have version-specific names (2200 suffix). When Apache Arrow releases a new major version, these package names will change and break the build.

Consider using a version-agnostic approach or documenting the version dependency clearly.


291-298: Consider organizing debug packages into logical groups.

The debug package list mixes network tools, debugging utilities, and build dependencies on a single line. While functional, grouping them with line continuations (like done for network tools on lines 293-295) would improve readability.

plugins/out_s3/s3.h (1)

242-244: Verify function signature consistency.

The s3_put_object declaration uses const char *tag while the implementation in s3.c line 1537 also uses const char *tag. This is consistent, but the third parameter name differs slightly between declaration (create_time) and implementation (file_first_log_time). Consider aligning parameter names for clarity.

plugins/out_s3/s3.c (1)

44-44: Unconventional code organization: s3_parquet.c is included directly instead of being separately compiled.

The pattern of including a .c file directly reduces modularity and makes the codebase harder to navigate. Since s3_parquet.c is only included in s3.c and not compiled separately, consider refactoring to improve code organization:

  1. Move shared helper functions that s3_parquet.c needs into a public or internal header
  2. Create s3_internal.h for functions shared between s3.c and s3_parquet.c, making them non-static
  3. Compile s3_parquet.c separately as a standard source file in CMakeLists.txt (a minimal sketch follows below)
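A minimal sketch of what options 2 and 3 could look like. The header name s3_internal.h, the guard macro, and the exact s3_put_object prototype are assumptions for illustration, not the code in this PR.

    /* plugins/out_s3/s3_internal.h (hypothetical): shared declarations for s3.c and s3_parquet.c */
    #ifndef FLB_OUT_S3_INTERNAL_H
    #define FLB_OUT_S3_INTERNAL_H

    #include <time.h>
    #include "s3.h"

    /* Formerly-static helpers in s3.c that s3_parquet.c needs; prototype assumed */
    int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time,
                      char *body, size_t body_size);

    #endif

    /*
     * plugins/out_s3/CMakeLists.txt would then list s3_parquet.c as its own source
     * instead of s3.c doing #include "s3_parquet.c":
     *
     *   set(src s3.c s3_parquet.c ...existing sources...)
     */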
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b9f26a9 and 5a31cd3.

📒 Files selected for processing (6)
  • .github/workflows/call-build-images.yaml
  • .github/workflows/staging-build.yaml
  • dockerfiles/Dockerfile.full
  • plugins/out_s3/s3.c
  • plugins/out_s3/s3.h
  • plugins/out_s3/s3_parquet.c
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-09-14T09:46:09.531Z
Learnt from: aminvakil
Repo: fluent/fluent-bit PR: 10844
File: conf/fluent-bit:13-15
Timestamp: 2025-09-14T09:46:09.531Z
Learning: For fluent-bit Debian packaging, /opt/fluent-bit/bin/ is the appropriate installation path since the package may be installed from non-official Debian sources, making /opt compliant with FHS for optional software packages.

Applied to files:

  • dockerfiles/Dockerfile.full
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_s3/s3.c
🧬 Code graph analysis (3)
plugins/out_s3/s3.c (1)
plugins/out_s3/s3_parquet.c (5)
  • parquet_batch_init (42-59)
  • parquet_batch_get_or_create (85-139)
  • parquet_batch_should_convert (142-168)
  • parquet_batch_convert_and_upload (172-360)
  • parquet_batch_destroy (62-82)
plugins/out_s3/s3_parquet.c (3)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
plugins/out_s3/s3.c (1)
  • s3_put_object (1537-1674)
plugins/out_s3/s3_multipart.c (2)
  • create_multipart_upload (558-639)
  • upload_part (685-799)
plugins/out_s3/s3.h (2)
plugins/out_s3/s3_parquet.c (2)
  • parquet_batch_init (42-59)
  • parquet_batch_destroy (62-82)
plugins/out_s3/s3.c (1)
  • s3_put_object (1537-1674)
🔇 Additional comments (22)
.github/workflows/call-build-images.yaml (3)

42-46: LGTM! Well-designed configurable Dockerfile input.

The new dockerfile input is properly defined with a sensible default path and clear description. This enables flexibility for building different image variants (e.g., the new Dockerfile.full) while maintaining backward compatibility.


398-399: LGTM! Explicit string quoting for matrix values.

Quoting "2022" and "2025" as explicit strings prevents YAML type coercion issues where integers could be misinterpreted. This is a good defensive practice.


126-126: The dockerfile input is used safely in this workflow. All callers are internal to the repository and either use the secure default value or pass hardcoded paths from the workflow matrix. The reusable workflow mechanism restricts invocation to trusted internal workflows only, and path traversal risks are minimal given the Git context usage. No additional input validation is required.

.github/workflows/staging-build.yaml (4)

7-7: Verify the wildcard tag trigger is intentional.

The tag trigger "*" matches any tag pushed to the repository. This is very permissive and will trigger staging builds for all tags (not just version tags like v*). Confirm this is the intended behavior.


49-54: LGTM! Version extraction logic is clean.

The refactored version extraction logic correctly handles the three trigger scenarios (manual dispatch, cron, tag) with clear fallback to master.


72-90: LGTM! Well-structured matrix for multi-variant builds.

The matrix strategy cleanly defines standard and full variants with their corresponding Dockerfile paths and version suffixes. The version composition (${{ needs.staging-build-get-meta.outputs.version }}${{ matrix.variant.suffix }}) correctly appends the suffix (empty for standard, -full for full builds).


108-108: Schema artifact download may fail for the full variant.

The schema artifact is downloaded using fluent-bit-schema-${{ needs.staging-build-get-meta.outputs.version }}, which does not include the variant suffix. If both standard and full variants upload schemas with the same base version, there could be artifact name collisions or the wrong schema being downloaded.

However, since continue-on-error: true is set, this won't break the build. Verify if schema handling needs adjustment for multi-variant builds.

dockerfiles/Dockerfile.full (3)

1-2: LGTM! Appropriate Dockerfile directives for multi-arch builds.

The syntax directive enables BuildKit features, and check=skip=InvalidBaseImagePlatform is necessary to suppress false positives when building multi-architecture images with QEMU emulation.


289-302: Debug image includes Arrow/Parquet libraries - good consistency.

The debug image correctly includes libarrow-glib2200 and libparquet-glib2200 matching the production image's capabilities, ensuring debug builds can test Parquet functionality.


64-72: Consider adding GPG signature verification for the Apache Arrow repository.

While using the official Apache Arrow "latest" package source is supported, the current approach lacks signature verification. APT repository metadata should ideally be GPG-signed. However, be aware that Apache Arrow has known issues with GPG key management that may affect verification reliability. If implementing signature checks, monitor the apache-arrow-keyring.gpg for completeness and compatibility with your target distributions.

Regarding version pinning: the "latest" approach introduces supply chain dependency, but pinning specific versions with checksums is impractical given Apache Arrow's frequently-changing package checksums in their official repository. Document this trade-off decision if retaining the "latest" approach.

plugins/out_s3/s3_parquet.c (4)

42-59: LGTM!

The initialization function properly initializes the list and mutex with appropriate error handling.


85-139: LGTM!

The batch retrieval and creation logic is properly synchronized with mutex protection, and error handling covers allocation failures.


142-168: LGTM!

The conversion trigger logic correctly checks both size and timeout thresholds.


347-359: LGTM for cleanup logic.

The cleanup sequence correctly removes the batch from the list under mutex protection before freeing resources.

plugins/out_s3/s3.h (1)

197-213: LGTM!

The header additions for Parquet support are well-structured with clear section comments and proper declarations matching the implementation.

plugins/out_s3/s3.c (7)

728-781: LGTM with good backward compatibility.

The format parsing with deprecation warnings for legacy compression=parquet and compression=arrow provides a smooth migration path. The default compression selection for parquet format is sensible.


1116-1129: LGTM!

The conditional initialization of the Parquet batch manager is correctly placed and has proper error handling.


3881-3894: Potential state inconsistency with multipart upload object.

When create_upload is called on line 3884-3886, it adds the upload to ctx->uploads list and may increment seq_index. If parquet_batch_convert_and_upload fails or doesn't complete the multipart upload (just marks it for completion), the upload object remains in the list. This is likely intentional for retry handling, but ensure the upload state machine handles this path correctly.


4167-4174: LGTM!

The configuration map entry for the format option is well-documented with clear explanation of the parquet multipart behavior.


93-100: No issues with formatting changes.


828-828: Good improvement to error message clarity.

The updated error message clearly specifies both the limit and the condition when it applies.


3857-3870: No issue found - NULL handling is intentional.

s3_store_file_get() returns NULL when no existing file matches the tag, which is normal and expected behavior. The s3_store_buffer_put() function explicitly handles NULL input (line 143) by creating a new file. This design pattern is intentional and safe—when batch->chunk is NULL, a new chunk file is automatically created. No additional error checking is needed.

Likely an incorrect or invalid review comment.

kalavt force-pushed the feature/parquet-multipart-upload branch from 5a31cd3 to f526101 on December 22, 2025 at 15:51
coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
plugins/out_s3/s3.c (2)

3876-3903: Consider adding clarifying comments for the batch chunk initialization pattern.

The logic correctly handles the case where batch->chunk is initially NULL by:

  1. Attempting to fetch an existing store file (lines 3877-3878)
  2. Buffering data (which creates a new file if chunk is NULL) (lines 3881-3883)
  3. Re-fetching the chunk after buffering to capture the newly created file (lines 3894-3903)

This pattern works correctly and addresses the past review comment about first batch flush issues. However, adding brief comments explaining this initialization flow would improve maintainability.

🔎 Suggested comment additions
         /* Accumulate JSON data to batch's buffer file */
+        /* Attempt to fetch existing buffer file for this batch */
         if (batch->chunk == NULL) {
             batch->chunk = s3_store_file_get(ctx, event_chunk->tag,
                                             flb_sds_len(event_chunk->tag));
         }
 
+        /* Buffer data; if chunk is NULL, s3_store_buffer_put creates a new file */
         ret = s3_store_buffer_put(ctx, batch->chunk, event_chunk->tag,
                                  flb_sds_len(event_chunk->tag), chunk,
                                  (size_t) chunk_size, time(NULL));
         if (ret < 0) {
             flb_plg_error(ctx->ins, "failed to buffer data for parquet batch");
             flb_sds_destroy(chunk);
             FLB_OUTPUT_RETURN(FLB_RETRY);
         }
 
         /*
-         * If batch->chunk was NULL, s3_store_buffer_put may have created a new file.
-         * Re-fetch the chunk to ensure batch->chunk is up to date.
+         * Re-fetch chunk if it was NULL before buffering.
+         * s3_store_buffer_put creates the file, but we need the reference.
          */
         if (batch->chunk == NULL) {

794-796: Document that format=parquet implicitly sets compression=parquet as a default.

When format=parquet and compression is unset (defaults to FLB_AWS_COMPRESS_NONE), the code automatically sets compression to FLB_AWS_COMPRESS_PARQUET. This is intentional behavior in Fluent Bit's design, where Parquet format intrinsically requires Parquet compression. Consider adding inline documentation to clarify this design decision, or document it in the plugin's configuration guide if not already present.
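For instance, an inline comment along these lines would capture the intent. This is a sketch only; the surrounding parsing code is paraphrased from this review, not quoted from s3.c.

    /* format=parquet implies Parquet encoding when the user did not set a
     * compression option explicitly; gzip/zstd can still be configured to
     * compress the resulting Parquet file. */
    if (ctx->format == FLB_S3_FORMAT_PARQUET &&
        ctx->compression == FLB_AWS_COMPRESS_NONE) {
        ctx->compression = FLB_AWS_COMPRESS_PARQUET;
    }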

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5a31cd3 and f526101.

📒 Files selected for processing (3)
  • dockerfiles/Dockerfile.full
  • plugins/out_s3/s3.c
  • plugins/out_s3/s3_parquet.c
🚧 Files skipped from review as they are similar to previous changes (2)
  • dockerfiles/Dockerfile.full
  • plugins/out_s3/s3_parquet.c
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components, ZSTD support is always available and doesn't need build-time conditionals.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:24:26.170Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:39-42
Timestamp: 2025-08-29T06:24:26.170Z
Learning: In Fluent Bit, ZSTD compression support is enabled by default and does not require conditional compilation guards (like #ifdef FLB_HAVE_ZSTD) around ZSTD-related code declarations and implementations.

Applied to files:

  • plugins/out_s3/s3.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • plugins/out_s3/s3.c
🔇 Additional comments (2)
plugins/out_s3/s3.c (2)

1134-1147: LGTM!

The Parquet batch manager initialization is properly guarded by format and use_put_object checks, with appropriate error handling.


4106-4117: Approve: Parquet batch cleanup on exit follows best-effort pattern.

The code correctly attempts to flush all pending Parquet batches before cleanup by calling parquet_batch_flush_all(ctx). If the flush fails, a warning is logged and cleanup proceeds. This follows the same best-effort pattern used elsewhere in cb_s3_exit (lines 4074-4080 for regular chunks, 4082-4104 for multipart uploads) and is appropriate for an exit handler where preventing shutdown is generally undesirable.

Note: The past review comment about data loss risk has been addressed by adding the flush-before-destroy logic.
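For reference, a condensed sketch of that best-effort pattern; the return type of parquet_batch_flush_all and the exact statements in cb_s3_exit are assumed, not quoted.

    /* cb_s3_exit(): best-effort flush of pending parquet batches before cleanup */
    if (ctx->format == FLB_S3_FORMAT_PARQUET) {
        if (parquet_batch_flush_all(ctx) != 0) {
            flb_plg_warn(ctx->ins, "failed to flush some parquet batches on exit");
        }
        parquet_batch_destroy(ctx);   /* frees per-tag batches and the mutex */
    }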

Collaborator

What's the reason for a whole separate container definition rather than just a stage or configured via build-args?

I would strongly recommend not breaking out into a separate container image definition as it will be harder to maintain - we will update one but not the other, etc.

Plus it's not clear what the proposed change here is without diffing the two files.

Author

Well, OK boss. I'll fix it, but I really do need a full-featured official image. :)

Collaborator

What is a full-featured official image?
That's what I'm trying to understand - what's the need you're solving here? More dependencies - can/should they be added to the default image? Something else, e.g. a shell?

We can also add it as a stage in the existing container definition if needed, e.g. if you need more dependencies/etc.
Making a whole new file is not a good idea for maintenance as we'll end up deviating between them.

Author

What I need: Official docker image with Apache Arrow/Parquet support

Use case: I need to output Fluent Bit logs in Parquet format to S3 for efficient data analytics.

Technical requirement: This requires linking against Apache Arrow and Parquet C libraries, which adds:

  • libarrow-glib-dev and libparquet-glib-dev at build time
  • Runtime libraries (~150MB) in the final image

My plan is to keep a lightweight image with the default features enabled, plus a variant with Parquet support available for users who need it (~+150 MB).

Collaborator

Runtime libraries are 150 MB? That sounds excessive - which libraries do we need at runtime?

cosmo0920 (Contributor) left a comment

The direction of this implementation would be great, but I found some style glitches.
We need to fix them first.

Comment on lines +741 to +749
} else if (strcasecmp(tmp, "json") == 0) {
ctx->format = FLB_S3_FORMAT_JSON;
} else {
flb_plg_error(ctx->ins, "Invalid format '%s', must be 'json' or 'parquet'", tmp);
return -1;
}
} else {
ctx->format = FLB_S3_FORMAT_JSON;
}
Contributor

We need to use newlines before else or else if clauses.

"data will be discarded",
batch->tag);
error_count++;
} else {
Contributor

We need to break this line, like so:

Suggested change
} else {
}
else {

flb_free(parquet_buffer);
return -1;
}
} else if (parquet_size > MAX_FILE_SIZE_PUT_OBJECT) {
Contributor

We need to use a newline, like so:

Suggested change
} else if (parquet_size > MAX_FILE_SIZE_PUT_OBJECT) {
}
else if (parquet_size > MAX_FILE_SIZE_PUT_OBJECT) {

/* Mark for completion */
m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;

} else {
Contributor

We need to break the line, like so:

Suggested change
} else {
}
else {

Comment on lines +3859 to +3881
batch->chunk = s3_store_file_get(ctx, event_chunk->tag,
flb_sds_len(event_chunk->tag));
}

ret = s3_store_buffer_put(ctx, batch->chunk, event_chunk->tag,
Contributor

Could you take a look at this, @kalavt?

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/aws/flb_aws_compress.h>
#include <pthread.h>
Contributor

We need to use #include <fluent-bit/flb_compat.h> instead.
This is because flb_compat.h eventually pulls in mk_pthread.h:
https://github.com/fluent/fluent-bit/blob/master/lib/monkey/include/monkey/mk_core/mk_pthread.h
which includes winpthreads.h on Windows, so we need to use it here.

See:

[ 68%] Built target flb-plugin-out_vivo_exporter
[ 68%] Building C object plugins/out_s3/CMakeFiles/flb-plugin-out_s3.dir/s3.c.obj
s3.c
D:\a\fluent-bit\fluent-bit\lib\chunkio\include\chunkio/chunkio_compat.h(36): warning C4005: 'R_OK': macro redefinition
D:\a\fluent-bit\fluent-bit\lib\monkey\include\monkey\mk_core\mk_dep_unistd.h(24): note: see previous definition of 'R_OK'
D:\a\fluent-bit\fluent-bit\lib\chunkio\include\chunkio/chunkio_compat.h(37): warning C4005: 'W_OK': macro redefinition
D:\a\fluent-bit\fluent-bit\lib\monkey\include\monkey\mk_core\mk_dep_unistd.h(25): note: see previous definition of 'W_OK'
D:\a\fluent-bit\fluent-bit\lib\chunkio\include\chunkio/chunkio_compat.h(39): warning C4005: 'F_OK': macro redefinition
D:\a\fluent-bit\fluent-bit\lib\monkey\include\monkey\mk_core\mk_dep_unistd.h(27): note: see previous definition of 'F_OK'
D:\a\fluent-bit\fluent-bit\lib\chunkio\include\chunkio/chunkio_compat.h(43): warning C4005: 'strerror_r': macro redefinition
D:\a\fluent-bit\fluent-bit\lib\monkey\include\monkey\mk_core\mk_utils.h(68): note: see previous definition of 'strerror_r'
D:\a\fluent-bit\fluent-bit\plugins\out_s3\s3_parquet.c(23): fatal error C1083: Cannot open include file: 'pthread.h': No such file or directory
NMAKE : fatal error U1077: '"C:\Program Files\CMake\bin\cmake.exe" -E cmake_cl_compile_depends --dep-file=CMakeFiles\flb-plugin-out_s3.dir\s3.c.obj.d --working-dir=D:\a\fluent-bit\fluent-bit\build\plugins\out_s3 --filter-prefix="Note: including file: " -- C:\PROGRA~1\MICROS~2\2022\ENTERP~1\VC\Tools\MSVC\1444~1.352\bin\Hostx86\x86\cl.exe @C:\Users\RUNNER~1\AppData\Local\Temp\nm2675.tmp' : return code '0x2'
Stop.
NMAKE : fatal error U1077: '"C:\Program Files\Microsoft Visual Studio\2022\Enterprise\VC\Tools\MSVC\14.44.35207\bin\HostX86\x86\nmake.exe" -s -f plugins\out_s3\CMakeFiles\flb-plugin-out_s3.dir\build.make /nologo -SL                 plugins\out_s3\CMakeFiles\flb-plugin-out_s3.dir\build' : return code '0x2'
Stop.
NMAKE : fatal error U1077: '"C:\Program Files\Microsoft Visual Studio\2022\Enterprise\VC\Tools\MSVC\14.44.35207\bin\HostX86\x86\nmake.exe" -s -f CMakeFiles\Makefile2 /nologo -LS                 all' : return code '0x2'
Stop.
CPack: Create package using NSIS
CPack: Install projects
CPack: - Run preinstall target for: fluent-bit
CPack Error: Problem running install command: "C:\Program Files\CMake\bin\cmake.exe" --build . --target "preinstall"
Please check D:/a/fluent-bit/fluent-bit/build/_CPack_Packages/win32/NSIS/PreinstallOutput.log for errors
CPack Error: Error when generating package: fluent-bit
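In short, the suggested change is roughly the following (a sketch; the rest of the include list in s3_parquet.c stays as-is):

    /* s3_parquet.c - portable threading include */
    #include <fluent-bit/flb_output_plugin.h>
    #include <fluent-bit/flb_time.h>
    #include <fluent-bit/aws/flb_aws_compress.h>
    #include <fluent-bit/flb_compat.h>  /* resolves to mk_pthread.h / winpthreads.h on Windows */
    /* instead of: #include <pthread.h> */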

kalavt changed the title from "Feature/parquet multipart upload" to "WIP: Feature/parquet multipart upload" on Dec 23, 2025
kalavt marked this pull request as draft on December 24, 2025 at 13:56
kalavt (Author) commented Dec 24, 2025

This PR has been superseded by a newer implementation and is now deprecated.
Please refer to PR #11312 for the current and correct Parquet output implementation.

kalavt closed this on Dec 24, 2025