diff --git a/.github/workflows/call-build-images.yaml b/.github/workflows/call-build-images.yaml index 5174c0f5cfd..c9c05bdb1e0 100644 --- a/.github/workflows/call-build-images.yaml +++ b/.github/workflows/call-build-images.yaml @@ -39,6 +39,11 @@ on: type: boolean required: false default: true + dockerfile: + description: The Dockerfile to use for building (relative to repo root). + type: string + required: false + default: ./dockerfiles/Dockerfile secrets: token: description: The Github token or similar to authenticate with for the registry. @@ -118,7 +123,7 @@ jobs: uses: docker/build-push-action@v6 with: # Use path context rather than Git context as we want local files - file: ./dockerfiles/Dockerfile + file: ${{ inputs.dockerfile }} context: . target: ${{ matrix.target }} outputs: type=image,name=${{ inputs.registry }}/${{ inputs.image }},push-by-digest=true,name-canonical=true,push=${{ inputs.push }} @@ -390,8 +395,8 @@ jobs: fail-fast: true matrix: windows-base-version: - - '2022' - - '2025' + - "2022" + - "2025" permissions: contents: read packages: write diff --git a/.github/workflows/staging-build.yaml b/.github/workflows/staging-build.yaml index 5351c253398..7b078a38a94 100644 --- a/.github/workflows/staging-build.yaml +++ b/.github/workflows/staging-build.yaml @@ -4,7 +4,7 @@ name: Deploy to staging on: push: tags: - - '*' + - "*" workflow_dispatch: inputs: @@ -27,7 +27,6 @@ on: concurrency: staging-build-release jobs: - # This job strips off the `v` at the start of any tag provided. # It then provides this metadata for the other jobs to use. staging-build-get-meta: @@ -36,7 +35,6 @@ jobs: outputs: version: ${{ steps.formatted_version.outputs.replaced }} steps: - - run: | echo "Version: ${{ inputs.version || github.ref_name }}" shell: bash @@ -48,12 +46,12 @@ jobs: - name: Get the version id: get_version run: | - VERSION="${INPUT_VERSION}" - if [ -z "${VERSION}" ]; then - echo "Defaulting to master" - VERSION=master - fi - echo "VERSION=$VERSION" >> $GITHUB_OUTPUT + VERSION="${INPUT_VERSION}" + if [ -z "${VERSION}" ]; then + echo "Defaulting to master" + VERSION=master + fi + echo "VERSION=$VERSION" >> $GITHUB_OUTPUT shell: bash env: # Use the dispatch variable in preference, if empty use the context ref_name which should @@ -64,21 +62,32 @@ jobs: - uses: frabert/replace-string-action@v2.5 id: formatted_version with: - pattern: '[v]*(.*)$' + pattern: "[v]*(.*)$" string: "${{ steps.get_version.outputs.VERSION }}" - replace-with: '$1' - flags: 'g' + replace-with: "$1" + flags: "g" staging-build-images: needs: staging-build-get-meta + strategy: + matrix: + variant: + - name: standard + dockerfile: ./dockerfiles/Dockerfile + suffix: "" + - name: full + dockerfile: ./dockerfiles/Dockerfile.full + suffix: -full + name: Build ${{ matrix.variant.name }} images uses: ./.github/workflows/call-build-images.yaml with: - version: ${{ needs.staging-build-get-meta.outputs.version }} + version: ${{ needs.staging-build-get-meta.outputs.version }}${{ matrix.variant.suffix }} ref: ${{ inputs.version || github.ref_name }} registry: ghcr.io username: ${{ github.actor }} image: ${{ github.repository }}/staging environment: staging + dockerfile: ${{ matrix.variant.dockerfile }} secrets: token: ${{ secrets.GITHUB_TOKEN }} cosign_private_key: ${{ secrets.COSIGN_PRIVATE_KEY }} diff --git a/dockerfiles/Dockerfile.full b/dockerfiles/Dockerfile.full new file mode 100644 index 00000000000..5bea578544f --- /dev/null +++ b/dockerfiles/Dockerfile.full @@ -0,0 +1,318 @@ +# syntax=docker/dockerfile:1 +# 
check=skip=InvalidBaseImagePlatform + +# To use this container you may need to do the following: +# https://askubuntu.com/a/1369504 +# sudo add-apt-repository ppa:jacob/virtualisation #(for Ubuntu 20.04) +# sudo apt-get update && sudo apt-get install qemu qemu-user qemu-user-static +# https://stackoverflow.com/a/60667468 +# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes +# docker buildx rm builder +# docker buildx create --name builder --use +# docker buildx inspect --bootstrap +# docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ + +# Set this to the current release version: it gets done so as part of the release. +ARG RELEASE_VERSION=4.2.2 + +# For multi-arch builds - assumption is running on an AMD64 host +FROM multiarch/qemu-user-static:x86_64-arm AS qemu-arm32 +FROM multiarch/qemu-user-static:x86_64-aarch64 AS qemu-arm64 + +FROM debian:trixie-slim AS builder-base + +COPY --from=qemu-arm32 /usr/bin/qemu-arm-static /usr/bin/ +COPY --from=qemu-arm64 /usr/bin/qemu-aarch64-static /usr/bin/ + +ARG FLB_NIGHTLY_BUILD +ENV FLB_NIGHTLY_BUILD=$FLB_NIGHTLY_BUILD + +ARG FLB_CHUNK_TRACE=On +ENV FLB_CHUNK_TRACE=${FLB_CHUNK_TRACE} + +RUN mkdir -p /fluent-bit/bin /fluent-bit/etc /fluent-bit/log + +ENV DEBIAN_FRONTEND=noninteractive + +# hadolint ignore=DL3008 +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + ca-certificates \ + git \ + make \ + tar \ + libssl-dev \ + libcurl4-openssl-dev \ + libsasl2-dev \ + pkg-config \ + libsystemd-dev \ + zlib1g-dev \ + libpq-dev \ + postgresql-server-dev-all \ + flex \ + bison \ + libyaml-dev \ + wget \ + lsb-release \ + gnupg \ + && apt-get satisfy -y cmake "cmake (<< 4.0)" \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN wget -O apache-arrow.deb https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb && \ + apt-get install -y --no-install-recommends ./apache-arrow.deb && \ + apt-get update && \ + apt-get install -y --no-install-recommends \ + libarrow-glib-dev \ + libparquet-glib-dev \ + && rm -f apache-arrow.deb && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Must be run from root of repo +WORKDIR /src/fluent-bit/ +COPY . ./ + +# We split the builder setup out so people can target it or use as a base image without doing a full build. +FROM builder-base AS builder +WORKDIR /src/fluent-bit/build/ + +# Required to be set to ARMV7 for that target +ARG WAMR_BUILD_TARGET +ARG EXTRA_CMAKE_FLAGS +ENV EXTRA_CMAKE_FLAGS=${EXTRA_CMAKE_FLAGS} + +# Optional: jemalloc configure flags (e.g., page size). Leave unset to keep defaults. 
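+# As an illustrative (not canonical) example, these build arguments could be supplied like so,
+# with placeholder values:
+#   docker buildx build --platform linux/arm/v7 \
+#     --build-arg WAMR_BUILD_TARGET=ARMV7 \
+#     --build-arg FLB_JEMALLOC_OPTIONS="--with-lg-page=16" \
+#     -f ./dockerfiles/Dockerfile.full --target production .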
+ARG FLB_JEMALLOC_OPTIONS +ENV FLB_JEMALLOC_OPTIONS=${FLB_JEMALLOC_OPTIONS} + +# We do not want word splitting for EXTRA_CMAKE_FLAGS in case multiple are defined +# hadolint ignore=SC2086 +RUN [ -n "${WAMR_BUILD_TARGET:-}" ] && EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DWAMR_BUILD_TARGET=$WAMR_BUILD_TARGET"; \ + cmake -DFLB_SIMD=On \ + -DFLB_RELEASE=On \ + -DFLB_JEMALLOC=On \ + -DFLB_TLS=On \ + -DFLB_SHARED_LIB=Off \ + -DFLB_EXAMPLES=Off \ + -DFLB_HTTP_SERVER=On \ + -DFLB_IN_EXEC=Off \ + -DFLB_IN_SYSTEMD=On \ + -DFLB_OUT_KAFKA=On \ + -DFLB_OUT_PGSQL=On \ + -DFLB_ARROW=On \ + -DFLB_NIGHTLY_BUILD="$FLB_NIGHTLY_BUILD" \ + -DFLB_LOG_NO_CONTROL_CHARS=On \ + -DFLB_CHUNK_TRACE="$FLB_CHUNK_TRACE" \ + -DFLB_JEMALLOC_OPTIONS="$FLB_JEMALLOC_OPTIONS" \ + $EXTRA_CMAKE_FLAGS \ + .. + +ARG CFLAGS="-v" +ENV CFLAGS=${CFLAGS} + +RUN make -j "$(getconf _NPROCESSORS_ONLN)" +RUN install bin/fluent-bit /fluent-bit/bin/ + +# Configuration files +COPY conf/fluent-bit.conf \ + conf/parsers.conf \ + conf/parsers_ambassador.conf \ + conf/parsers_java.conf \ + conf/parsers_extra.conf \ + conf/parsers_openstack.conf \ + conf/parsers_cinder.conf \ + conf/plugins.conf \ + /fluent-bit/etc/ + +# Generate schema and include as part of the container image +RUN /fluent-bit/bin/fluent-bit -J > /fluent-bit/etc/schema.json + +# Simple example of how to properly extract packages for reuse in distroless +# Taken from: https://github.com/GoogleContainerTools/distroless/issues/863 +FROM debian:trixie-slim AS deb-extractor +COPY --from=qemu-arm32 /usr/bin/qemu-arm-static /usr/bin/ +COPY --from=qemu-arm64 /usr/bin/qemu-aarch64-static /usr/bin/ + +ENV DEBIAN_FRONTEND=noninteractive + +# Install ca-certificates first to enable HTTPS apt sources +RUN apt-get update && \ + apt-get install -y --no-install-recommends ca-certificates && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Now copy Arrow apt source configuration +COPY --from=builder-base /etc/apt/sources.list.d/ /etc/apt/sources.list.d/ +COPY --from=builder-base /etc/apt/trusted.gpg.d/ /etc/apt/trusted.gpg.d/ +COPY --from=builder-base /usr/share/keyrings/apache-arrow-apt-source.asc /usr/share/keyrings/ + +# We download all debs locally then extract them into a directory we can use as the root for distroless. +# We also include some extra handling for the status files that some tooling uses for scanning, etc. 
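+# For reference, each file dropped into /dpkg/var/lib/dpkg/status.d/ below is simply the
+# package's dpkg control record, roughly of this shape (fields abbreviated, values illustrative):
+#   Package: libssl3t64
+#   Architecture: amd64
+#   Version: 3.x
+# which is the layout that scanners aware of distroless-style images look for.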
+WORKDIR /tmp +SHELL ["/bin/bash", "-o", "pipefail", "-c"] +RUN apt-get update && \ + apt-get download \ + libssl3t64 \ + libcurl4t64 \ + libnghttp2-14 \ + libnghttp3-9 \ + librtmp1 \ + libssh2-1t64 \ + libpsl5t64 \ + libbrotli1 \ + libsasl2-2 \ + pkg-config \ + libpq5 \ + libsystemd0 \ + zlib1g \ + ca-certificates \ + libatomic1 \ + libgcrypt20 \ + libzstd1 \ + liblz4-1 \ + libgssapi-krb5-2 \ + libldap-2.5 \ + libgpg-error0 \ + libkrb5-3 \ + libk5crypto3 \ + libcom-err2 \ + libkrb5support0 \ + libgnutls30t64 \ + libkeyutils1 \ + libp11-kit0 \ + libidn2-0 \ + libunistring5 \ + libtasn1-6 \ + libnettle8t64 \ + libhogweed6t64 \ + libgmp10 \ + libffi8 \ + liblzma5 \ + libyaml-0-2 \ + libcap2 \ + libldap2 \ + libglib2.0-0t64 \ + libarrow2200 \ + libarrow-acero2200 \ + libarrow-dataset2200 \ + libarrow-glib2200 \ + libparquet2200 \ + libparquet-glib2200 && \ + mkdir -p /dpkg/var/lib/dpkg/status.d/ && \ + for deb in *.deb; do \ + package_name=$(dpkg-deb -I "${deb}" | awk '/^ Package: .*$/ {print $2}'); \ + echo "Processing: ${package_name}"; \ + dpkg --ctrl-tarfile "$deb" | tar -Oxf - ./control > "/dpkg/var/lib/dpkg/status.d/${package_name}"; \ + dpkg --extract "$deb" /dpkg || exit 10; \ + done + +# Remove unnecessary files extracted from deb packages like man pages and docs etc. +RUN find /dpkg/ -type d -empty -delete && \ + rm -r /dpkg/usr/share/doc/ + +# We want latest at time of build +# hadolint ignore=DL3006 +FROM gcr.io/distroless/cc-debian13 AS production +ARG RELEASE_VERSION +ENV FLUENT_BIT_VERSION=${RELEASE_VERSION} +LABEL description="Fluent Bit multi-architecture container image" \ + vendor="Fluent Organization" \ + version="${RELEASE_VERSION}" \ + author="Eduardo Silva " \ + org.opencontainers.image.description="Fluent Bit container image" \ + org.opencontainers.image.title="Fluent Bit" \ + org.opencontainers.image.licenses="Apache-2.0" \ + org.opencontainers.image.vendor="Fluent Organization" \ + org.opencontainers.image.version="${RELEASE_VERSION}" \ + org.opencontainers.image.source="https://github.com/fluent/fluent-bit" \ + org.opencontainers.image.documentation="https://docs.fluentbit.io/" \ + org.opencontainers.image.authors="Eduardo Silva " + +# Copy the libraries from the extractor stage into root +COPY --from=deb-extractor /dpkg / + +# Copy certificates +COPY --from=builder /etc/ssl/certs /etc/ssl/certs + +# Finally the binaries as most likely to change +COPY --from=builder /fluent-bit /fluent-bit + +EXPOSE 2020 + +# Entry point +ENTRYPOINT [ "/fluent-bit/bin/fluent-bit" ] +CMD ["-c", "/fluent-bit/etc/fluent-bit.conf"] + +FROM debian:trixie-slim AS debug +ARG RELEASE_VERSION +ENV FLUENT_BIT_VERSION=${RELEASE_VERSION} +LABEL description="Fluent Bit multi-architecture debug container image" \ + vendor="Fluent Organization" \ + version="${RELEASE_VERSION}-debug" \ + author="Eduardo Silva " \ + org.opencontainers.image.description="Fluent Bit debug container image" \ + org.opencontainers.image.title="Fluent Bit Debug" \ + org.opencontainers.image.licenses="Apache-2.0" \ + org.opencontainers.image.vendor="Fluent Organization" \ + org.opencontainers.image.version="${RELEASE_VERSION}-debug" \ + org.opencontainers.image.source="https://github.com/fluent/fluent-bit" \ + org.opencontainers.image.documentation="https://docs.fluentbit.io/" \ + org.opencontainers.image.authors="Eduardo Silva " + +COPY --from=qemu-arm32 /usr/bin/qemu-arm-static /usr/bin/ +COPY --from=qemu-arm64 /usr/bin/qemu-aarch64-static /usr/bin/ +COPY --from=builder-base /etc/apt/sources.list.d/ 
/etc/apt/sources.list.d/ +COPY --from=builder-base /etc/apt/trusted.gpg.d/ /etc/apt/trusted.gpg.d/ +ENV DEBIAN_FRONTEND=noninteractive + +# hadolint ignore=DL3008 +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + libssl3t64 \ + libcurl4t64 \ + libnghttp2-14 \ + libnghttp3-9 \ + librtmp1 \ + libssh2-1t64 \ + libpsl5t64 \ + libbrotli1 \ + libsasl2-2 \ + pkg-config \ + libpq5 \ + libsystemd0 \ + zlib1g \ + ca-certificates \ + libatomic1 \ + libgcrypt20 \ + libyaml-0-2 \ + libldap2 \ + libglib2.0-0t64 \ + libarrow2200 \ + libarrow-acero2200 \ + libarrow-dataset2200 \ + libarrow-glib2200 \ + libparquet2200 \ + libparquet-glib2200 \ + bash gdb valgrind build-essential \ + git bash-completion vim tmux jq \ + dnsutils iputils-ping iputils-arping iputils-tracepath iputils-clockdiff \ + tcpdump curl nmap tcpflow iftop \ + net-tools mtr netcat-openbsd bridge-utils iperf ngrep \ + openssl \ + htop atop strace iotop sysstat ncdu logrotate hdparm pciutils psmisc tree pv \ + make tar flex bison \ + libssl-dev libsasl2-dev libsystemd-dev zlib1g-dev libpq-dev libyaml-dev postgresql-server-dev-all \ + && apt-get satisfy -y cmake "cmake (<< 4.0)" \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN rm -f /usr/bin/qemu-*-static +COPY --from=builder /fluent-bit /fluent-bit + +EXPOSE 2020 + +# No entry point so we can just shell in +CMD ["/fluent-bit/bin/fluent-bit", "-c", "/fluent-bit/etc/fluent-bit.conf"] diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 3016b28d69a..e75214eb4ea 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -42,6 +42,15 @@ #include "s3.h" #include "s3_store.h" +/* Forward declarations for functions used in s3_parquet.c */ +static struct multipart_upload *get_upload(struct flb_s3 *ctx, + const char *tag, int tag_len); +static struct multipart_upload *create_upload(struct flb_s3 *ctx, + const char *tag, int tag_len, + time_t file_first_log_time); + +#include "s3_parquet.c" + #define DEFAULT_S3_PORT 443 #define DEFAULT_S3_INSECURE_PORT 80 @@ -66,9 +75,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, struct s3_file *chunk, char **out_buf, size_t *out_size); -static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, - char *body, size_t body_size); - static int put_all_chunks(struct flb_s3 *ctx); static void cb_s3_upload(struct flb_config *ctx, void *data); @@ -92,14 +98,14 @@ static struct flb_aws_header *get_content_encoding_header(int compression_type) .val = "gzip", .val_len = 4, }; - + static struct flb_aws_header zstd_header = { .key = "Content-Encoding", .key_len = 16, .val = "zstd", .val_len = 4, }; - + switch (compression_type) { case FLB_AWS_COMPRESS_GZIP: return &gzip_header; @@ -727,6 +733,22 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->use_put_object = FLB_TRUE; } + /* Parse format parameter */ + tmp = flb_output_get_property("format", ins); + if (tmp) { + if (strcasecmp(tmp, "parquet") == 0) { + ctx->format = FLB_S3_FORMAT_PARQUET; + } else if (strcasecmp(tmp, "json") == 0) { + ctx->format = FLB_S3_FORMAT_JSON; + } else { + flb_plg_error(ctx->ins, "Invalid format '%s', must be 'json' or 'parquet'", tmp); + return -1; + } + } else { + ctx->format = FLB_S3_FORMAT_JSON; + } + + /* Parse compression (for both JSON and Parquet formats) */ tmp = flb_output_get_property("compression", ins); if (tmp) { ret = flb_aws_compression_get_type(tmp); @@ -734,16 +756,48 @@ static int cb_s3_init(struct flb_output_instance *ins, flb_plg_error(ctx->ins, "unknown 
compression: %s", tmp); return -1; } - if (ctx->use_put_object == FLB_FALSE && - (ret == FLB_AWS_COMPRESS_ARROW || - ret == FLB_AWS_COMPRESS_PARQUET)) { - flb_plg_error(ctx->ins, - "use_put_object must be enabled when Apache Arrow or Parquet is enabled"); - return -1; + + /* Legacy: compression=arrow is deprecated */ + if (ret == FLB_AWS_COMPRESS_ARROW) { + flb_plg_warn(ctx->ins, + "DEPRECATED: compression=arrow is deprecated and will be removed in a future version"); + if (ctx->use_put_object == FLB_FALSE) { + flb_plg_error(ctx->ins, + "use_put_object must be enabled when Apache Arrow is used"); + return -1; + } + } + + /* Legacy: compression=parquet sets format */ + if (ret == FLB_AWS_COMPRESS_PARQUET) { + flb_plg_warn(ctx->ins, + "DEPRECATED: compression=parquet is deprecated, use format=parquet instead"); + ctx->format = FLB_S3_FORMAT_PARQUET; + + if (ctx->use_put_object == FLB_FALSE) { + flb_plg_info(ctx->ins, "Parquet multipart mode enabled via legacy compression parameter"); + } } + ctx->compression = ret; } + /* For Parquet format, validate support and set compression */ + if (ctx->format == FLB_S3_FORMAT_PARQUET) { +#ifndef FLB_HAVE_ARROW_PARQUET + flb_plg_error(ctx->ins, + "format=parquet is not supported in this build. " + "Fluent Bit must be compiled with Apache Arrow/Parquet support. " + "Please rebuild with -DFLB_ARROW=On or use format=json instead."); + return -1; +#else + if (ctx->compression == FLB_AWS_COMPRESS_NONE) { + ctx->compression = FLB_AWS_COMPRESS_PARQUET; + flb_plg_info(ctx->ins, "Using Parquet compression (default for format=parquet)"); + } +#endif + } + tmp = flb_output_get_property("content_type", ins); if (tmp) { ctx->content_type = (char *) tmp; @@ -789,7 +843,7 @@ static int cb_s3_init(struct flb_output_instance *ins, */ ctx->upload_chunk_size = ctx->file_size; if (ctx->file_size > MAX_FILE_SIZE_PUT_OBJECT) { - flb_plg_error(ctx->ins, "Max total_file_size is 50M when use_put_object is enabled"); + flb_plg_error(ctx->ins, "max total_file_size is 1GB (1,000,000,000 bytes) when use_put_object is enabled"); return -1; } } @@ -1077,6 +1131,21 @@ static int cb_s3_init(struct flb_output_instance *ins, } } + /* Initialize Parquet batch manager for Parquet format with multipart */ + if (ctx->format == FLB_S3_FORMAT_PARQUET && ctx->use_put_object == FLB_FALSE) { + /* Initialize Parquet batch manager */ + ret = parquet_batch_init(ctx); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Parquet batch manager"); + return -1; + } + + flb_plg_debug(ctx->ins, + "parquet multipart mode: batch_size=%zuMB, timeout=%ds", + ctx->file_size / (1024 * 1024), + (int)ctx->upload_timeout); + } + return 0; } @@ -1483,7 +1552,7 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, return 0; } -static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, +int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, char *body, size_t body_size) { flb_sds_t s3_key = NULL; @@ -3786,6 +3855,81 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, } chunk_size = flb_sds_len(chunk); + /* + * Parquet multipart mode: accumulate JSON data in batches + * and convert to Parquet when thresholds are reached + */ + if (ctx->format == FLB_S3_FORMAT_PARQUET && ctx->use_put_object == FLB_FALSE) { + struct parquet_batch *batch; + int should_convert; + + /* Get or create batch for this tag */ + batch = parquet_batch_get_or_create(ctx, event_chunk->tag, + flb_sds_len(event_chunk->tag)); + if (!batch) { + 
flb_plg_error(ctx->ins, "failed to get/create parquet batch"); + flb_sds_destroy(chunk); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Accumulate JSON data to batch's buffer file */ + if (batch->chunk == NULL) { + batch->chunk = s3_store_file_get(ctx, event_chunk->tag, + flb_sds_len(event_chunk->tag)); + } + + ret = s3_store_buffer_put(ctx, batch->chunk, event_chunk->tag, + flb_sds_len(event_chunk->tag), chunk, + (size_t) chunk_size, time(NULL)); + if (ret < 0) { + flb_plg_error(ctx->ins, "failed to buffer data for parquet batch"); + flb_sds_destroy(chunk); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* + * If batch->chunk was NULL, s3_store_buffer_put may have created a new file. + * Re-fetch the chunk to ensure batch->chunk is up to date. + */ + if (batch->chunk == NULL) { + batch->chunk = s3_store_file_get(ctx, event_chunk->tag, + flb_sds_len(event_chunk->tag)); + if (!batch->chunk) { + flb_plg_error(ctx->ins, + "failed to retrieve chunk after buffering for parquet batch"); + flb_sds_destroy(chunk); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + + batch->accumulated_size += chunk_size; + batch->append_count++; + batch->last_append_time = time(NULL); + + flb_sds_destroy(chunk); + + /* Check if batch should be converted and uploaded */ + should_convert = parquet_batch_should_convert(ctx, batch); + if (should_convert) { + m_upload_file = get_upload(ctx, event_chunk->tag, + flb_sds_len(event_chunk->tag)); + if (!m_upload_file) { + m_upload_file = create_upload(ctx, event_chunk->tag, + flb_sds_len(event_chunk->tag), + batch->create_time); + } + + ret = parquet_batch_convert_and_upload(ctx, batch, m_upload_file); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to convert and upload parquet batch"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + + FLB_OUTPUT_RETURN(FLB_OK); + } + /* Get a file candidate matching the given 'tag' */ upload_file = s3_store_file_get(ctx, event_chunk->tag, @@ -3959,6 +4103,19 @@ static int cb_s3_exit(void *data, struct flb_config *config) } } + /* Cleanup Parquet batch manager if enabled */ + if (ctx->format == FLB_S3_FORMAT_PARQUET && ctx->use_put_object == FLB_FALSE) { + /* Flush all pending parquet batches before destroying */ + flb_plg_info(ctx->ins, "Flushing pending parquet batches on shutdown"); + ret = parquet_batch_flush_all(ctx); + if (ret < 0) { + flb_plg_warn(ctx->ins, + "Some parquet batches could not be flushed on shutdown, " + "data may have been lost"); + } + parquet_batch_destroy(ctx); + } + if (ctx->blob_database_file != NULL && ctx->blob_db.db != NULL) { @@ -4048,6 +4205,14 @@ static struct flb_config_map config_map[] = { "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'." "If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'." }, + { + FLB_CONFIG_MAP_STR, "format", "json", + 0, FLB_FALSE, 0, + "Output file format: 'json' or 'parquet'. " + "Default: json. " + "When format=parquet with use_put_object=false, enables multipart upload for large files. " + "Batch parameters are automatically configured based on total_file_size and upload_timeout." 
+ }, { FLB_CONFIG_MAP_STR, "content_type", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index fc30ff81ff7..a93492960bb 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -194,8 +194,23 @@ struct flb_s3 { flb_sds_t seq_index_file; struct flb_output_instance *ins; + + /* ==================== Output Format ==================== */ + int format; /* Output format: json or parquet */ + + /* ==================== Parquet Runtime State ==================== */ + struct mk_list parquet_batches; /* Active Parquet batch list */ + pthread_mutex_t parquet_batch_lock; /* Batch operation lock */ }; +/* ==================== Format Types ==================== */ +#define FLB_S3_FORMAT_JSON 0 +#define FLB_S3_FORMAT_PARQUET 1 + +/* ==================== Parquet Function Declarations ==================== */ +int parquet_batch_init(struct flb_s3 *ctx); +void parquet_batch_destroy(struct flb_s3 *ctx); + int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, char *body, size_t body_size, char *pre_signed_url); @@ -224,4 +239,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_header **headers, int *num_headers, int multipart_upload); +int s3_put_object(struct flb_s3 *ctx, const char *tag, + time_t create_time, char *buffer, size_t size); + #endif diff --git a/plugins/out_s3/s3_parquet.c b/plugins/out_s3/s3_parquet.c new file mode 100644 index 00000000000..5979733c776 --- /dev/null +++ b/plugins/out_s3/s3_parquet.c @@ -0,0 +1,500 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include + +#include "s3.h" +#include "s3_store.h" + +/* Parquet batch structure */ +struct parquet_batch { + char *tag; + int tag_len; + struct s3_file *chunk; + time_t create_time; + time_t last_append_time; + size_t accumulated_size; + int append_count; + int conversion_attempts; + struct mk_list _head; +}; + +/* Forward declarations */ +static int parquet_batch_convert_and_upload(struct flb_s3 *ctx, + struct parquet_batch *batch, + struct multipart_upload *m_upload); + +/* Initialize Parquet batch management */ +int parquet_batch_init(struct flb_s3 *ctx) +{ + int ret; + + mk_list_init(&ctx->parquet_batches); + + ret = pthread_mutex_init(&ctx->parquet_batch_lock, NULL); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed to initialize parquet batch mutex"); + return -1; + } + + flb_plg_debug(ctx->ins, + "parquet batch manager initialized: batch_size=%zu, timeout=%ds", + ctx->file_size, (int)ctx->upload_timeout); + + return 0; +} + +/* Flush all pending Parquet batches before shutdown */ +static int parquet_batch_flush_all(struct flb_s3 *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct parquet_batch *batch; + struct multipart_upload *m_upload; + int ret; + int flushed_count = 0; + int error_count = 0; + + flb_plg_info(ctx->ins, "Flushing all pending parquet batches before shutdown"); + + pthread_mutex_lock(&ctx->parquet_batch_lock); + + mk_list_foreach_safe(head, tmp, &ctx->parquet_batches) { + batch = mk_list_entry(head, struct parquet_batch, _head); + + /* Skip empty batches */ + if (!batch->chunk || batch->accumulated_size == 0) { + flb_plg_debug(ctx->ins, + "Skipping empty parquet batch for tag '%s'", + batch->tag); + + /* Clean up the batch structure even if skipping */ + mk_list_del(&batch->_head); + if (batch->chunk) { + s3_store_file_delete(ctx, batch->chunk); + batch->chunk = NULL; + } + if (batch->tag) { + flb_free(batch->tag); + } + flb_free(batch); + continue; + } + + flb_plg_info(ctx->ins, + "Flushing parquet batch for tag '%s': %zu bytes, %d appends", + batch->tag, batch->accumulated_size, batch->append_count); + + /* Get or create multipart upload for this tag */ + m_upload = get_upload(ctx, batch->tag, batch->tag_len); + if (!m_upload) { + m_upload = create_upload(ctx, batch->tag, batch->tag_len, + batch->create_time); + if (!m_upload) { + flb_plg_error(ctx->ins, + "Failed to create upload for parquet batch '%s', " + "data will be discarded", + batch->tag); + error_count++; + continue; + } + } + + /* Temporarily unlock to allow conversion and upload */ + pthread_mutex_unlock(&ctx->parquet_batch_lock); + + /* Convert and upload the batch */ + ret = parquet_batch_convert_and_upload(ctx, batch, m_upload); + + /* Re-lock for next iteration */ + pthread_mutex_lock(&ctx->parquet_batch_lock); + + if (ret < 0) { + flb_plg_error(ctx->ins, + "Failed to flush parquet batch for tag '%s', " + "data will be discarded", + batch->tag); + error_count++; + } else { + flb_plg_info(ctx->ins, + "Successfully flushed parquet batch for tag '%s'", + batch->tag); + flushed_count++; + } + } + + pthread_mutex_unlock(&ctx->parquet_batch_lock); + + if (flushed_count > 0) { + flb_plg_info(ctx->ins, + "Flushed %d parquet batch(es) on shutdown", + flushed_count); + } + + if (error_count > 0) { + flb_plg_warn(ctx->ins, + "Failed to flush %d parquet batch(es), data was discarded", + error_count); + return -1; + } + + return 0; +} + +/* Cleanup Parquet batch management */ +void parquet_batch_destroy(struct flb_s3 *ctx) +{ + struct mk_list *tmp; + struct 
mk_list *head; + struct parquet_batch *batch; + + pthread_mutex_lock(&ctx->parquet_batch_lock); + + mk_list_foreach_safe(head, tmp, &ctx->parquet_batches) { + batch = mk_list_entry(head, struct parquet_batch, _head); + mk_list_del(&batch->_head); + + /* Warn if batch still has data and clean up chunk */ + if (batch->chunk) { + if (batch->accumulated_size > 0) { + flb_plg_warn(ctx->ins, + "Discarding parquet batch for tag '%s' with %zu bytes of data", + batch->tag, batch->accumulated_size); + } + /* Delete the temporary file */ + s3_store_file_delete(ctx, batch->chunk); + batch->chunk = NULL; + } + + if (batch->tag) { + flb_free(batch->tag); + } + flb_free(batch); + } + + pthread_mutex_unlock(&ctx->parquet_batch_lock); + pthread_mutex_destroy(&ctx->parquet_batch_lock); +} + +/* Get or create Parquet batch */ +static struct parquet_batch *parquet_batch_get_or_create(struct flb_s3 *ctx, + const char *tag, + int tag_len) +{ + struct mk_list *head; + struct parquet_batch *batch; + struct parquet_batch *new_batch; + time_t now = time(NULL); + + pthread_mutex_lock(&ctx->parquet_batch_lock); + + /* Find existing batch */ + mk_list_foreach(head, &ctx->parquet_batches) { + batch = mk_list_entry(head, struct parquet_batch, _head); + + if (batch->tag_len == tag_len && + strncmp(batch->tag, tag, tag_len) == 0) { + pthread_mutex_unlock(&ctx->parquet_batch_lock); + return batch; + } + } + + /* Create new batch */ + new_batch = flb_calloc(1, sizeof(struct parquet_batch)); + if (!new_batch) { + flb_errno(); + pthread_mutex_unlock(&ctx->parquet_batch_lock); + return NULL; + } + + new_batch->tag = flb_malloc(tag_len + 1); + if (!new_batch->tag) { + flb_errno(); + flb_free(new_batch); + pthread_mutex_unlock(&ctx->parquet_batch_lock); + return NULL; + } + + memcpy(new_batch->tag, tag, tag_len); + new_batch->tag[tag_len] = '\0'; + new_batch->tag_len = tag_len; + new_batch->create_time = now; + new_batch->last_append_time = now; + new_batch->accumulated_size = 0; + new_batch->append_count = 0; + new_batch->conversion_attempts = 0; + new_batch->chunk = NULL; + + mk_list_add(&new_batch->_head, &ctx->parquet_batches); + + flb_plg_debug(ctx->ins, "created parquet batch for tag: %s", new_batch->tag); + + pthread_mutex_unlock(&ctx->parquet_batch_lock); + return new_batch; +} + +/* Check if batch should be converted */ +static int parquet_batch_should_convert(struct flb_s3 *ctx, + struct parquet_batch *batch) +{ + time_t now = time(NULL); + + if (!batch || !batch->chunk) { + return FLB_FALSE; + } + + /* Check size threshold */ + if (batch->accumulated_size >= ctx->file_size) { + flb_plg_debug(ctx->ins, + "batch '%s' reached size threshold: %zu/%zu bytes", + batch->tag, batch->accumulated_size, ctx->file_size); + return FLB_TRUE; + } + + /* Check timeout */ + if (now > (batch->create_time + ctx->upload_timeout)) { + flb_plg_debug(ctx->ins, + "batch '%s' reached timeout: %ld seconds", + batch->tag, now - batch->create_time); + return FLB_TRUE; + } + + return FLB_FALSE; +} + + +/* Convert and upload Parquet batch */ +static int parquet_batch_convert_and_upload(struct flb_s3 *ctx, + struct parquet_batch *batch, + struct multipart_upload *m_upload) +{ + int ret; + char *json_buffer = NULL; + size_t json_size = 0; + char *parquet_buffer = NULL; + size_t parquet_size = 0; + struct flb_time start_time, end_time; + uint64_t elapsed_ms; + double compression_ratio; + + if (!batch || !batch->chunk) { + flb_plg_error(ctx->ins, "invalid batch for conversion"); + return -1; + } + + batch->conversion_attempts++; + + /* Record start 
time */ + flb_time_get(&start_time); + + flb_plg_debug(ctx->ins, + "converting batch '%s': size=%zu bytes, appends=%d, age=%ld seconds", + batch->tag, batch->accumulated_size, batch->append_count, + time(NULL) - batch->create_time); + + /* 1. Read JSON data */ + ret = s3_store_file_read(ctx, batch->chunk, &json_buffer, &json_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to read buffered data for batch '%s'", + batch->tag); + return -1; + } + + /* 2. Convert to Parquet */ + ret = flb_aws_compression_compress(FLB_AWS_COMPRESS_PARQUET, + json_buffer, json_size, + (void **)&parquet_buffer, &parquet_size); + + if (ret < 0) { + flb_plg_error(ctx->ins, + "parquet conversion failed for batch '%s'", + batch->tag); + flb_free(json_buffer); + + /* Return error to trigger Fluent Bit retry mechanism */ + return -1; + } + + flb_free(json_buffer); + + /* Record end time and calculate elapsed time */ + flb_time_get(&end_time); + { + struct flb_time diff_time; + flb_time_diff(&end_time, &start_time, &diff_time); + elapsed_ms = flb_time_to_nanosec(&diff_time) / 1000000; + } + + flb_plg_debug(ctx->ins, + "parquet conversion: %zu bytes -> %zu bytes, %"PRIu64"ms", + json_size, parquet_size, elapsed_ms); + + /* 2.5. Apply additional compression if configured (gzip/zstd) */ + if (ctx->compression == FLB_AWS_COMPRESS_GZIP || + ctx->compression == FLB_AWS_COMPRESS_ZSTD) { + void *compressed_buffer = NULL; + size_t compressed_size = 0; + size_t pre_compress_size = parquet_size; + + ret = flb_aws_compression_compress(ctx->compression, + parquet_buffer, parquet_size, + &compressed_buffer, &compressed_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to apply %s compression for batch '%s'", + ctx->compression == FLB_AWS_COMPRESS_GZIP ? "gzip" : "zstd", + batch->tag); + flb_free(parquet_buffer); + return -1; + } + + flb_free(parquet_buffer); + parquet_buffer = compressed_buffer; + parquet_size = compressed_size; + + flb_plg_debug(ctx->ins, + "%s compression: %zu bytes -> %zu bytes", + ctx->compression == FLB_AWS_COMPRESS_GZIP ? "gzip" : "zstd", + pre_compress_size, compressed_size); + } + + /* 3. 
Upload Parquet data */ + if (ctx->use_put_object == FLB_TRUE) { + /* Check PutObject size limit */ + if (parquet_size > MAX_FILE_SIZE_PUT_OBJECT) { + flb_plg_error(ctx->ins, + "parquet size %zu exceeds 1GB limit for use_put_object mode", + parquet_size); + flb_free(parquet_buffer); + return -1; + } + + /* Use PutObject */ + ret = s3_put_object(ctx, batch->tag, batch->create_time, + parquet_buffer, parquet_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to upload batch '%s'", + batch->tag); + flb_free(parquet_buffer); + return -1; + } + } else if (parquet_size > MAX_FILE_SIZE_PUT_OBJECT) { + /* Use multipart upload for large files */ + size_t offset = 0; + struct multipart_upload *local_upload = NULL; + int need_cleanup = FLB_FALSE; + + flb_plg_debug(ctx->ins, + "using multipart upload: %zu bytes", + parquet_size); + + /* Ensure we have a valid multipart upload structure */ + if (!m_upload) { + /* Create a local multipart upload structure */ + local_upload = create_upload(ctx, batch->tag, batch->tag_len, + batch->create_time); + if (!local_upload) { + flb_plg_error(ctx->ins, + "failed to create multipart upload structure for batch '%s'", + batch->tag); + flb_free(parquet_buffer); + return -1; + } + m_upload = local_upload; + need_cleanup = FLB_TRUE; + } + + /* Ensure multipart upload is created */ + if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) { + ret = create_multipart_upload(ctx, m_upload, NULL); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to create multipart upload for batch '%s'", + batch->tag); + flb_free(parquet_buffer); + if (need_cleanup) { + mk_list_del(&local_upload->_head); + multipart_upload_destroy(local_upload); + } + return -1; + } + m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED; + } + + /* Upload each part */ + while (offset < parquet_size) { + size_t chunk_size = (parquet_size - offset > ctx->upload_chunk_size) ? + ctx->upload_chunk_size : (parquet_size - offset); + + ret = upload_part(ctx, m_upload, + parquet_buffer + offset, chunk_size, NULL); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to upload part for batch '%s'", + batch->tag); + flb_free(parquet_buffer); + if (need_cleanup) { + mk_list_del(&local_upload->_head); + multipart_upload_destroy(local_upload); + } + return -1; + } + + offset += chunk_size; + m_upload->part_number++; + /* Note: upload_part already updates m_upload->bytes, no need to add manually */ + } + + /* Mark for completion */ + m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + + } else { + /* Small file: Use PutObject */ + ret = s3_put_object(ctx, batch->tag, batch->create_time, + parquet_buffer, parquet_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "failed to upload batch '%s'", + batch->tag); + flb_free(parquet_buffer); + return -1; + } + } + + flb_free(parquet_buffer); + + /* 4. Delete temporary file */ + s3_store_file_delete(ctx, batch->chunk); + batch->chunk = NULL; + + /* 5. Remove batch from list */ + pthread_mutex_lock(&ctx->parquet_batch_lock); + mk_list_del(&batch->_head); + pthread_mutex_unlock(&ctx->parquet_batch_lock); + + flb_free(batch->tag); + flb_free(batch); + + return 0; +}
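Example out_s3 configuration exercising the new option (illustrative only: bucket, region and sizes are placeholders; every key other than "format" is an existing out_s3 option). With use_put_object disabled, this patch accumulates JSON per tag and converts each batch to Parquet before uploading:

    [OUTPUT]
        name             s3
        match            *
        bucket           my-bucket
        region           us-east-1
        format           parquet
        use_put_object   false
        total_file_size  100M
        upload_timeout   10m
        compression      zstd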