From 40b0c46bf96e08ebddd371958b3c933b897e9ed1 Mon Sep 17 00:00:00 2001 From: Rituparna Khaund Date: Sun, 7 Jun 2026 17:15:04 +0100 Subject: [PATCH 1/4] aws: add columnar compression API for arrow and parquet The compression option conflated byte-level compression (gzip, zstd, snappy) with columnar format conversion (arrow, parquet), making it impossible to produce Arrow or Parquet files with a chosen internal compression codec. Introduce a single columnar API, out_s3_compress_columnar(), that takes a columnar format (FLB_AWS_COMPRESS_FORMAT_ARROW or FLB_AWS_COMPRESS_FORMAT_PARQUET) and applies a generic FLB_AWS_COMPRESS_* codec on top of it: Parquet via the page-level GLib writer properties, Arrow/Feather via the Feather write properties. Parquet supports none, snappy, gzip and zstd; Arrow supports none and zstd. The arrow and parquet entries are removed from the generic compression dispatch table, and the format-specific FLB_PARQUET_COMPRESSION_* constants together with the standalone compress.h are dropped in favour of the unified declarations in flb_aws_compress.h. Unknown columnar formats now fail loudly instead of silently defaulting to Arrow. 
Signed-off-by: Rituparna Khaund --- include/fluent-bit/aws/flb_aws_compress.h | 23 +++- src/aws/compression/arrow/compress.c | 155 +++++++++++++--------- src/aws/compression/arrow/compress.h | 28 ---- src/aws/flb_aws_compress.c | 18 --- 4 files changed, 112 insertions(+), 112 deletions(-) delete mode 100644 src/aws/compression/arrow/compress.h diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h index ada6c83191e..86805790075 100644 --- a/include/fluent-bit/aws/flb_aws_compress.h +++ b/include/fluent-bit/aws/flb_aws_compress.h @@ -23,8 +23,6 @@ #include #define FLB_AWS_COMPRESS_NONE 0 #define FLB_AWS_COMPRESS_GZIP 1 -#define FLB_AWS_COMPRESS_ARROW 2 -#define FLB_AWS_COMPRESS_PARQUET 3 #define FLB_AWS_COMPRESS_ZSTD 4 #define FLB_AWS_COMPRESS_SNAPPY 5 @@ -63,4 +61,25 @@ int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_o void *in_data, size_t in_len, void **out_data, size_t *out_len); +/* + * Columnar output formats for out_s3_compress_columnar(). Compression is + * applied on top of the format via a generic FLB_AWS_COMPRESS_* codec. + */ +#define FLB_AWS_COMPRESS_FORMAT_ARROW 0 +#define FLB_AWS_COMPRESS_FORMAT_PARQUET 1 + +/* + * Convert JSON data to a columnar format (Apache Arrow/Feather or Apache + * Parquet) selected by `columnar_format` (FLB_AWS_COMPRESS_FORMAT_*), + * applying `compression_type` (a generic FLB_AWS_COMPRESS_* codec) on top of + * the format: + * - Parquet supports NONE, SNAPPY, GZIP and ZSTD (page-level codec). + * - Arrow/Feather supports NONE and ZSTD only. + * + * Returns 0 on success, -1 on failure. 
+ */ +int out_s3_compress_columnar(int columnar_format, void *json, size_t size, + void **out_buf, size_t *out_size, + int compression_type); + #endif diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c index 1cae74c30e6..b57ec04ca8e 100644 --- a/src/aws/compression/arrow/compress.c +++ b/src/aws/compression/arrow/compress.c @@ -13,8 +13,32 @@ #endif #include #include +#include #include +/* + * compression_type_to_garrow - map a generic FLB_AWS_COMPRESS_* codec to the + * corresponding GArrowCompressionType. + * + * This is shared by the Arrow (Feather) and Parquet writers so that + * compression is treated as an axis applied on top of the format. Codecs that + * are not valid for a given format are rejected by the caller before reaching + * the writer; any unmapped value falls back to uncompressed. + */ +static GArrowCompressionType compression_type_to_garrow(int compression_type) +{ + switch (compression_type) { + case FLB_AWS_COMPRESS_SNAPPY: + return GARROW_COMPRESSION_TYPE_SNAPPY; + case FLB_AWS_COMPRESS_GZIP: + return GARROW_COMPRESSION_TYPE_GZIP; + case FLB_AWS_COMPRESS_ZSTD: + return GARROW_COMPRESSION_TYPE_ZSTD; + default: + return GARROW_COMPRESSION_TYPE_UNCOMPRESSED; + } +} + static int choose_block_size(size_t size) { int block_size = 8 * 1024 * 1024; @@ -89,10 +113,12 @@ static GArrowTable* parse_json(uint8_t *json, int size) return table; } -static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table, + int compression_type) { GArrowResizableBuffer *buffer; GArrowBufferOutputStream *sink; + GArrowFeatherWriteProperties *props; GError *error = NULL; gboolean success; @@ -108,10 +134,22 @@ static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) return NULL; } + /* + * Apply the requested compression codec on top of the Arrow/Feather + * format. 
Arrow IPC only supports ZSTD (and uncompressed); unsupported + * codecs are rejected at config time before reaching this writer. + */ + props = garrow_feather_write_properties_new(); + g_object_set(props, "compression", + compression_type_to_garrow(compression_type), NULL); + success = garrow_table_write_as_feather( table, GARROW_OUTPUT_STREAM(sink), - NULL, &error); + props, &error); + g_object_unref(props); if (!success) { + flb_error("[aws][compress] Failed to write table to arrow " + "buffer: %s", error->message); g_error_free(error); g_object_unref(buffer); g_object_unref(sink); @@ -121,60 +159,14 @@ static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) return buffer; } -int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size) -{ - GArrowTable *table; - GArrowResizableBuffer *buffer; - GBytes *bytes; - gconstpointer ptr; - gsize len; - uint8_t *buf; - - table = parse_json((uint8_t *) json, size); - if (table == NULL) { - return -1; - } - - buffer = table_to_buffer(table); - g_object_unref(table); - if (buffer == NULL) { - return -1; - } - - bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); - if (bytes == NULL) { - g_object_unref(buffer); - return -1; - } - - ptr = g_bytes_get_data(bytes, &len); - if (ptr == NULL) { - g_object_unref(buffer); - g_bytes_unref(bytes); - return -1; - } - - buf = malloc(len); - if (buf == NULL) { - g_object_unref(buffer); - g_bytes_unref(bytes); - return -1; - } - memcpy(buf, ptr, len); - *out_buf = (void *) buf; - *out_size = len; - - g_object_unref(buffer); - g_bytes_unref(bytes); - return 0; -} - #ifdef FLB_HAVE_ARROW_PARQUET -static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table) +static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table, + int compression_type) { GArrowResizableBuffer *buffer; GArrowBufferOutputStream *sink; GParquetArrowFileWriter *writer; + GParquetWriterProperties *props; GArrowSchema *schema; GError *error = NULL; gboolean 
success; @@ -199,14 +191,19 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table) return NULL; } - /* Create a new Parquet file writer */ + props = gparquet_writer_properties_new(); + gparquet_writer_properties_set_compression( + props, compression_type_to_garrow(compression_type), NULL); + writer = gparquet_arrow_file_writer_new_arrow(schema, GARROW_OUTPUT_STREAM(sink), - NULL, /* Arrow writer properties */ + props, &error); g_object_unref(schema); + g_object_unref(props); if (writer == NULL) { - flb_error("[aws][compress] Failed to create parquet writer: %s", error->message); + flb_error("[aws][compress] Failed to create parquet writer: %s", + error->message); g_error_free(error); g_object_unref(buffer); g_object_unref(sink); @@ -215,10 +212,11 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table) n_rows = garrow_table_get_n_rows(table); - /* Write the entire table to the Parquet file buffer */ - success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error); + success = gparquet_arrow_file_writer_write_table(writer, table, + n_rows, &error); if (!success) { - flb_error("[aws][compress] Failed to write table to parquet buffer: %s", error->message); + flb_error("[aws][compress] Failed to write table to parquet " + "buffer: %s", error->message); g_error_free(error); g_object_unref(buffer); g_object_unref(sink); @@ -226,7 +224,6 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table) return NULL; } - /* Close the writer to finalize the Parquet file metadata */ success = gparquet_arrow_file_writer_close(writer, &error); if (!success) { g_error_free(error); @@ -240,9 +237,11 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table) g_object_unref(writer); return buffer; } +#endif - -int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size) +int out_s3_compress_columnar(int columnar_format, void *json, size_t size, + void **out_buf, size_t 
*out_size, + int compression_type) { GArrowTable *table; GArrowResizableBuffer *buffer; @@ -253,14 +252,43 @@ int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out table = parse_json((uint8_t *) json, size); if (table == NULL) { - flb_error("[aws][compress] Failed to parse JSON into Arrow Table for Parquet conversion"); + flb_error("[aws][compress] Failed to parse JSON into Arrow Table"); return -1; } - buffer = table_to_parquet_buffer(table); + /* Select the columnar writer; compression is applied on top by each. */ +#ifdef FLB_HAVE_ARROW_PARQUET + if (columnar_format == FLB_AWS_COMPRESS_FORMAT_PARQUET) { + buffer = table_to_parquet_buffer(table, compression_type); + } + else if (columnar_format == FLB_AWS_COMPRESS_FORMAT_ARROW) { + buffer = table_to_buffer(table, compression_type); + } + else { + flb_error("[aws][compress] unknown columnar format: %d", + columnar_format); + g_object_unref(table); + return -1; + } +#else + if (columnar_format == FLB_AWS_COMPRESS_FORMAT_PARQUET) { + flb_error("[aws][compress] Parquet format requires parquet-glib " + "at compile time"); + g_object_unref(table); + return -1; + } + else if (columnar_format != FLB_AWS_COMPRESS_FORMAT_ARROW) { + flb_error("[aws][compress] unknown columnar format: %d", + columnar_format); + g_object_unref(table); + return -1; + } + buffer = table_to_buffer(table, compression_type); +#endif g_object_unref(table); if (buffer == NULL) { - flb_error("[aws][compress] Failed to convert Arrow Table into Parquet buffer"); + flb_error("[aws][compress] Failed to encode Arrow Table into " + "columnar buffer"); return -1; } @@ -292,4 +320,3 @@ int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out g_bytes_unref(bytes); return 0; } -#endif diff --git a/src/aws/compression/arrow/compress.h b/src/aws/compression/arrow/compress.h deleted file mode 100644 index f8dcd4a4248..00000000000 --- a/src/aws/compression/arrow/compress.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * 
This function converts out_s3 buffer into Apache Arrow format. - * - * `json` is a string that contain (concatenated) JSON objects. - * - * `size` is the length of the json data (excluding the trailing - * null-terminator character). - * - * Return 0 on success (with `out_buf` and `out_size` updated), - * and -1 on failure - */ - -int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size); - -#ifdef FLB_HAVE_ARROW_PARQUET -/* - * This function converts out_s3 buffer into Apache Parquet format. - * - * `json` is a string that contain (concatenated) JSON objects. - * - * `size` is the length of the json data (excluding the trailing - * null-terminator character). - * - * Return 0 on success (with `out_buf` and `out_size` updated), - * and -1 on failure - */ -int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size); -#endif diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c index 8188c6b7038..6eac3669769 100644 --- a/src/aws/flb_aws_compress.c +++ b/src/aws/flb_aws_compress.c @@ -28,10 +28,6 @@ #include -#ifdef FLB_HAVE_ARROW -#include "compression/arrow/compress.h" -#endif - /* Wrapper function to adapt flb_snappy_compress to AWS compression interface */ static int flb_snappy_compress_wrapper(void *in_data, size_t in_len, void **out_data, size_t *out_len) @@ -77,20 +73,6 @@ static const struct compression_option compression_options[] = { "snappy", &flb_snappy_compress_wrapper }, -#ifdef FLB_HAVE_ARROW - { - FLB_AWS_COMPRESS_ARROW, - "arrow", - &out_s3_compress_arrow - }, -#endif -#ifdef FLB_HAVE_ARROW_PARQUET - { - FLB_AWS_COMPRESS_PARQUET, - "parquet", - &out_s3_compress_parquet - }, -#endif { 0 } }; From cbb151a2f0c08ca72745cb6f82172f982206562e Mon Sep 17 00:00:00 2001 From: Rituparna Khaund Date: Sun, 7 Jun 2026 17:15:22 +0100 Subject: [PATCH 2/4] out_s3: add format=arrow and format=parquet columnar output Separate the format axis (json, json_lines, otlp_json, arrow, parquet) from the 
compression axis so users can produce columnar files with a selectable internal codec. 'format arrow' and 'format parquet' write a self-contained columnar object; the compression option then selects the codec applied inside the file (page-level for Parquet, IPC buffer compression for Arrow). The default is uncompressed, preserving existing behaviour. Columnar objects are uploaded via PutObject and never carry a byte-level Content-Encoding header, and log_key is rejected for these formats. Each codec is validated against the chosen format. The deprecated compression=arrow and compression=parquet values are kept as aliases that emit a warning and enable the corresponding format. Signed-off-by: Rituparna Khaund --- plugins/out_s3/s3.c | 366 +++++++++++++++++++++++++++++++++++++------- plugins/out_s3/s3.h | 6 + 2 files changed, 319 insertions(+), 53 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index ac70da12957..23c1a36f350 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -42,6 +42,7 @@ #include "s3.h" #include "s3_store.h" +#include #define DEFAULT_S3_PORT 443 #define DEFAULT_S3_INSECURE_PORT 80 @@ -92,6 +93,134 @@ static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx, struct flb_event_chunk *event_chunk, struct flb_config *config); +/* + * enable_parquet_format - configure context for Parquet output + * + * Sets the S3 format to Parquet and forces PutObject mode. + * Returns 0 on success, -1 if Parquet support was not compiled in. + */ +static int enable_parquet_format(struct flb_s3 *ctx) +{ +#ifdef FLB_HAVE_ARROW_PARQUET + ctx->s3_format = FLB_S3_FORMAT_PARQUET; + ctx->use_put_object = FLB_TRUE; + return 0; +#else + flb_plg_error(ctx->ins, + "parquet format requires parquet-glib at compile time"); + return -1; +#endif +} + +/* + * enable_arrow_format - configure context for Apache Arrow output + * + * Sets the S3 format to Arrow and forces PutObject mode. 
Columnar formats + * must be written as a single complete object, so multipart upload is not + * supported. + * Returns 0 on success, -1 if Arrow support was not compiled in. + */ +static int enable_arrow_format(struct flb_s3 *ctx) +{ +#ifdef FLB_HAVE_ARROW + ctx->s3_format = FLB_S3_FORMAT_ARROW; + ctx->use_put_object = FLB_TRUE; + return 0; +#else + flb_plg_error(ctx->ins, + "arrow format requires arrow-glib at compile time"); + return -1; +#endif +} + +/* + * parse_output_format - resolve format string to format constant + * + * Returns FLB_S3_FORMAT_PARQUET for "parquet" and FLB_S3_FORMAT_ARROW for + * "arrow", otherwise delegates to flb_pack_to_json_format_type for JSON + * format types. + */ +static int parse_output_format(const char *format) +{ + if (strcasecmp(format, "parquet") == 0) { + return FLB_S3_FORMAT_PARQUET; + } + if (strcasecmp(format, "arrow") == 0) { + return FLB_S3_FORMAT_ARROW; + } + return flb_pack_to_json_format_type(format); +} + +/* + * validate_format_compression - check a codec is valid for a columnar format + * + * Compression is an axis applied on top of the format, but each columnar + * format only accepts a subset of codecs: + * - Parquet: none, snappy, gzip, zstd (page-level codec). + * - Arrow/Feather: none, zstd (Arrow IPC only supports ZSTD). + * + * Returns 0 if the FLB_AWS_COMPRESS_* codec is valid for s3_format, else -1. 
+ */ +static int validate_format_compression(int s3_format, int compression_type) +{ + if (s3_format == FLB_S3_FORMAT_PARQUET) { + switch (compression_type) { + case FLB_AWS_COMPRESS_NONE: + case FLB_AWS_COMPRESS_SNAPPY: + case FLB_AWS_COMPRESS_GZIP: + case FLB_AWS_COMPRESS_ZSTD: + return 0; + default: + return -1; + } + } + if (s3_format == FLB_S3_FORMAT_ARROW) { + switch (compression_type) { + case FLB_AWS_COMPRESS_NONE: + case FLB_AWS_COMPRESS_ZSTD: + return 0; + default: + return -1; + } + } + return -1; +} + +/* + * s3_format_is_columnar - report whether a format compresses internally + * + * Columnar formats (Parquet, Arrow) embed compression inside the file, so the + * 'compression' codec is applied by the format writer and the uploaded object + * must NOT carry a byte-level Content-Encoding header. + */ +static int s3_format_is_columnar(int s3_format) +{ + return (s3_format == FLB_S3_FORMAT_PARQUET || + s3_format == FLB_S3_FORMAT_ARROW); +} + +/* + * s3_format_to_aws_compress_format - convert an S3 output format to the + * aws-compress columnar format identifier + * + * Translates FLB_S3_FORMAT_* to the FLB_AWS_COMPRESS_FORMAT_* identifier + * consumed by out_s3_compress_columnar(), keeping the compression layer + * decoupled from the plugin's format enum. Returns -1 for any format that is + * not a known columnar format, so a future format added to + * s3_format_is_columnar() but not mapped here fails loudly instead of being + * silently emitted as Arrow. 
+ */ +static int s3_format_to_aws_compress_format(int s3_format) +{ + if (s3_format == FLB_S3_FORMAT_PARQUET) { + return FLB_AWS_COMPRESS_FORMAT_PARQUET; + } + if (s3_format == FLB_S3_FORMAT_ARROW) { + return FLB_AWS_COMPRESS_FORMAT_ARROW; + } + return -1; +} + static struct flb_aws_header *get_content_encoding_header(int compression_type) { static struct flb_aws_header gzip_header = { @@ -199,9 +328,10 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, if (ctx->content_type != NULL) { headers_len++; } - if (ctx->compression == FLB_AWS_COMPRESS_GZIP || - ctx->compression == FLB_AWS_COMPRESS_ZSTD || - ctx->compression == FLB_AWS_COMPRESS_SNAPPY) { + if (!s3_format_is_columnar(ctx->s3_format) && + (ctx->compression == FLB_AWS_COMPRESS_GZIP || + ctx->compression == FLB_AWS_COMPRESS_ZSTD || + ctx->compression == FLB_AWS_COMPRESS_SNAPPY)) { headers_len++; } if (ctx->canned_acl != NULL) { @@ -231,9 +361,10 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, s3_headers[n].val_len = strlen(ctx->content_type); n++; } - if (ctx->compression == FLB_AWS_COMPRESS_GZIP || - ctx->compression == FLB_AWS_COMPRESS_ZSTD || - ctx->compression == FLB_AWS_COMPRESS_SNAPPY) { + if (!s3_format_is_columnar(ctx->s3_format) && + (ctx->compression == FLB_AWS_COMPRESS_GZIP || + ctx->compression == FLB_AWS_COMPRESS_ZSTD || + ctx->compression == FLB_AWS_COMPRESS_SNAPPY)) { encoding_header = get_content_encoding_header(ctx->compression); if (encoding_header == NULL) { @@ -659,6 +790,7 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->retry_time = 0; ctx->upload_queue_success = FLB_FALSE; ctx->out_format = FLB_PACK_JSON_FORMAT_LINES; + ctx->s3_format = FLB_S3_FORMAT_JSON_LINES; /* * The engine default retry_limit (1) is too low for S3's internal @@ -706,30 +838,54 @@ static int cb_s3_init(struct flb_output_instance *ins, /* Format key */ tmp = flb_output_get_property("format", ins); if (tmp) { - ret = flb_pack_to_json_format_type(tmp); + ret = parse_output_format(tmp); if 
(ret == -1) { flb_plg_error(ctx->ins, "invalid format '%s'", tmp); return -1; } - if (ret == FLB_PACK_JSON_FORMAT_JSON) { - flb_plg_warn(ctx->ins, - "'json' format is implicitly interpreted as 'json_lines' before." - "Now interpreted as 'json_lines' explicitly now"); - ret = FLB_PACK_JSON_FORMAT_LINES; + if (ret == FLB_S3_FORMAT_PARQUET) { + if (ctx->log_key != NULL) { + flb_plg_error(ctx->ins, + "'log_key' is not supported when format is " + "parquet"); + return -1; + } + if (enable_parquet_format(ctx) == -1) { + return -1; + } } - else if (ret != FLB_PACK_JSON_FORMAT_LINES && - ret != FLB_PACK_JSON_FORMAT_OTLP) { - flb_plg_error(ctx->ins, "unsupported format '%s'", tmp); - return -1; + else if (ret == FLB_S3_FORMAT_ARROW) { + if (ctx->log_key != NULL) { + flb_plg_error(ctx->ins, + "'log_key' is not supported when format is " + "arrow"); + return -1; + } + if (enable_arrow_format(ctx) == -1) { + return -1; + } } - ctx->out_format = ret; + else if (ret == FLB_PACK_JSON_FORMAT_JSON) { + flb_plg_warn(ctx->ins, + "'json' format is implicitly interpreted as " + "'json_lines'. 
Now interpreted as 'json_lines' " + "explicitly"); + ctx->out_format = FLB_PACK_JSON_FORMAT_LINES; + } + else if (ret == FLB_PACK_JSON_FORMAT_LINES || + ret == FLB_PACK_JSON_FORMAT_OTLP) { + ctx->out_format = ret; - if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP && - ctx->log_key != NULL) { - flb_plg_error(ctx->ins, - "'log_key' is not supported when format is " - "otlp_json or otlp_json_pretty"); + if (ret == FLB_PACK_JSON_FORMAT_OTLP && ctx->log_key != NULL) { + flb_plg_error(ctx->ins, + "'log_key' is not supported when format is " + "otlp_json or otlp_json_pretty"); + return -1; + } + } + else { + flb_plg_error(ctx->ins, "unsupported format '%s'", tmp); return -1; } } @@ -818,19 +974,73 @@ static int cb_s3_init(struct flb_output_instance *ins, tmp = flb_output_get_property("compression", ins); if (tmp) { - ret = flb_aws_compression_get_type(tmp); - if (ret == -1) { - flb_plg_error(ctx->ins, "unknown compression: %s", tmp); - return -1; + if (strcasecmp(tmp, "parquet") == 0) { + if (ctx->log_key != NULL) { + flb_plg_error(ctx->ins, + "'log_key' is not supported when format is " + "parquet"); + return -1; + } + flb_plg_warn(ctx->ins, + "'compression=parquet' is deprecated. " + "Use 'format parquet' with 'compression' set to " + "the desired page-level codec (snappy, zstd, gzip)"); + if (enable_parquet_format(ctx) == -1) { + return -1; + } } - if (ctx->use_put_object == FLB_FALSE && - (ret == FLB_AWS_COMPRESS_ARROW || - ret == FLB_AWS_COMPRESS_PARQUET)) { - flb_plg_error(ctx->ins, - "use_put_object must be enabled when Apache Arrow or Parquet is enabled"); - return -1; + else if (strcasecmp(tmp, "arrow") == 0) { + if (ctx->log_key != NULL) { + flb_plg_error(ctx->ins, + "'log_key' is not supported when format is " + "arrow"); + return -1; + } + flb_plg_warn(ctx->ins, + "'compression=arrow' is deprecated. 
" + "Use 'format arrow' with 'compression' set to " + "the desired codec (zstd)"); + if (enable_arrow_format(ctx) == -1) { + return -1; + } + } + else { + /* + * 'none' explicitly selects no compression. It is not part of the + * compression dispatch table (which reserves 0/NONE as a footer), + * so accept it here and map it to FLB_AWS_COMPRESS_NONE. + */ + if (strcasecmp(tmp, "none") == 0) { + ret = FLB_AWS_COMPRESS_NONE; + } + else { + ret = flb_aws_compression_get_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + return -1; + } + } + + if (ctx->s3_format == FLB_S3_FORMAT_PARQUET || + ctx->s3_format == FLB_S3_FORMAT_ARROW) { + /* + * For columnar formats, 'compression' selects the codec + * applied inside the format (page-level for Parquet, IPC + * buffer compression for Arrow) rather than a byte-level + * wrap of the uploaded object. The Content-Encoding header + * is intentionally not emitted for these formats. + */ + if (validate_format_compression(ctx->s3_format, ret) != 0) { + flb_plg_error(ctx->ins, + "'%s' is not a supported compression codec " + "for the configured format (parquet " + "supports snappy, gzip, zstd; arrow " + "supports zstd)", tmp); + return -1; + } + } + ctx->compression = ret; } - ctx->compression = ret; } tmp = flb_output_get_property("content_type", ins); @@ -1249,6 +1459,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, int size_check = FLB_FALSE; int part_num_check = FLB_FALSE; int timeout_check = FLB_FALSE; + int payload_needs_free = FLB_FALSE; int ret; void *payload_buf = NULL; size_t payload_size = 0; @@ -1265,9 +1476,30 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, file_first_log_time = chunk->first_log_time; } +#ifdef FLB_HAVE_ARROW + if (s3_format_is_columnar(ctx->s3_format)) { + ret = out_s3_compress_columnar(s3_format_to_aws_compress_format(ctx->s3_format), + body, body_size, &payload_buf, + &payload_size, ctx->compression); + if (ret == -1) { 
+ flb_plg_error(ctx->ins, "Failed to convert data to columnar " + "format"); + if (chunk != NULL) { + s3_store_file_unlock(chunk); + chunk->failures += 1; + } + return FLB_RETRY; + } + preCompress_size = body_size; + body = (void *) payload_buf; + body_size = payload_size; + payload_needs_free = FLB_TRUE; + } + else +#endif if (ctx->compression != FLB_AWS_COMPRESS_NONE) { - /* Map payload */ - ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size); + ret = flb_aws_compression_compress(ctx->compression, body, body_size, + &payload_buf, &payload_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress data"); if (chunk != NULL) { @@ -1280,6 +1512,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, preCompress_size = body_size; body = (void *) payload_buf; body_size = payload_size; + payload_needs_free = FLB_TRUE; } } @@ -1335,7 +1568,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, * remove chunk from buffer list */ ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size); - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + if (payload_needs_free) { flb_free(payload_buf); } if (ret < 0) { @@ -1362,7 +1595,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { s3_store_file_unlock(chunk); } - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + if (payload_needs_free) { flb_free(payload_buf); } return FLB_RETRY; @@ -1376,7 +1609,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { s3_store_file_unlock(chunk); } - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + if (payload_needs_free) { flb_free(payload_buf); } return FLB_RETRY; @@ -1386,7 +1619,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, ret = upload_part(ctx, m_upload, body, body_size, NULL); if (ret < 0) { - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + if (payload_needs_free) { flb_free(payload_buf); } 
m_upload->upload_errors += 1; @@ -1403,7 +1636,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, s3_store_file_delete(ctx, chunk); chunk = NULL; } - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + if (payload_needs_free) { flb_free(payload_buf); } if (m_upload->bytes >= ctx->file_size) { @@ -1493,15 +1726,42 @@ static int put_all_chunks(struct flb_s3 *ctx) return -1; } +#ifdef FLB_HAVE_ARROW + if (s3_format_is_columnar(ctx->s3_format)) { + ret = out_s3_compress_columnar( + s3_format_to_aws_compress_format(ctx->s3_format), + buffer, buffer_size, + &payload_buf, &payload_size, + ctx->compression); + if (ret == -1) { + flb_plg_error(ctx->ins, + "Failed to convert to columnar format, " + "uploading raw data to prevent data loss"); + } + else { + flb_free(buffer); + buffer = (void *) payload_buf; + buffer_size = payload_size; + } + } + else +#endif if (ctx->compression != FLB_AWS_COMPRESS_NONE) { - /* Map payload */ - ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size); + ret = flb_aws_compression_compress(ctx->compression, + buffer, buffer_size, + &payload_buf, + &payload_size); if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss"); - } else { - flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, After compression, chunk is %zu bytes", buffer_size, payload_size); + flb_plg_error(ctx->ins, + "Failed to compress data, uploading " + "uncompressed data to prevent data loss"); + } + else { + flb_plg_info(ctx->ins, + "Pre-compression chunk size is %zu, " + "After compression, chunk is %zu bytes", + buffer_size, payload_size); flb_free(buffer); - buffer = (void *) payload_buf; buffer_size = payload_size; } @@ -4156,7 +4416,9 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "format", "json_lines", 0, FLB_FALSE, 0, - "Set record output format. Supported values are json_lines, and otlp_json." 
+ "Set output format. Supported values: json_lines, otlp_json, parquet, arrow. " + "When format is parquet or arrow, the 'compression' option selects the codec " + "applied inside the file (parquet: snappy, zstd, gzip; arrow: zstd). Default: uncompressed." }, { FLB_CONFIG_MAP_STR, "json_date_format", NULL, @@ -4227,12 +4489,10 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, - "Compression type for S3 objects. Supported values: 'gzip', 'zstd', 'snappy'. " - "'arrow' and 'parquet' are also available if Apache Arrow was enabled at compile time. " - "Defaults to no compression. " - "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'. " - "If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'. " - "If 'snappy' is selected, the Content-Encoding HTTP Header will be set to 'snappy'." + "Compression type for S3 objects. Supported values: 'none', 'gzip', 'zstd', 'snappy'. " + "When format is 'parquet' or 'arrow', this selects the codec applied inside the file " + "(parquet: snappy, gzip, zstd; arrow: zstd). 'compression=arrow' and 'compression=parquet' are deprecated; use 'format arrow' or 'format parquet' instead. " + "Defaults to no compression." 
}, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 3a007b1a5f7..08fc846d0ea 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -27,6 +27,11 @@ #include #include +/* S3 output format types */ +#define FLB_S3_FORMAT_JSON_LINES 0 +#define FLB_S3_FORMAT_PARQUET 100 +#define FLB_S3_FORMAT_ARROW 101 + /* Upload data to S3 in 5MB chunks */ #define MIN_CHUNKED_UPLOAD_SIZE 5242880 #define MAX_CHUNKED_UPLOAD_SIZE 50000000 @@ -127,6 +132,7 @@ struct flb_s3 { int static_file_path; int retry_exhausted_action; int compression; + int s3_format; int port; int insecure; size_t store_dir_limit_size; From 80db2f12bf7474ac2cb34759be8662b88c6d0664 Mon Sep 17 00:00:00 2001 From: Rituparna Khaund Date: Sun, 7 Jun 2026 17:15:22 +0100 Subject: [PATCH 3/4] tests: internal: aws_compress: add columnar format compression tests Add unit tests for the columnar API. Parquet output is checked for the PAR1 header and footer magic with no compression, snappy, gzip and zstd page-level codecs. Arrow/Feather output is covered as well: uncompressed and zstd produce valid ARROW1 buffers, and an unsupported codec (gzip) is rejected. Signed-off-by: Rituparna Khaund --- tests/internal/aws_compress.c | 215 ++++++++++++++++++++++++++++++++-- 1 file changed, 208 insertions(+), 7 deletions(-) diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c index bf55f425c50..0bb4414c419 100644 --- a/tests/internal/aws_compress.c +++ b/tests/internal/aws_compress.c @@ -105,7 +105,7 @@ void test_compression_snappy_return_value_normalization() { /* This test verifies that the snappy wrapper correctly normalizes return values * to conform to the AWS compression interface contract: -1 on error, 0 on success. - * + * * The test uses the actual flb_aws_compression_compress function which internally * uses the wrapper. We verify that successful compression returns exactly 0, * demonstrating that the wrapper properly normalizes the return value. 
@@ -115,17 +115,17 @@ void test_compression_snappy_return_value_normalization() size_t out_len = 0; int compression_type; char test_data[] = "test data for compression"; - + compression_type = flb_aws_compression_get_type("snappy"); TEST_CHECK(compression_type != -1); - + /* Test successful compression - should return exactly 0 (not any other value) */ - ret = flb_aws_compression_compress(compression_type, test_data, + ret = flb_aws_compression_compress(compression_type, test_data, strlen(test_data), &out_data, &out_len); TEST_CHECK(ret == 0); TEST_MSG("Expected return value 0 on success, got: %d", ret); TEST_MSG("This verifies the wrapper returns 0 (not passthrough of underlying function)"); - + if (ret == 0 && out_data != NULL) { TEST_CHECK(out_len > 0); TEST_MSG("Compressed data length: %zu", out_len); @@ -334,6 +334,194 @@ void test_b64_truncated_gzip_boundary() flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, 40); } +#ifdef FLB_HAVE_ARROW_PARQUET +void test_parquet_format_snappy() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_PARQUET, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_SNAPPY); + if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) { + TEST_MSG("Parquet SNAPPY conversion failed"); + return; + } + TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0); + TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0); + flb_free(out_buf); +} + +void test_parquet_format_zstd() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_PARQUET, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_ZSTD); + if (!TEST_CHECK(ret == 0 && out_buf != NULL 
&& out_size >= 8)) { + TEST_MSG("Parquet ZSTD conversion failed"); + return; + } + TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0); + TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0); + flb_free(out_buf); +} + +void test_parquet_format_gzip() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_PARQUET, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_GZIP); + if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) { + TEST_MSG("Parquet GZIP conversion failed"); + return; + } + TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0); + TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0); + flb_free(out_buf); +} + +void test_parquet_format_uncompressed() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_PARQUET, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_NONE); + if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) { + TEST_MSG("Parquet NONE conversion failed"); + return; + } + TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0); + TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0); + flb_free(out_buf); +} + +void test_parquet_compression_reduces_size() +{ + int ret; + void *buf_none = NULL; + void *buf_snappy = NULL; + size_t size_none = 0; + size_t size_snappy = 0; + char *json = "{\"msg\":\"hello hello hello hello hello hello\"}\n" + "{\"msg\":\"hello hello hello hello hello hello\"}\n" + "{\"msg\":\"hello hello hello hello hello hello\"}\n" + "{\"msg\":\"hello hello hello hello hello hello\"}\n" + "{\"msg\":\"hello hello hello hello hello hello\"}\n"; + size_t json_len = strlen(json); + + ret = 
out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_PARQUET, + json, json_len, &buf_none, &size_none, + FLB_AWS_COMPRESS_NONE); + if (!TEST_CHECK(ret == 0 && buf_none != NULL)) { + TEST_MSG("Parquet NONE conversion failed"); + return; + } + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_PARQUET, + json, json_len, &buf_snappy, &size_snappy, + FLB_AWS_COMPRESS_SNAPPY); + if (!TEST_CHECK(ret == 0 && buf_snappy != NULL)) { + TEST_MSG("Parquet SNAPPY conversion failed"); + flb_free(buf_none); + return; + } + TEST_CHECK(size_snappy <= size_none); + + flb_free(buf_none); + flb_free(buf_snappy); +} +#endif + +#ifdef FLB_HAVE_ARROW +void test_arrow_format_uncompressed() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_ARROW, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_NONE); + if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) { + TEST_MSG("Arrow NONE conversion failed"); + return; + } + /* Arrow/Feather V2 files begin with the "ARROW1" magic */ + TEST_CHECK(memcmp(out_buf, "ARROW1", 6) == 0); + flb_free(out_buf); +} + +void test_arrow_format_zstd() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_ARROW, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_ZSTD); + if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) { + TEST_MSG("Arrow ZSTD conversion failed"); + return; + } + TEST_CHECK(memcmp(out_buf, "ARROW1", 6) == 0); + flb_free(out_buf); +} + +/* + * Arrow/Feather (Arrow IPC) only supports ZSTD compression. Codecs such as + * gzip must be rejected by the writer; out_s3 also rejects them at config + * time via validate_format_compression. 
+ */ +void test_arrow_format_gzip_unsupported() +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + char *json = "{\"key\":\"value\",\"num\":42}\n" + "{\"key\":\"other\",\"num\":99}\n"; + size_t json_len = strlen(json); + + ret = out_s3_compress_columnar(FLB_AWS_COMPRESS_FORMAT_ARROW, + json, json_len, &out_buf, &out_size, + FLB_AWS_COMPRESS_GZIP); + TEST_CHECK(ret == -1); + if (out_buf != NULL) { + flb_free(out_buf); + } +} +#endif + TEST_LIST = { { "test_compression_gzip", test_compression_gzip }, { "test_compression_zstd", test_compression_zstd }, @@ -352,6 +540,19 @@ TEST_LIST = { test_b64_truncated_gzip_truncation_multi_rounds }, { "test_b64_truncated_gzip_boundary", test_b64_truncated_gzip_boundary }, +#ifdef FLB_HAVE_ARROW_PARQUET + { "test_parquet_format_snappy", test_parquet_format_snappy }, + { "test_parquet_format_zstd", test_parquet_format_zstd }, + { "test_parquet_format_gzip", test_parquet_format_gzip }, + { "test_parquet_format_uncompressed", test_parquet_format_uncompressed }, + { "test_parquet_compression_reduces_size", + test_parquet_compression_reduces_size }, +#endif +#ifdef FLB_HAVE_ARROW + { "test_arrow_format_uncompressed", test_arrow_format_uncompressed }, + { "test_arrow_format_zstd", test_arrow_format_zstd }, + { "test_arrow_format_gzip_unsupported", test_arrow_format_gzip_unsupported }, +#endif { 0 } }; @@ -419,8 +620,8 @@ static void flb_aws_compress_general_test_cases(int test_type, while (tcase->compression_keyword != 0) { size_t in_data_len = strlen(tcase->in_data); - compression_type = flb_aws_compression_get_type(tcase->compression_keyword); - + compression_type = flb_aws_compression_get_type(tcase->compression_keyword); + TEST_CHECK(compression_type != -1); TEST_MSG("| flb_aws_get_compression_type: failed to get compression type for " "keyword " From 940a8f1afa96877dd28e1adce02cfc4ac136d858 Mon Sep 17 00:00:00 2001 From: Rituparna Khaund Date: Sun, 7 Jun 2026 17:15:22 +0100 Subject: [PATCH 4/4] tests: integration: 
out_s3: add columnar format scenarios Add out_s3 integration scenarios for the columnar formats: arrow and parquet with zstd, parquet and arrow with compression none, and a negative case asserting that an invalid format/compression combination (arrow + gzip) is rejected at startup. Each positive case verifies the ARROW1/PAR1 magic bytes and the absence of a Content-Encoding header. Signed-off-by: Rituparna Khaund --- .../scenarios/out_s3/config/out_s3_arrow.yaml | 27 ++++++ .../config/out_s3_arrow_gzip_invalid.yaml | 27 ++++++ .../out_s3/config/out_s3_arrow_none.yaml | 27 ++++++ .../out_s3/config/out_s3_parquet.yaml | 27 ++++++ .../out_s3/config/out_s3_parquet_none.yaml | 27 ++++++ .../scenarios/out_s3/tests/test_out_s3_001.py | 87 +++++++++++++++++++ 6 files changed, 222 insertions(+) create mode 100644 tests/integration/scenarios/out_s3/config/out_s3_arrow.yaml create mode 100644 tests/integration/scenarios/out_s3/config/out_s3_arrow_gzip_invalid.yaml create mode 100644 tests/integration/scenarios/out_s3/config/out_s3_arrow_none.yaml create mode 100644 tests/integration/scenarios/out_s3/config/out_s3_parquet.yaml create mode 100644 tests/integration/scenarios/out_s3/config/out_s3_parquet_none.yaml diff --git a/tests/integration/scenarios/out_s3/config/out_s3_arrow.yaml b/tests/integration/scenarios/out_s3/config/out_s3_arrow.yaml new file mode 100644 index 00000000000..00795e3d9f1 --- /dev/null +++ b/tests/integration/scenarios/out_s3/config/out_s3_arrow.yaml @@ -0,0 +1,27 @@ +service: + flush: 1 + grace: 3 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_s3 + dummy: '{"message":"hello arrow s3","source":"dummy"}' + samples: 1 + + outputs: + - name: s3 + match: out_s3 + bucket: test-bucket + region: us-east-1 + endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT} + use_put_object: true + total_file_size: 1M + upload_timeout: 2s + s3_key_format: /payloads/$TAG/$UUID + format: arrow 
+ compression: zstd + store_dir: /tmp/fluent-bit-test-suite-s3-arrow diff --git a/tests/integration/scenarios/out_s3/config/out_s3_arrow_gzip_invalid.yaml b/tests/integration/scenarios/out_s3/config/out_s3_arrow_gzip_invalid.yaml new file mode 100644 index 00000000000..c029bffca8b --- /dev/null +++ b/tests/integration/scenarios/out_s3/config/out_s3_arrow_gzip_invalid.yaml @@ -0,0 +1,27 @@ +service: + flush: 1 + grace: 3 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_s3 + dummy: '{"message":"hello arrow gzip s3","source":"dummy"}' + samples: 1 + + outputs: + - name: s3 + match: out_s3 + bucket: test-bucket + region: us-east-1 + endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT} + use_put_object: true + total_file_size: 1M + upload_timeout: 2s + s3_key_format: /payloads/$TAG/$UUID + format: arrow + compression: gzip + store_dir: /tmp/fluent-bit-test-suite-s3-arrow-gzip-invalid diff --git a/tests/integration/scenarios/out_s3/config/out_s3_arrow_none.yaml b/tests/integration/scenarios/out_s3/config/out_s3_arrow_none.yaml new file mode 100644 index 00000000000..be4946e2975 --- /dev/null +++ b/tests/integration/scenarios/out_s3/config/out_s3_arrow_none.yaml @@ -0,0 +1,27 @@ +service: + flush: 1 + grace: 3 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_s3 + dummy: '{"message":"hello arrow none s3","source":"dummy"}' + samples: 1 + + outputs: + - name: s3 + match: out_s3 + bucket: test-bucket + region: us-east-1 + endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT} + use_put_object: true + total_file_size: 1M + upload_timeout: 2s + s3_key_format: /payloads/$TAG/$UUID + format: arrow + compression: none + store_dir: /tmp/fluent-bit-test-suite-s3-arrow-none diff --git a/tests/integration/scenarios/out_s3/config/out_s3_parquet.yaml b/tests/integration/scenarios/out_s3/config/out_s3_parquet.yaml new file 
mode 100644 index 00000000000..3b199b9242a --- /dev/null +++ b/tests/integration/scenarios/out_s3/config/out_s3_parquet.yaml @@ -0,0 +1,27 @@ +service: + flush: 1 + grace: 3 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_s3 + dummy: '{"message":"hello parquet s3","source":"dummy"}' + samples: 1 + + outputs: + - name: s3 + match: out_s3 + bucket: test-bucket + region: us-east-1 + endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT} + use_put_object: true + total_file_size: 1M + upload_timeout: 2s + s3_key_format: /payloads/$TAG/$UUID + format: parquet + compression: zstd + store_dir: /tmp/fluent-bit-test-suite-s3-parquet diff --git a/tests/integration/scenarios/out_s3/config/out_s3_parquet_none.yaml b/tests/integration/scenarios/out_s3/config/out_s3_parquet_none.yaml new file mode 100644 index 00000000000..9938b32a97d --- /dev/null +++ b/tests/integration/scenarios/out_s3/config/out_s3_parquet_none.yaml @@ -0,0 +1,27 @@ +service: + flush: 1 + grace: 3 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: out_s3 + dummy: '{"message":"hello parquet none s3","source":"dummy"}' + samples: 1 + + outputs: + - name: s3 + match: out_s3 + bucket: test-bucket + region: us-east-1 + endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT} + use_put_object: true + total_file_size: 1M + upload_timeout: 2s + s3_key_format: /payloads/$TAG/$UUID + format: parquet + compression: none + store_dir: /tmp/fluent-bit-test-suite-s3-parquet-none diff --git a/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py b/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py index cf3587b07df..c388292badc 100644 --- a/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py +++ b/tests/integration/scenarios/out_s3/tests/test_out_s3_001.py @@ -140,6 +140,22 @@ def _start_or_skip_unsupported_s3_format(service, format_name): raise +def 
_start_or_skip_unsupported_columnar_format(service, requires_marker): + """Start the service, skipping if the columnar format support + (arrow-glib/parquet-glib) was not compiled into the Fluent Bit binary.""" + try: + service.start() + except FluentBitStartupError: + log_contents = "" + if service.service.flb and service.service.flb.log_file: + with open(service.service.flb.log_file, "r", encoding="utf-8", errors="replace") as file: + log_contents = file.read() + if requires_marker in log_contents or \ + "unknown configuration property 'format'" in log_contents: + pytest.skip("columnar format support is not compiled into this Fluent Bit binary") + raise + + def test_out_s3_put_object_uploads_json_lines_payload(): service = Service("out_s3_basic.yaml") service.start() @@ -241,3 +257,74 @@ def test_out_s3_default_retry_exhausted_action_quarantines_file(): service.stop() assert len(files) > 0 + + +def test_out_s3_format_arrow_uploads_feather_with_zstd(): + service = Service("out_s3_arrow.yaml") + _start_or_skip_unsupported_columnar_format(service, "requires arrow-glib") + request = service.wait_for_request() + service.stop() + + assert request["method"] == "PUT" + assert request["path"].startswith("/test-bucket/payloads/out_s3/") + body = request["body"] + # Arrow/Feather V2 files begin with the "ARROW1" magic. The object is the + # columnar file itself, so it must not carry a byte-level Content-Encoding. + assert body[:6] == b"ARROW1" + assert "Content-Encoding" not in request["headers"] + + +def test_out_s3_format_parquet_uploads_parquet_with_zstd(): + service = Service("out_s3_parquet.yaml") + _start_or_skip_unsupported_columnar_format(service, "requires parquet-glib") + request = service.wait_for_request() + service.stop() + + assert request["method"] == "PUT" + assert request["path"].startswith("/test-bucket/payloads/out_s3/") + body = request["body"] + # Parquet files start and end with the "PAR1" magic. 
Page-level zstd is + # applied inside the file, so no byte-level Content-Encoding is expected. + assert body[:4] == b"PAR1" + assert body[-4:] == b"PAR1" + assert "Content-Encoding" not in request["headers"] + + +def test_out_s3_format_parquet_compression_none_is_accepted(): + # 'compression none' must be explicitly accepted (not rejected as an + # unknown codec) and produce an uncompressed Parquet object with no + # byte-level Content-Encoding header. + service = Service("out_s3_parquet_none.yaml") + _start_or_skip_unsupported_columnar_format(service, "requires parquet-glib") + request = service.wait_for_request() + service.stop() + + assert request["method"] == "PUT" + body = request["body"] + assert body[:4] == b"PAR1" + assert body[-4:] == b"PAR1" + assert "Content-Encoding" not in request["headers"] + + +def test_out_s3_format_arrow_compression_none_is_accepted(): + # 'compression none' must be explicitly accepted (not rejected as an + # unknown codec) and produce an uncompressed Arrow/Feather object with no + # byte-level Content-Encoding header. + service = Service("out_s3_arrow_none.yaml") + _start_or_skip_unsupported_columnar_format(service, "requires arrow-glib") + request = service.wait_for_request() + service.stop() + + assert request["method"] == "PUT" + body = request["body"] + assert body[:6] == b"ARROW1" + assert "Content-Encoding" not in request["headers"] + + +def test_out_s3_format_arrow_compression_gzip_is_rejected(): + # format=arrow with compression=gzip is an invalid combination; + # validate_format_compression() must reject it at plugin init. + service = Service("out_s3_arrow_gzip_invalid.yaml") + with pytest.raises(FluentBitStartupError): + service.start() + service.stop()