diff --git a/changelog.d/gcs_parquet_encoding.feature.md b/changelog.d/gcs_parquet_encoding.feature.md new file mode 100644 index 0000000000000..e42380e753255 --- /dev/null +++ b/changelog.d/gcs_parquet_encoding.feature.md @@ -0,0 +1,3 @@ +The `gcp_cloud_storage` sink now supports encoding events in the [Apache Parquet](https://parquet.apache.org/) columnar format, matching the existing `aws_s3` sink capability. Enable it by setting `batch_encoding.codec = "parquet"` along with either `batch_encoding.schema_mode = "auto_infer"` or a `batch_encoding.schema_file` (one of the two is required). The Parquet format handles its own compression internally (configurable via `batch_encoding.compression`), so the top-level `compression` setting is bypassed, the object `Content-Type` is automatically set to `application/vnd.apache.parquet`, and the filename extension defaults to `parquet`. + +authors: shmatovd diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs index b8874d9b1de90..08753b2b92d53 100644 --- a/lib/codecs/src/encoding/format/parquet.rs +++ b/lib/codecs/src/encoding/format/parquet.rs @@ -42,13 +42,13 @@ type EventsDroppedError = ComponentEventsDropped<'static, UNINTENTIONAL>; pub enum ParquetCompression { /// Zstd compression. Level must be between 1 and 21. Zstd { - /// Compression level (1–21). This is the range Vector supports; higher values compress more but are slower. + /// Compression level (1–21). This is the range Vector supports; higher values provide more compression but are slower. #[configurable(validation(range(min = 1, max = 21)))] level: u8, }, /// Gzip compression. Level must be between 1 and 9. Gzip { - /// Compression level (1–9). This is the range Vector supports; higher values compress more but are slower. + /// Compression level (1–9). This is the range Vector supports; higher values provide more compression but are slower. 
#[configurable(validation(range(min = 1, max = 9)))] level: u8, }, @@ -93,7 +93,7 @@ pub enum ParquetSchemaMode { Relaxed, /// Missing fields become null. Extra fields cause an error. Strict, - /// Auto infer schema based on the batch. No schema file needed. + /// Automatically infer schema based on the batch. No schema file is needed. AutoInfer, } @@ -118,7 +118,7 @@ pub struct ParquetSerializerConfig { #[configurable(derived)] pub compression: ParquetCompression, - /// Controls how events with fields not present in the schema are handled. + /// Controls how events that contain fields not present in the schema are handled. #[serde(default)] #[configurable(derived)] pub schema_mode: ParquetSchemaMode, diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index c63abdb6689f9..b1c5177201fdc 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -10,9 +10,13 @@ use indoc::indoc; use snafu::{ResultExt, Snafu}; use tower::ServiceBuilder; use uuid::Uuid; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::BatchEncoder; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig}; use vector_lib::{ TimeZone, - codecs::encoding::Framer, + codecs::{EncoderKind, encoding::Framer}, configurable::configurable_component, event::{EventFinalizers, Finalizable}, request_metadata::RequestMetadata, @@ -60,6 +64,21 @@ impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults { const RATE_LIMIT_NUM: u64 = 1_000; } +/// Batch encoding configuration for the `gcp_cloud_storage` sink. +#[cfg(feature = "codecs-parquet")] +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(tag = "codec", rename_all = "snake_case")] +#[configurable(metadata( + docs::enum_tag_description = "The codec to use for batch encoding events." +))] +pub enum GcsBatchEncoding { + /// Encodes events in [Apache Parquet][apache_parquet] columnar format. 
+ /// + /// [apache_parquet]: https://parquet.apache.org/ + Parquet(ParquetSerializerConfig), +} + /// Configuration for the `gcp_cloud_storage` sink. #[configurable_component(sink( "gcp_cloud_storage", @@ -151,6 +170,16 @@ pub struct GcsSinkConfig { #[serde(flatten)] encoding: EncodingConfigWithFraming, + /// Batch encoding configuration for columnar formats. + /// + /// When set, events are encoded together as a batch in a columnar format (Parquet) + /// instead of the standard per-event framing-based encoding. The columnar format handles + /// its own internal compression, so the top-level `compression` setting is bypassed. + #[cfg(feature = "codecs-parquet")] + #[configurable(derived)] + #[serde(default)] + batch_encoding: Option<GcsBatchEncoding>, + /// Compression configuration. /// /// All compression algorithms use the default compression level unless otherwise specified. @@ -238,6 +267,8 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig { content_encoding: Default::default(), cache_control: Default::default(), encoding, + #[cfg(feature = "codecs-parquet")] + batch_encoding: Default::default(), compression: Compression::gzip_default(), batch: Default::default(), endpoint: Default::default(), @@ -340,7 +371,7 @@ struct RequestSettings { extension: String, time_format: String, append_uuid: bool, - encoder: (Transformer, Encoder<Framer>), + encoder: (Transformer, EncoderKind), compression: Compression, tz_offset: Option<FixedOffset>, } @@ -348,7 +379,7 @@ struct RequestSettings { impl RequestBuilder<(String, Vec<Event>)> for RequestSettings { type Metadata = (String, EventFinalizers); type Events = Vec<Event>; - type Encoder = (Transformer, Encoder<Framer>); + type Encoder = (Transformer, EncoderKind); type Payload = Bytes; type Request = GcsRequest; type Error = io::Error; @@ -419,20 +450,75 @@ impl RequestBuilder<(String, Vec<Event>)> for RequestSettings { impl RequestSettings { fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> { let transformer = config.encoding.transformer(); - let
(framer, serializer) = config.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::<Framer>::new(framer, serializer); + + // Determine the encoder, the effective compression, the derived + // content type, and the default filename extension. When a columnar + // `batch_encoding` (e.g. Parquet) is configured, events are encoded as + // a batch and the format handles its own compression internally, so the + // top-level `compression` setting is bypassed. + let (encoder, effective_compression, derived_content_type, default_extension): ( + EncoderKind, + Compression, + String, + Option<String>, + ) = { + #[cfg(feature = "codecs-parquet")] + if let Some(batch_encoding) = &config.batch_encoding { + let GcsBatchEncoding::Parquet(parquet_config) = batch_encoding; + let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone()); + let batch_serializer = resolved_batch_config.build_batch_serializer()?; + let batch_encoder = BatchEncoder::new(batch_serializer); + let content_type = batch_encoder + .content_type() + .unwrap_or("application/octet-stream") + .to_string(); + + if config.compression != Compression::None { + warn!( + "Top level compression setting ignored when batch_encoding set to parquet."
+ ); + } + + ( + EncoderKind::Batch(batch_encoder), + Compression::None, + content_type, + Some("parquet".to_string()), + ) + } else { + let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::<Framer>::new(framer, serializer); + let content_type = encoder.content_type().to_string(); + ( + EncoderKind::Framed(Box::new(encoder)), + config.compression, + content_type, + None, + ) + } + + #[cfg(not(feature = "codecs-parquet"))] + { + let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::<Framer>::new(framer, serializer); + let content_type = encoder.content_type().to_string(); + ( + EncoderKind::Framed(Box::new(encoder)), + config.compression, + content_type, + None, + ) + } + }; + let acl = config .acl .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap()); - let content_type_str = config - .content_type - .as_deref() - .unwrap_or_else(|| encoder.content_type()); - let content_type = HeaderValue::from_str(content_type_str)?; + let content_type_str = config.content_type.clone().unwrap_or(derived_content_type); + let content_type = HeaderValue::from_str(&content_type_str)?; let content_encoding = match &config.content_encoding { Some(ce) => Some(HeaderValue::from_str(ce)?), - None => config - .compression + None => effective_compression .content_encoding() .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap()), }; @@ -456,7 +542,8 @@ impl RequestSettings { let extension = config .filename_extension .clone() - .unwrap_or_else(|| config.compression.extension().into()); + .or(default_extension) + .unwrap_or_else(|| effective_compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; let offset = config @@ -474,7 +561,7 @@ impl RequestSettings { extension, time_format, append_uuid, - compression: config.compression, + compression: effective_compression, encoder: (transformer, encoder), tz_offset: offset, }) @@ -783,4
+870,221 @@ mod tests { // Should return an error, not panic assert!(result.is_err()); } + + /// Correct TOML shape: `batch_encoding.codec = "parquet"` with `schema_mode = "auto_infer"`. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_batch_encoding_correct_toml_shape() { + use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode}; + + let config: GcsSinkConfig = toml::from_str( + r#" + bucket = "test-bucket" + compression = "none" + + [encoding] + codec = "text" + + [batch_encoding] + schema_mode = "auto_infer" + codec = "parquet" + + [batch_encoding.compression] + algorithm = "snappy" + "#, + ) + .expect("correct batch_encoding shape should parse"); + + let batch_enc = config + .batch_encoding + .expect("batch_encoding should be Some"); + let GcsBatchEncoding::Parquet(ref p) = batch_enc; + assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer); + assert_eq!(p.compression, ParquetCompression::Snappy); + } + + #[cfg(feature = "codecs-parquet")] + fn parquet_sink_config() -> GcsSinkConfig { + use vector_lib::codecs::encoding::format::{ + ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig, + }; + + let parquet_config = ParquetSerializerConfig { + schema_mode: ParquetSchemaMode::AutoInfer, + compression: ParquetCompression::Snappy, + ..Default::default() + }; + + GcsSinkConfig { + batch_encoding: Some(GcsBatchEncoding::Parquet(parquet_config)), + compression: Compression::None, + ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into()) + } + } + + /// Content-Type must be auto-detected as `application/vnd.apache.parquet` + /// when `batch_encoding` is set and `content_type` is not explicitly provided.
+ #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_content_type_auto_detected() { + let request_settings = request_settings(&parquet_sink_config(), SinkContext::default()); + assert_eq!( + request_settings.content_type.to_str().unwrap(), + "application/vnd.apache.parquet", + "Content-Type must be auto-detected for Parquet" + ); + } + + /// When user explicitly sets `content_type`, the auto-detection must not override it. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_content_type_user_override_preserved() { + let sink_config = GcsSinkConfig { + content_type: Some("application/octet-stream".to_string()), + ..parquet_sink_config() + }; + let request_settings = request_settings(&sink_config, SinkContext::default()); + assert_eq!( + request_settings.content_type.to_str().unwrap(), + "application/octet-stream", + "User-specified Content-Type must not be overridden" + ); + } + + /// The filename extension defaults to `parquet` when `batch_encoding` is set. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_filename_extension_default() { + let request_settings = request_settings(&parquet_sink_config(), SinkContext::default()); + assert_eq!(request_settings.extension, "parquet"); + } + + /// An explicit `filename_extension` overrides the `.parquet` default. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_filename_extension_user_override() { + let sink_config = GcsSinkConfig { + filename_extension: Some("pq".to_string()), + ..parquet_sink_config() + }; + let request_settings = request_settings(&sink_config, SinkContext::default()); + assert_eq!(request_settings.extension, "pq"); + } + + /// Top-level compression is bypassed when `batch_encoding` is set, since + /// Parquet handles compression internally. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_top_level_compression_bypassed() { + let sink_config = GcsSinkConfig { + // Even with gzip requested, Parquet encoding bypasses it. 
+ compression: Compression::gzip_default(), + ..parquet_sink_config() + }; + let request_settings = request_settings(&sink_config, SinkContext::default()); + assert_eq!(request_settings.compression, Compression::None); + // No outer Content-Encoding since compression is bypassed. + assert!(request_settings.content_encoding.is_none()); + } + + /// Codecs other than `parquet` must be rejected at parse time, since + /// `GcsBatchEncoding` only exposes the `parquet` variant. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_batch_encoding_rejects_unsupported_codec() { + let err = serde_yaml::from_str::<GcsSinkConfig>( + r#" + bucket: test-bucket + compression: none + encoding: + codec: text + batch_encoding: + codec: arrow_stream + "#, + ) + .unwrap_err(); + + assert!( + err.to_string().contains("arrow_stream"), + "expected error to mention the offending codec, got: {err}" + ); + } + + /// `schema_mode` defaults to `relaxed` when not specified. + #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_schema_mode_defaults_to_relaxed() { + use vector_lib::codecs::encoding::format::ParquetSchemaMode; + + let config: GcsSinkConfig = toml::from_str( + r#" + bucket = "test-bucket" + compression = "none" + + [encoding] + codec = "text" + + [batch_encoding] + codec = "parquet" + "#, + ) + .unwrap(); + + let GcsBatchEncoding::Parquet(p) = config.batch_encoding.unwrap(); + assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed); + } + + /// End-to-end encoding: a batch of events is encoded into a valid Parquet + /// file through the sink's request builder. Validates the Parquet magic + /// bytes, the row count, and the inferred columns — exercising the real + /// encoding path without requiring a live GCS backend.
+ #[cfg(feature = "codecs-parquet")] + #[test] + fn parquet_encodes_valid_file() { + use bytes::Bytes; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use parquet::record::reader::RowIter; + + let request_settings = request_settings(&parquet_sink_config(), SinkContext::default()); + + let events: Vec<Event> = (0..10) + .map(|i| { + let mut log = LogEvent::from(format!("message_{i}")); + log.insert("host", format!("host_{}", i % 3)); + Event::from(log) + }) + .collect(); + + let payload = request_settings + .encode_events(events) + .expect("parquet encoding should succeed"); + let body = payload.into_payload(); + + assert!(body.len() >= 4, "Output too short to be valid Parquet"); + assert_eq!(&body[..4], b"PAR1", "Missing Parquet magic bytes"); + + let reader = + SerializedFileReader::new(Bytes::copy_from_slice(&body)).expect("Invalid Parquet file"); + let row_count = RowIter::from_file_into(Box::new(reader)).count(); + assert_eq!(row_count, 10, "Expected 10 rows in Parquet file"); + + let reader = + SerializedFileReader::new(Bytes::copy_from_slice(&body)).expect("Invalid Parquet file"); + let columns: Vec<String> = reader + .metadata() + .file_metadata() + .schema_descr() + .columns() + .iter() + .map(|c| c.name().to_string()) + .collect(); + assert!( + columns.contains(&"message".to_string()), + "expected a `message` column, got: {columns:?}" + ); + assert!( + columns.contains(&"host".to_string()), + "expected a `host` column, got: {columns:?}" + ); + } } diff --git a/website/cue/reference/components/sinks/generated/aws_s3.cue b/website/cue/reference/components/sinks/generated/aws_s3.cue index 5627d4636698a..e02832d475b62 100644 --- a/website/cue/reference/components/sinks/generated/aws_s3.cue +++ b/website/cue/reference/components/sinks/generated/aws_s3.cue @@ -298,7 +298,7 @@ generated: components: sinks: aws_s3: configuration: { } } level: { - description: "Compression level (1–21).
This is the range Vector supports; higher values compress more but are slower." + description: "Compression level (1–21). This is the range Vector supports; higher values provide more compression but are slower." relevant_when: "algorithm = \"zstd\" or algorithm = \"gzip\"" required: true type: uint: {} @@ -316,12 +316,12 @@ generated: components: sinks: aws_s3: configuration: { type: string: {} } schema_mode: { - description: "Controls how events with fields not present in the schema are handled." + description: "Controls how events that contain fields not present in the schema are handled." required: false type: string: { default: "relaxed" enum: { - auto_infer: "Auto infer schema based on the batch. No schema file needed." + auto_infer: "Automatically infer schema based on the batch. No schema file is needed." relaxed: "Missing fields become null. Extra fields are silently dropped." strict: "Missing fields become null. Extra fields cause an error." } diff --git a/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue index 90bf02472af88..79798ff8b6bed 100644 --- a/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue @@ -130,6 +130,79 @@ generated: components: sinks: gcp_cloud_storage: configuration: { } } } + batch_encoding: { + description: """ + Batch encoding configuration for columnar formats. + + When set, events are encoded together as a batch in a columnar format (Parquet) + instead of the standard per-event framing-based encoding. The columnar format handles + its own internal compression, so the top-level `compression` setting is bypassed. + """ + required: false + type: object: options: { + codec: { + description: """ + Encodes events in [Apache Parquet][apache_parquet] columnar format. 
+ + [apache_parquet]: https://parquet.apache.org/ + """ + required: true + type: string: enum: parquet: """ + Encodes events in [Apache Parquet][apache_parquet] columnar format. + + [apache_parquet]: https://parquet.apache.org/ + """ + } + compression: { + description: "Compression codec applied per column page inside the Parquet file." + required: false + type: object: options: { + algorithm: { + description: "Compression codec applied per column page inside the Parquet file." + required: false + type: string: { + default: "snappy" + enum: { + gzip: "Gzip compression. Level must be between 1 and 9." + lz4: "LZ4 raw compression" + none: "No compression" + snappy: "Snappy compression (no level)." + zstd: "Zstd compression. Level must be between 1 and 21." + } + } + } + level: { + description: "Compression level (1–21). This is the range Vector supports; higher values provide more compression but are slower." + relevant_when: "algorithm = \"zstd\" or algorithm = \"gzip\"" + required: true + type: uint: {} + } + } + } + schema_file: { + description: """ + Path to a native Parquet schema file (`.schema`). + + Required unless `schema_mode` is `auto_infer`. The file must contain a valid + Parquet message type definition. + """ + required: false + type: string: {} + } + schema_mode: { + description: "Controls how events that contain fields not present in the schema are handled." + required: false + type: string: { + default: "relaxed" + enum: { + auto_infer: "Automatically infer schema based on the batch. No schema file is needed." + relaxed: "Missing fields become null. Extra fields are silently dropped." + strict: "Missing fields become null. Extra fields cause an error." + } + } + } + } + } bucket: { description: "The GCS bucket name." required: true