From 0ae378ff1688028862867bf15dbaa1a72d3e8a1d Mon Sep 17 00:00:00 2001 From: Daniel Kuschny Date: Mon, 15 Jun 2026 17:46:50 +0200 Subject: [PATCH 1/3] feat(azure_blob sink): add append blob support via blob_type option --- ...9397_azure_blob_append_blob.enhancement.md | 8 + src/sinks/azure_blob/config.rs | 84 +++- src/sinks/azure_blob/integration_tests.rs | 194 +++++++- src/sinks/azure_blob/request_builder.rs | 4 +- src/sinks/azure_blob/test.rs | 431 +++++++++++++++++- src/sinks/azure_common/config.rs | 25 + src/sinks/azure_common/service.rs | 153 ++++++- 7 files changed, 858 insertions(+), 41 deletions(-) create mode 100644 changelog.d/19397_azure_blob_append_blob.enhancement.md diff --git a/changelog.d/19397_azure_blob_append_blob.enhancement.md b/changelog.d/19397_azure_blob_append_blob.enhancement.md new file mode 100644 index 0000000000000..790e222eed21f --- /dev/null +++ b/changelog.d/19397_azure_blob_append_blob.enhancement.md @@ -0,0 +1,8 @@ +The `azure_blob` sink now supports `blob_type: append`, which writes data as Azure Append Blobs. +Unlike the default block blob mode that creates a new uniquely-named blob per batch, append mode +reuses a stable blob name and extends it on each flush — ideal for continuous log streaming +where you want a single growing file per time window. + +When `blob_type` is set to `append`, `blob_append_uuid` defaults to `false` and `blob_time_format` +defaults to `%Y-%m-%d` (daily rotation). Both can still be overridden explicitly. +The Azure hard limit of 4 MiB per `append_block` call is enforced at startup via `batch.max_bytes`. diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 1311e09edd4f7..0661fdbfada72 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -17,7 +17,8 @@ use crate::{ Healthcheck, VectorSink, azure_common::{ self, config::AzureAuthentication, config::AzureBlobRetryLogic, - config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink, + config::AzureBlobTlsConfig, config::AzureBlobType, service::AzureBlobService, + sink::AzureBlobSink, }, util::{ BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt, @@ -107,19 +108,23 @@ pub struct AzureBlobSinkConfig { /// The timestamp format for the time component of the blob key. /// - /// By default, blob keys are appended with a timestamp that reflects when the blob are sent to - /// Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining + /// Blob keys are appended with a timestamp that reflects when the blob is sent to + /// Azure Blob Storage. The resulting blob key is functionally equivalent to joining /// the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`. /// /// This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022 - /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders - /// timestamps in seconds since the Unix epoch. + /// 20:34:44 GMT+0000, with the `blob_time_format` set to `%s`, which renders timestamps in + /// seconds since the Unix epoch. /// /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most /// languages. /// /// When set to an empty string, no timestamp is appended to the blob prefix. /// + /// The default value depends on `blob_type`: + /// - `block`: `%s` (Unix epoch seconds) — each batch gets a unique timestamp. + /// - `append`: `%Y-%m-%d` (ISO date) — batches within the same day share the same blob. + /// /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers #[configurable(metadata(docs::syntax_override = "strftime"))] pub blob_time_format: Option, @@ -128,13 +133,37 @@ pub struct AzureBlobSinkConfig { /// /// The UUID is appended to the timestamp portion of the object key, such that if the blob key /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results - /// in an blob key that looks like + /// in a blob key that looks like /// `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`. /// - /// This ensures there are no name collisions, and can be useful in high-volume workloads where - /// blob keys must be unique. + /// The default value depends on `blob_type`: + /// - `block`: `true` — guarantees unique blob names across concurrent writers. + /// - `append`: `false` — multiple batches must share the same blob name to append to it. + /// Set to `true` only if you intentionally want each flush to target a distinct append blob. pub blob_append_uuid: Option, + /// The type of blob to use when writing to Azure Blob Storage. + /// + /// - `block` (default): a new uniquely-named blob per batch. + /// `blob_append_uuid` defaults to `true`; `blob_time_format` defaults to `%s`. + /// - `append`: each batch appends to the same blob. + /// `blob_append_uuid` defaults to `false`; `blob_time_format` defaults to `%Y-%m-%d`. + /// Multiple batches within the same time window write to the same blob. + /// + /// **Batch size limit for `append` mode**: Azure limits each `append_block` call to 4 MiB + /// (4,194,304 bytes). `batch.max_bytes` automatically defaults to `4194304` when + /// `blob_type` is `append` and the setting is not explicitly configured. + /// Setting `batch.max_bytes` above `4194304` with `blob_type: append` is an error and + /// Vector will fail to start. + /// + /// When `blob_type` is `append` and compression is enabled, each batch is compressed as an + /// independent frame and appended to the blob. The result is a series of concatenated + /// compressed frames. Use decompressors that support multi-stream decompression + /// (e.g., `gunzip`, `zstd -d`). + #[configurable(derived)] + #[serde(default)] + pub blob_type: AzureBlobType, + #[serde(flatten)] pub encoding: EncodingConfigWithFraming, @@ -184,6 +213,7 @@ impl GenerateConfig for AzureBlobSinkConfig { blob_prefix: default_blob_prefix(), blob_time_format: Some(String::from("%s")), blob_append_uuid: Some(true), + blob_type: AzureBlobType::Block, encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(), compression: Compression::gzip_default(), batch: BatchConfig::default(), @@ -270,6 +300,9 @@ impl SinkConfig for AzureBlobSinkConfig { const DEFAULT_KEY_PREFIX: &str = "blob/%F/"; const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s"; const DEFAULT_FILENAME_APPEND_UUID: bool = true; +const DEFAULT_APPEND_BLOB_TIME_FORMAT: &str = "%Y-%m-%d"; +const DEFAULT_APPEND_BLOB_APPEND_UUID: bool = false; +const APPEND_BLOB_MAX_BLOCK_BYTES: usize = 4 * 1024 * 1024; impl AzureBlobSinkConfig { pub fn build_processor(&self, client: Arc) -> crate::Result { @@ -279,16 +312,34 @@ impl AzureBlobSinkConfig { .service(AzureBlobService::new(client)); // Configure our partitioning/batching. - let batcher_settings = self.batch.into_batcher_settings()?; - + // For append blobs, if the user hasn't set max_bytes, default it to the 4 MiB + // Azure hard limit instead of inheriting the 10 MB BulkSizeBasedDefault. + // This prevents a startup failure when blob_type = append is used out of the box. + let mut batch = self.batch; + if self.blob_type == AzureBlobType::Append && batch.max_bytes.is_none() { + batch.max_bytes = Some(APPEND_BLOB_MAX_BLOCK_BYTES); + } + let validated_batch = batch.validate()?; + let validated_batch = if self.blob_type == AzureBlobType::Append { + validated_batch.limit_max_bytes(APPEND_BLOB_MAX_BLOCK_BYTES)? + } else { + validated_batch + }; + let batcher_settings = validated_batch.into_batcher_settings()?; + + let (default_append_uuid, default_time_format) = match self.blob_type { + AzureBlobType::Block => (DEFAULT_FILENAME_APPEND_UUID, DEFAULT_FILENAME_TIME_FORMAT), + AzureBlobType::Append => ( + DEFAULT_APPEND_BLOB_APPEND_UUID, + DEFAULT_APPEND_BLOB_TIME_FORMAT, + ), + }; let blob_time_format = self .blob_time_format - .as_ref() - .cloned() - .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into()); - let blob_append_uuid = self - .blob_append_uuid - .unwrap_or(DEFAULT_FILENAME_APPEND_UUID); + .as_deref() + .unwrap_or(default_time_format) + .to_string(); + let blob_append_uuid = self.blob_append_uuid.unwrap_or(default_append_uuid); let transformer = self.encoding.transformer(); let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; @@ -298,6 +349,7 @@ impl AzureBlobSinkConfig { container_name: self.container_name.clone(), blob_time_format, blob_append_uuid, + blob_type: self.blob_type, encoder: (transformer, encoder), compression: self.compression, }; diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 9703db841a062..3759b51361d4c 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -5,7 +5,8 @@ use azure_core::http::StatusCode; use azure_storage_blob::BlobContainerClient; use bytes::{Buf, BytesMut}; -use flate2::read::GzDecoder; +use chrono::Utc; +use flate2::read::MultiGzDecoder; use futures::{Stream, StreamExt, stream}; use vector_lib::{ ByteSizeOf, @@ -20,6 +21,7 @@ use crate::{ event::{Event, EventArray, LogEvent}, sinks::{ VectorSink, azure_common, + azure_common::config::AzureBlobType, util::{Compression, TowerRequestConfig}, }, test_util::{ @@ -243,6 +245,192 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached_with_oauth() { .await; } +// ── Append blob integration tests ───────────────────────────────────────────── + +/// Two sequential batches land in exactly one blob and their lines appear in order. +async fn assert_append_blob_reuses_same_blob(config: AzureBlobSinkConfig) { + let blob_prefix = format!("append/basic/{}", random_string(10)); + let config = AzureBlobSinkConfig { + blob_prefix: blob_prefix.clone().try_into().unwrap(), + blob_type: AzureBlobType::Append, + blob_time_format: Some(String::new()), // stable name — no time component + blob_append_uuid: Some(false), + ..config + }; + + let (lines1, input1) = random_lines_with_stream(100, 5, None); + let (lines2, input2) = random_lines_with_stream(100, 5, None); + + config.run_assert(input1).await; + config.run_assert(input2).await; + + let blobs = config.list_blobs(blob_prefix).await; + assert_eq!(blobs.len(), 1, "append blob mode must reuse a single blob"); + let (content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!(content_type, Some(String::from("text/plain"))); + + let expected: Vec = lines1.into_iter().chain(lines2).collect(); + assert_eq!(blob_lines, expected); +} + +#[tokio::test] +async fn azure_blob_append_blob_reuses_same_blob() { + assert_append_blob_reuses_same_blob(AzureBlobSinkConfig::new_emulator().await).await; +} + +#[tokio::test] +async fn azure_blob_append_blob_reuses_same_blob_with_oauth() { + assert_append_blob_reuses_same_blob(AzureBlobSinkConfig::new_emulator_with_oauth().await).await; +} + +/// NDJSON append: two batches of structured events land in one blob with the correct content-type +/// and all JSON lines intact and in order. +async fn assert_append_blob_json_encoding(config: AzureBlobSinkConfig) { + let blob_prefix = format!("append/json/{}", random_string(10)); + let config = AzureBlobSinkConfig { + blob_prefix: blob_prefix.clone().try_into().unwrap(), + blob_type: AzureBlobType::Append, + blob_time_format: Some(String::new()), + blob_append_uuid: Some(false), + encoding: ( + Some(NewlineDelimitedEncoderConfig::new()), + JsonSerializerConfig::default(), + ) + .into(), + ..config + }; + + let (events1, input1) = random_events_with_stream(100, 5, None); + let (events2, input2) = random_events_with_stream(100, 5, None); + + config.run_assert(input1).await; + config.run_assert(input2).await; + + let blobs = config.list_blobs(blob_prefix).await; + assert_eq!(blobs.len(), 1, "append blob must produce exactly one blob"); + let (content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!( + content_type, + Some(String::from("application/x-ndjson")), + "content-type must reflect NDJSON encoding" + ); + + let expected: Vec = events1 + .iter() + .chain(events2.iter()) + .map(|e| serde_json::to_string(&e.as_log().all_event_fields().unwrap()).unwrap()) + .collect(); + assert_eq!(blob_lines, expected); +} + +#[tokio::test] +async fn azure_blob_append_blob_json_encoding() { + assert_append_blob_json_encoding(AzureBlobSinkConfig::new_emulator().await).await; +} + +#[tokio::test] +async fn azure_blob_append_blob_json_encoding_with_oauth() { + assert_append_blob_json_encoding(AzureBlobSinkConfig::new_emulator_with_oauth().await).await; +} + +/// Default daily rotation: without explicit blob_time_format or blob_append_uuid overrides, +/// append blobs use `%Y-%m-%d` and no UUID — two batches both write to today's date blob. +async fn assert_append_blob_default_daily_rotation(config: AzureBlobSinkConfig) { + let blob_prefix = format!("append/daily/{}/", random_string(10)); + let config = AzureBlobSinkConfig { + blob_prefix: blob_prefix.clone().try_into().unwrap(), + blob_type: AzureBlobType::Append, + // Intentionally leave blob_time_format and blob_append_uuid at None + // to exercise the type-aware defaults in build_processor. + blob_time_format: None, + blob_append_uuid: None, + ..config + }; + + let (lines1, input1) = random_lines_with_stream(100, 5, None); + let (lines2, input2) = random_lines_with_stream(100, 5, None); + + config.run_assert(input1).await; + config.run_assert(input2).await; + + let blobs = config.list_blobs(blob_prefix.clone()).await; + assert_eq!( + blobs.len(), + 1, + "both batches must go to the same daily-rotated blob" + ); + + // The blob name must embed today's date in %Y-%m-%d format. + let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); + assert!( + blobs[0].contains(&today), + "blob name '{}' must contain today's date '{}'", + blobs[0], + today + ); + + let (_content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + let expected: Vec = lines1.into_iter().chain(lines2).collect(); + assert_eq!(blob_lines, expected); +} + +#[tokio::test] +async fn azure_blob_append_blob_default_daily_rotation() { + assert_append_blob_default_daily_rotation(AzureBlobSinkConfig::new_emulator().await).await; +} + +#[tokio::test] +async fn azure_blob_append_blob_default_daily_rotation_with_oauth() { + assert_append_blob_default_daily_rotation(AzureBlobSinkConfig::new_emulator_with_oauth().await) + .await; +} + +/// Forced multi-flush: a low batch.max_bytes causes Vector to flush many small blocks within a +/// single run. All blocks must land in one append blob and every line must be present. +async fn assert_append_blob_multiple_forced_flushes(config: AzureBlobSinkConfig) { + let blob_prefix = format!("append/multiflush/{}", random_string(10)); + + // Generate enough lines that at ~100 bytes each we get at least 5 forced flushes. + let line_count = 50; + let (lines, input) = random_lines_with_stream(100, line_count, None); + + // Rough per-line size: 100 bytes content + framing. Force a flush every ~3 lines. + let flush_every_n_bytes = 350; + + let mut batch = config.batch.clone(); + batch.max_bytes = Some(flush_every_n_bytes); + + let config = AzureBlobSinkConfig { + blob_prefix: blob_prefix.clone().try_into().unwrap(), + blob_type: AzureBlobType::Append, + blob_time_format: Some(String::new()), + blob_append_uuid: Some(false), + batch, + ..config + }; + + config.run_assert(input).await; + + let blobs = config.list_blobs(blob_prefix).await; + assert_eq!( + blobs.len(), + 1, + "all forced flushes must append to the same blob" + ); + let (_content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!( + blob_lines.len(), + line_count, + "every flushed line must appear in the blob" + ); + assert_eq!(blob_lines, lines); +} + +#[tokio::test] +async fn azure_blob_append_blob_multiple_forced_flushes() { + assert_append_blob_multiple_forced_flushes(AzureBlobSinkConfig::new_emulator().await).await; +} + impl AzureBlobSinkConfig { pub async fn new_emulator() -> AzureBlobSinkConfig { let address = std::env::var("AZURITE_ADDRESS").unwrap_or_else(|_| "localhost".into()); @@ -255,6 +443,7 @@ impl AzureBlobSinkConfig { blob_prefix: Default::default(), blob_time_format: None, blob_append_uuid: None, + blob_type: Default::default(), encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, batch: Default::default(), @@ -279,6 +468,7 @@ impl AzureBlobSinkConfig { blob_prefix: Default::default(), blob_time_format: None, blob_append_uuid: None, + blob_type: Default::default(), encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, batch: Default::default(), @@ -394,7 +584,7 @@ impl AzureBlobSinkConfig { if self.compression == Compression::None { BufReader::new(body).lines().map(|l| l.unwrap()).collect() } else { - BufReader::new(GzDecoder::new(body)) + BufReader::new(MultiGzDecoder::new(body)) .lines() .map(|l| l.unwrap()) .collect() diff --git a/src/sinks/azure_blob/request_builder.rs b/src/sinks/azure_blob/request_builder.rs index 079aea91d3525..a8dbe495a303a 100644 --- a/src/sinks/azure_blob/request_builder.rs +++ b/src/sinks/azure_blob/request_builder.rs @@ -9,7 +9,7 @@ use crate::{ codecs::{Encoder, Transformer}, event::{Event, Finalizable}, sinks::{ - azure_common::config::{AzureBlobMetadata, AzureBlobRequest}, + azure_common::config::{AzureBlobMetadata, AzureBlobRequest, AzureBlobType}, util::{ Compression, RequestBuilder, metadata::RequestMetadataBuilder, request_builder::EncodeResult, @@ -22,6 +22,7 @@ pub struct AzureBlobRequestOptions { pub container_name: String, pub blob_time_format: String, pub blob_append_uuid: bool, + pub blob_type: AzureBlobType, pub encoder: (Transformer, Encoder), pub compression: Compression, } @@ -95,6 +96,7 @@ impl RequestBuilder<(String, Vec)> for AzureBlobRequestOptions { content_type: self.encoder.1.content_type(), metadata: azure_metadata, request_metadata, + blob_type: self.blob_type, } } } diff --git a/src/sinks/azure_blob/test.rs b/src/sinks/azure_blob/test.rs index 2dd6aaf69bdd4..65d5d3ec9d358 100644 --- a/src/sinks/azure_blob/test.rs +++ b/src/sinks/azure_blob/test.rs @@ -14,10 +14,10 @@ use super::{config::AzureBlobSinkConfig, request_builder::AzureBlobRequestOption use crate::{ codecs::{Encoder, EncodingConfigWithFraming}, event::{Event, LogEvent}, - sinks::azure_common::config::{AzureAuthentication, SpecificAzureCredential}, + sinks::azure_common::config::{AzureAuthentication, AzureBlobType, SpecificAzureCredential}, sinks::prelude::*, sinks::util::{ - Compression, + BatchConfig, Compression, request_builder::{EncodeResult, RequestBuilder}, }, }; @@ -32,6 +32,7 @@ fn default_config(encoding: EncodingConfigWithFraming) -> AzureBlobSinkConfig { blob_prefix: Default::default(), blob_time_format: Default::default(), blob_append_uuid: Default::default(), + blob_type: Default::default(), encoding, compression: Compression::gzip_default(), batch: Default::default(), @@ -69,6 +70,7 @@ fn azure_blob_build_request_without_compression() { container_name, blob_time_format, blob_append_uuid, + blob_type: Default::default(), encoder: ( Default::default(), Encoder::::new( @@ -117,6 +119,7 @@ fn azure_blob_build_request_with_compression() { container_name, blob_time_format, blob_append_uuid, + blob_type: Default::default(), encoder: ( Default::default(), Encoder::::new( @@ -165,6 +168,7 @@ fn azure_blob_build_request_with_time_format() { container_name, blob_time_format, blob_append_uuid, + blob_type: Default::default(), encoder: ( Default::default(), Encoder::::new( @@ -216,6 +220,7 @@ fn azure_blob_build_request_with_uuid() { container_name, blob_time_format, blob_append_uuid, + blob_type: Default::default(), encoder: ( Default::default(), Encoder::::new( @@ -554,3 +559,425 @@ async fn azure_blob_build_config_with_custom_ca_certificate() { .await .unwrap_or_else(|error| panic!("Failed to build sink: {error:?}")); } + +#[test] +fn azure_blob_build_request_append_blob_defaults() { + let log = Event::Log(LogEvent::from("test message")); + let container_name = String::from("logs"); + let sink_config = AzureBlobSinkConfig { + blob_prefix: "blob/".try_into().unwrap(), + container_name: container_name.clone(), + ..default_config((None::, TextSerializerConfig::default()).into()) + }; + + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + + let request_options = AzureBlobRequestOptions { + container_name, + blob_time_format: "%Y-%m-%d".to_string(), + blob_append_uuid: false, + blob_type: AzureBlobType::Append, + encoder: ( + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::default().into(), + TextSerializerConfig::default().build().into(), + ), + ), + compression: Compression::None, + }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + + let (metadata, request_metadata_builder, _events) = + request_options.split_input((key, vec![log])); + + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = request_metadata_builder.build(&payload); + let request = request_options.build_request(metadata, request_metadata, payload); + + let expected_date = Utc::now().format("%Y-%m-%d").to_string(); + assert_eq!( + request.metadata.partition_key, + format!("blob/{expected_date}.log") + ); + assert_eq!(request.blob_type, AzureBlobType::Append); +} + +#[test] +fn azure_blob_build_request_append_blob_with_compression() { + let log = Event::Log(LogEvent::from("test message")); + let container_name = String::from("logs"); + let sink_config = AzureBlobSinkConfig { + blob_prefix: "blob".try_into().unwrap(), + container_name: container_name.clone(), + ..default_config((None::, TextSerializerConfig::default()).into()) + }; + + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + + let request_options = AzureBlobRequestOptions { + container_name, + blob_time_format: "".to_string(), + blob_append_uuid: false, + blob_type: AzureBlobType::Append, + encoder: ( + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::default().into(), + TextSerializerConfig::default().build().into(), + ), + ), + compression: Compression::gzip_default(), + }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + + let (metadata, request_metadata_builder, _events) = + request_options.split_input((key, vec![log])); + + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = request_metadata_builder.build(&payload); + let request = request_options.build_request(metadata, request_metadata, payload); + + assert!( + request.metadata.partition_key.ends_with(".log.gz"), + "expected partition_key to end with .log.gz, got: {}", + request.metadata.partition_key + ); + assert_eq!(request.content_encoding, Some("gzip")); + assert_eq!(request.blob_type, AzureBlobType::Append); +} + +#[test] +fn azure_blob_append_blob_rejects_oversized_batch() { + // Validates that batch.validate()?.limit_max_bytes(APPEND_BLOB_MAX_BLOCK_BYTES)? + // rejects configurations that exceed the Azure 4 MiB append_block limit at startup. + let mut batch: BatchConfig = + BatchConfig::default(); + batch.max_bytes = Some(5_000_000); // 5 MB > 4 MiB limit + + let result = batch + .validate() + .and_then(|v| v.limit_max_bytes(4 * 1024 * 1024)); + assert!( + result.is_err(), + "Expected validation error when max_bytes exceeds the 4 MiB append blob limit" + ); +} + +#[test] +fn azure_blob_append_blob_accepts_batch_at_limit() { + let mut batch: BatchConfig = + BatchConfig::default(); + batch.max_bytes = Some(4 * 1024 * 1024); // exactly 4 MiB — must be accepted + + let result = batch + .validate() + .and_then(|v| v.limit_max_bytes(4 * 1024 * 1024)); + assert!( + result.is_ok(), + "Expected max_bytes equal to the limit to be accepted" + ); +} + +#[test] +fn azure_blob_block_blob_request_carries_block_type() { + let log = Event::Log(LogEvent::from("test message")); + let container_name = String::from("logs"); + let sink_config = AzureBlobSinkConfig { + blob_prefix: "blob".try_into().unwrap(), + container_name: container_name.clone(), + ..default_config((None::, TextSerializerConfig::default()).into()) + }; + + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + + let request_options = AzureBlobRequestOptions { + container_name, + blob_time_format: "".to_string(), + blob_append_uuid: false, + blob_type: AzureBlobType::Block, + encoder: ( + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::default().into(), + TextSerializerConfig::default().build().into(), + ), + ), + compression: Compression::None, + }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + + let (metadata, request_metadata_builder, _events) = + request_options.split_input((key, vec![log])); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = request_metadata_builder.build(&payload); + let request = request_options.build_request(metadata, request_metadata, payload); + + assert_eq!(request.blob_type, AzureBlobType::Block); +} + +#[test] +fn azure_blob_append_blob_with_uuid_override_generates_unique_keys() { + // Even in append mode, an explicit blob_append_uuid: true produces a UUID suffix. + // This is intentional: some users may want distinct append blobs per flush. + let container_name = String::from("logs"); + let sink_config = AzureBlobSinkConfig { + blob_prefix: "blob".try_into().unwrap(), + container_name: container_name.clone(), + ..default_config((None::, TextSerializerConfig::default()).into()) + }; + + let make_key = || { + let log = Event::Log(LogEvent::from("test message")); + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + + let request_options = AzureBlobRequestOptions { + container_name: container_name.clone(), + blob_time_format: "".to_string(), + blob_append_uuid: true, // explicit override: UUID even for append type + blob_type: AzureBlobType::Append, + encoder: ( + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::default().into(), + TextSerializerConfig::default().build().into(), + ), + ), + compression: Compression::None, + }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + + let (metadata, request_metadata_builder, _events) = + request_options.split_input((key, vec![log])); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = request_metadata_builder.build(&payload); + request_options + .build_request(metadata, request_metadata, payload) + .metadata + .partition_key + }; + + let key1 = make_key(); + let key2 = make_key(); + assert_ne!( + key1, key2, + "uuid override must produce unique keys per flush" + ); +} + +#[test] +fn azure_blob_append_blob_stable_name_without_uuid_and_time() { + // An append blob with empty time format and no UUID always targets the same key, + // which is the required property for append-mode continuous log streaming. + let container_name = String::from("logs"); + let sink_config = AzureBlobSinkConfig { + blob_prefix: "logs/app".try_into().unwrap(), + container_name: container_name.clone(), + ..default_config((None::, TextSerializerConfig::default()).into()) + }; + + let make_key = || { + let log = Event::Log(LogEvent::from("test message")); + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + + let request_options = AzureBlobRequestOptions { + container_name: container_name.clone(), + blob_time_format: "".to_string(), // no time component + blob_append_uuid: false, // no UUID + blob_type: AzureBlobType::Append, + encoder: ( + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::default().into(), + TextSerializerConfig::default().build().into(), + ), + ), + compression: Compression::None, + }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + + let (metadata, request_metadata_builder, _events) = + request_options.split_input((key, vec![log])); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = request_metadata_builder.build(&payload); + request_options + .build_request(metadata, request_metadata, payload) + .metadata + .partition_key + }; + + let key1 = make_key(); + let key2 = make_key(); + assert_eq!( + key1, key2, + "append blob without UUID and time format must produce a stable key" + ); + assert_eq!(key1, "logs/app.log"); +} + +#[test] +fn azure_blob_append_blob_custom_time_format_hourly_rotation() { + let log = Event::Log(LogEvent::from("test message")); + let container_name = String::from("logs"); + let sink_config = AzureBlobSinkConfig { + blob_prefix: "app/".try_into().unwrap(), + container_name: container_name.clone(), + ..default_config((None::, TextSerializerConfig::default()).into()) + }; + + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + + let request_options = AzureBlobRequestOptions { + container_name, + blob_time_format: "%Y-%m-%d-%H".to_string(), // hourly rotation + blob_append_uuid: false, + blob_type: AzureBlobType::Append, + encoder: ( + Default::default(), + Encoder::::new( + NewlineDelimitedEncoder::default().into(), + TextSerializerConfig::default().build().into(), + ), + ), + compression: Compression::None, + }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + + let (metadata, request_metadata_builder, _events) = + request_options.split_input((key, vec![log])); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = request_metadata_builder.build(&payload); + let request = request_options.build_request(metadata, request_metadata, payload); + + let expected = format!("app/{}.log", Utc::now().format("%Y-%m-%d-%H")); + assert_eq!(request.metadata.partition_key, expected); + assert_eq!(request.blob_type, AzureBlobType::Append); +} + +#[tokio::test] +async fn azure_blob_config_parse_blob_type_append() { + let config: AzureBlobSinkConfig = toml::from_str( + r#" + connection_string = "AccountName=mylogstorage" + container_name = "my-logs" + blob_type = "append" + + [encoding] + codec = "json" + "#, + ) + .unwrap_or_else(|e| panic!("Config parsing failed: {e:?}")); + + assert_eq!(config.blob_type, AzureBlobType::Append); +} + +#[tokio::test] +async fn azure_blob_config_default_blob_type_is_block() { + let config: AzureBlobSinkConfig = toml::from_str( + r#" + connection_string = "AccountName=mylogstorage" + container_name = "my-logs" + + [encoding] + codec = "json" + "#, + ) + .unwrap_or_else(|e| panic!("Config parsing failed: {e:?}")); + + assert_eq!( + config.blob_type, + AzureBlobType::Block, + "blob_type should default to Block when not specified" + ); +} + +#[tokio::test] +async fn azure_blob_append_blob_default_max_bytes_succeeds() { + // Without explicit batch.max_bytes, append mode defaults to 4 MiB automatically. + // build() must not fail due to the 10 MB BulkSizeBasedDefault exceeding the limit. + let config: AzureBlobSinkConfig = toml::from_str( + r#" + connection_string = "AccountName=mylogstorage" + container_name = "my-logs" + blob_type = "append" + + [encoding] + codec = "json" + "#, + ) + .unwrap_or_else(|e| panic!("Config parsing failed: {e:?}")); + + let cx = SinkContext::default(); + let _ = config + .build(cx) + .await + .unwrap_or_else(|e| panic!("build should succeed without explicit batch.max_bytes: {e:?}")); +} + +#[tokio::test] +async fn azure_blob_append_blob_explicit_oversized_batch_fails_at_startup() { + // If the user explicitly sets batch.max_bytes above the 4 MiB Azure limit, build must fail. + let config: AzureBlobSinkConfig = toml::from_str( + r#" + connection_string = "AccountName=mylogstorage" + container_name = "my-logs" + blob_type = "append" + + [encoding] + codec = "json" + + [batch] + max_bytes = 5000000 + "#, + ) + .unwrap_or_else(|e| panic!("Config parsing failed: {e:?}")); + + let cx = SinkContext::default(); + let err = match config.build(cx).await { + Err(e) => e, + Ok(_) => panic!( + "build must fail when batch.max_bytes exceeds the 4 MiB Azure append_block limit" + ), + }; + let msg = err.to_string(); + assert!( + msg.contains("max_bytes") && msg.contains("exceeds"), + "expected a max_bytes batch limit error, got: {msg}" + ); +} diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 245517d89170b..d1135be38ec21 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -96,6 +96,30 @@ pub enum UserAssignedManagedIdentityIdType { ResourceId, } +/// The type of Azure Blob to create when writing to Azure Blob Storage. +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +#[serde(deny_unknown_fields, rename_all = "snake_case")] +pub enum AzureBlobType { + /// Stores data as block blobs. + /// + /// Each batch creates a new uniquely-named blob. Recommended for high-throughput + /// scenarios where blobs are written once and read many times. + #[default] + Block, + + /// Stores data as append blobs. + /// + /// Batches are appended to an existing blob rather than creating a new one. + /// The blob name stays stable across flushes — suited for continuous log streaming + /// where you want a single growing file per time window. + /// + /// When combined with compression, each batch is compressed as an independent frame + /// and appended to the blob. The result is a series of concatenated compressed frames. + /// Use decompressors that support multi-stream decompression (e.g., `gunzip`, `zstd -d`). + Append, +} + /// Specific Azure credential types. #[configurable_component] #[derive(Clone, Debug, Eq, PartialEq)] @@ -411,6 +435,7 @@ pub struct AzureBlobRequest { pub content_type: &'static str, pub metadata: AzureBlobMetadata, pub request_metadata: RequestMetadata, + pub blob_type: AzureBlobType, } impl Finalizable for AzureBlobRequest { diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index 31668ab85ba70..cb02ae9a7bb5a 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -4,13 +4,20 @@ use std::{ task::{Context, Poll}, }; -use azure_core::http::RequestContent; -use azure_storage_blob::{BlobContainerClient, models::BlockBlobClientUploadOptions}; +use azure_core::{ + error::ErrorKind, + http::{RequestContent, StatusCode}, +}; +use azure_storage_blob::{ + BlobClient, BlobContainerClient, + models::{AppendBlobClientCreateOptions, BlockBlobClientUploadOptions, StorageErrorCode}, +}; +use bytes::Bytes; use futures::future::BoxFuture; use tower::Service; use tracing::Instrument; -use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse}; +use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse, AzureBlobType}; #[derive(Clone)] pub struct AzureBlobService { @@ -43,23 +50,29 @@ impl Service for AzureBlobService { .client .blob_client(request.metadata.partition_key.as_str()); let byte_size = request.blob_data.len(); - let upload_options = BlockBlobClientUploadOptions { - blob_content_type: Some(request.content_type.to_string()), - blob_content_encoding: request.content_encoding.map(|e| e.to_string()), - ..Default::default() - } - .if_not_exists(); - - let result = blob_client - .upload( - RequestContent::from(request.blob_data.to_vec()), - Some(upload_options), - ) - .instrument(info_span!("request").or_current()) - .await - .map_err(|err| err.into()); - - result.map(|_resp| AzureBlobResponse { + + let result = match request.blob_type { + AzureBlobType::Block => { + upload_block_blob( + &blob_client, + request.blob_data, + request.content_type, + request.content_encoding, + ) + .await + } + AzureBlobType::Append => { + append_blob( + &blob_client.append_blob_client(), + request.blob_data, + request.content_type, + request.content_encoding, + ) + .await + } + }; + + result.map(|()| AzureBlobResponse { events_byte_size: request .request_metadata .into_events_estimated_json_encoded_byte_size(), @@ -68,3 +81,103 @@ impl Service for AzureBlobService { }) } } + +async fn upload_block_blob( + blob_client: &BlobClient, + data: Bytes, + content_type: &str, + content_encoding: Option<&str>, +) -> Result<(), Box> { + let upload_options = BlockBlobClientUploadOptions { + blob_content_type: Some(content_type.to_string()), + blob_content_encoding: content_encoding.map(str::to_string), + ..Default::default() + } + .if_not_exists(); + + blob_client + .upload(RequestContent::from(data.to_vec()), Some(upload_options)) + .instrument(info_span!("request").or_current()) + .await + .map(|_| ()) + .map_err(|e| e.into()) +} + +// Extracts the Azure storage error code string from an azure_core::Error, if present. +fn storage_error_code(e: &azure_core::Error) -> Option { + match e.kind() { + ErrorKind::HttpResponse { error_code, .. } => error_code.clone(), + _ => None, + } +} + +// Appends `data` to an existing append blob, creating the blob first if it doesn't exist. +// Uses the EAFP pattern: attempt append, create on 404, retry once. +// A 409 Conflict on create is swallowed — it means a concurrent writer created the blob first. +async fn append_blob( + append_client: &azure_storage_blob::AppendBlobClient, + data: Bytes, + content_type: &str, + content_encoding: Option<&str>, +) -> Result<(), Box> { + let data_len = data.len() as u64; + + match append_client + .append_block(RequestContent::from(data.to_vec()), data_len, None) + .instrument(info_span!("request").or_current()) + .await + { + Ok(_) => return Ok(()), + Err(e) => { + let error_code = storage_error_code(&e); + + if e.http_status() == Some(StatusCode::NotFound) { + if error_code.as_deref() == Some(StorageErrorCode::ContainerNotFound.as_ref()) { + // Container doesn't exist — this is a misconfiguration, not a missing blob. + // Propagate immediately; creating the blob would also fail. + warn!( + message = "Azure container not found when appending to blob. \ + Verify the container exists and that `container_name` is correct.", + ); + return Err(e.into()); + } + // BlobNotFound (or unrecognised 404) — first write, fall through to create. + } else { + if error_code.as_deref() == Some(StorageErrorCode::BlockCountExceedsLimit.as_ref()) + { + warn!( + message = "Azure append blob has reached the 50,000-block limit. \ + No further data can be appended to this blob. \ + Configure `blob_time_format` for time-based rotation, \ + or delete the full blob manually.", + ); + } + return Err(e.into()); + } + } + } + + let create_opts = AppendBlobClientCreateOptions { + blob_content_type: Some(content_type.to_string()), + blob_content_encoding: content_encoding.map(str::to_string), + ..Default::default() + } + .if_not_exists(); + + match append_client + .create(Some(create_opts)) + .instrument(info_span!("request").or_current()) + .await + { + Ok(_) => {} + Err(e) if e.http_status() == Some(StatusCode::Conflict) => {} // race: already created + Err(e) => return Err(e.into()), + } + + append_client + .append_block(RequestContent::from(data.to_vec()), data_len, None) + .instrument(info_span!("request").or_current()) + .await + .map(|_| ()) + .map_err(|e| e.into()) +} From e5734e634c18ebfb6cb9fcd17da43faf48c269c8 Mon Sep 17 00:00:00 2001 From: Daniel Kuschny Date: Mon, 15 Jun 2026 19:31:10 +0200 Subject: [PATCH 2/3] chore(azure_blob sink): fix lints, add changelog author, and regenerate docs --- ...9397_azure_blob_append_blob.enhancement.md | 2 + src/sinks/azure_blob/integration_tests.rs | 3 +- .../components/sinks/generated/azure_blob.cue | 65 +++++++++++++++++-- 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/changelog.d/19397_azure_blob_append_blob.enhancement.md b/changelog.d/19397_azure_blob_append_blob.enhancement.md index 790e222eed21f..4358068afb5d7 100644 --- a/changelog.d/19397_azure_blob_append_blob.enhancement.md +++ b/changelog.d/19397_azure_blob_append_blob.enhancement.md @@ -6,3 +6,5 @@ where you want a single growing file per time window. When `blob_type` is set to `append`, `blob_append_uuid` defaults to `false` and `blob_time_format` defaults to `%Y-%m-%d` (daily rotation). Both can still be overridden explicitly. The Azure hard limit of 4 MiB per `append_block` call is enforced at startup via `batch.max_bytes`. + +authors: danielku15 diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 3759b51361d4c..e6b1d12d30568 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -5,7 +5,6 @@ use azure_core::http::StatusCode; use azure_storage_blob::BlobContainerClient; use bytes::{Buf, BytesMut}; -use chrono::Utc; use flate2::read::MultiGzDecoder; use futures::{Stream, StreamExt, stream}; use vector_lib::{ @@ -397,7 +396,7 @@ async fn assert_append_blob_multiple_forced_flushes(config: AzureBlobSinkConfig) // Rough per-line size: 100 bytes content + framing. Force a flush every ~3 lines. let flush_every_n_bytes = 350; - let mut batch = config.batch.clone(); + let mut batch = config.batch; batch.max_bytes = Some(flush_every_n_bytes); let config = AzureBlobSinkConfig { diff --git a/website/cue/reference/components/sinks/generated/azure_blob.cue b/website/cue/reference/components/sinks/generated/azure_blob.cue index 8d962c30adbdb..e51b949262166 100644 --- a/website/cue/reference/components/sinks/generated/azure_blob.cue +++ b/website/cue/reference/components/sinks/generated/azure_blob.cue @@ -192,11 +192,13 @@ generated: components: sinks: azure_blob: configuration: { The UUID is appended to the timestamp portion of the object key, such that if the blob key generated is `date=2022-07-18/1658176486`, setting this field to `true` results - in an blob key that looks like + in a blob key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`. - This ensures there are no name collisions, and can be useful in high-volume workloads where - blob keys must be unique. + The default value depends on `blob_type`: + - `block`: `true` — guarantees unique blob names across concurrent writers. + - `append`: `false` — multiple batches must share the same blob name to append to it. + Set to `true` only if you intentionally want each flush to target a distinct append blob. """ required: false type: bool: {} @@ -230,24 +232,73 @@ generated: components: sinks: azure_blob: configuration: { description: """ The timestamp format for the time component of the blob key. - By default, blob keys are appended with a timestamp that reflects when the blob are sent to - Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining + Blob keys are appended with a timestamp that reflects when the blob is sent to + Azure Blob Storage. The resulting blob key is functionally equivalent to joining the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`. This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022 - 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders - timestamps in seconds since the Unix epoch. + 20:34:44 GMT+0000, with the `blob_time_format` set to `%s`, which renders timestamps in + seconds since the Unix epoch. Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most languages. When set to an empty string, no timestamp is appended to the blob prefix. + The default value depends on `blob_type`: + - `block`: `%s` (Unix epoch seconds) — each batch gets a unique timestamp. + - `append`: `%Y-%m-%d` (ISO date) — batches within the same day share the same blob. + [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers """ required: false type: string: syntax: "strftime" } + blob_type: { + description: """ + The type of blob to use when writing to Azure Blob Storage. + + - `block` (default): a new uniquely-named blob per batch. + `blob_append_uuid` defaults to `true`; `blob_time_format` defaults to `%s`. + - `append`: each batch appends to the same blob. + `blob_append_uuid` defaults to `false`; `blob_time_format` defaults to `%Y-%m-%d`. + Multiple batches within the same time window write to the same blob. + + **Batch size limit for `append` mode**: Azure limits each `append_block` call to 4 MiB + (4,194,304 bytes). `batch.max_bytes` automatically defaults to `4194304` when + `blob_type` is `append` and the setting is not explicitly configured. + Setting `batch.max_bytes` above `4194304` with `blob_type: append` is an error and + Vector will fail to start. + + When `blob_type` is `append` and compression is enabled, each batch is compressed as an + independent frame and appended to the blob. The result is a series of concatenated + compressed frames. Use decompressors that support multi-stream decompression + (e.g., `gunzip`, `zstd -d`). + """ + required: false + type: string: { + default: "block" + enum: { + append: """ + Stores data as append blobs. + + Batches are appended to an existing blob rather than creating a new one. + The blob name stays stable across flushes — suited for continuous log streaming + where you want a single growing file per time window. + + When combined with compression, each batch is compressed as an independent frame + and appended to the blob. The result is a series of concatenated compressed frames. + Use decompressors that support multi-stream decompression (e.g., `gunzip`, `zstd -d`). + """ + block: """ + Stores data as block blobs. + + Each batch creates a new uniquely-named blob. Recommended for high-throughput + scenarios where blobs are written once and read many times. + """ + } + } + } compression: { description: """ Compression configuration. From 4029daa5bfc8067568cde7b6b6b0297cb466006e Mon Sep 17 00:00:00 2001 From: Daniel Kuschny Date: Tue, 16 Jun 2026 08:43:54 +0200 Subject: [PATCH 3/3] chore(azure_blob sink): address append blob review feedback (batch default, ordering, docs) --- src/sinks/azure_blob/config.rs | 56 +++++++++++++++---- src/sinks/azure_blob/test.rs | 40 +++++++++++-- .../components/sinks/generated/azure_blob.cue | 18 ++++++ 3 files changed, 99 insertions(+), 15 deletions(-) diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 0661fdbfada72..6219d8256d900 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -22,7 +22,8 @@ use crate::{ }, util::{ BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt, - TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults, + SinkBatchSettings, TowerRequestConfig, partitioner::KeyPartitioner, + service::TowerRequestConfigDefaults, }, }, template::Template, @@ -61,6 +62,10 @@ pub struct AzureBlobSinkConfig { /// | Allowed services | Blob | /// | Allowed resource types | Container & Object | /// | Allowed permissions | Read & Create | + /// + /// When `blob_type` is `append`, the SAS token additionally needs the `Add` (or `Write`) + /// permission. `Read & Create` is sufficient to pass the health check and create the blob, + /// but every `Append Block` call fails with `403 Forbidden` without `Add`/`Write`. #[configurable(metadata( docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \ resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \ @@ -156,6 +161,20 @@ pub struct AzureBlobSinkConfig { /// Setting `batch.max_bytes` above `4194304` with `blob_type: append` is an error and /// Vector will fail to start. /// + /// `batch.max_bytes` is measured on the uncompressed, pre-encoding event size, while Azure + /// enforces the 4 MiB limit on the encoded (and, if enabled, compressed) request body. With + /// the default `gzip` compression the encoded body is smaller than the batched events, so the + /// 4 MiB batch limit leaves headroom. If you disable compression, encoding overhead (for + /// example JSON escaping) can push a near-limit batch over 4 MiB and Azure rejects it; lower + /// `batch.max_bytes` to leave headroom in that case. + /// + /// **Ordering and delivery for `append` mode**: appended blocks are persisted in the order + /// Azure receives the requests, so append mode pins request concurrency to 1 (unless you set a + /// fixed `request.concurrency`) to keep flushes to the same blob in order. As with all Vector + /// sinks, delivery is at-least-once: if a flush is retried after Azure already committed the + /// block (a rare server-side error after a successful write), the batch can be appended twice. + /// Set `request.retry_attempts` to `0` if you prefer at-most-once over possible duplication. + /// /// When `blob_type` is `append` and compression is enabled, each batch is compressed as an /// independent frame and appended to the blob. The result is a series of concatenated /// compressed frames. Use decompressors that support multi-stream decompression @@ -306,24 +325,39 @@ const APPEND_BLOB_MAX_BLOCK_BYTES: usize = 4 * 1024 * 1024; impl AzureBlobSinkConfig { pub fn build_processor(&self, client: Arc) -> crate::Result { - let request_limits = self.request.into_settings(); + let mut request_limits = self.request.into_settings(); + // Append blobs must be written in order: Azure orders appended blocks by the order the + // service receives them, not by event order. With the default adaptive concurrency, two + // flushes targeting the same blob can be in flight at once and land out of order. Pin + // concurrency to 1 for append mode unless the user explicitly chose a fixed value. + // (Same approach the loki sink uses for its order-sensitive modes.) + if self.blob_type == AzureBlobType::Append && request_limits.concurrency.is_none() { + request_limits.concurrency = Some(1); + } let service = ServiceBuilder::new() .settings(request_limits, AzureBlobRetryLogic) .service(AzureBlobService::new(client)); // Configure our partitioning/batching. - // For append blobs, if the user hasn't set max_bytes, default it to the 4 MiB - // Azure hard limit instead of inheriting the 10 MB BulkSizeBasedDefault. - // This prevents a startup failure when blob_type = append is used out of the box. + // Sinks that enforce a hard per-request byte limit give their `BatchConfig` a type-level + // default `MAX_BYTES` equal to that limit (see `gcp_pubsub`, `aws_kinesis`), so + // `validate()?.limit_max_bytes()?` only ever rejects *explicit* over-configuration. Block + // and append share one `batch` field here, so append inherits block's 10 MB bulk default, + // which exceeds Azure's 4 MiB per-append limit. Restore the "default == limit" property for + // append before validating, so an omitted (or partially-specified) `[batch]` table uses the + // append limit while an explicit larger value is still rejected at startup. let mut batch = self.batch; - if self.blob_type == AzureBlobType::Append && batch.max_bytes.is_none() { - batch.max_bytes = Some(APPEND_BLOB_MAX_BLOCK_BYTES); - } - let validated_batch = batch.validate()?; let validated_batch = if self.blob_type == AzureBlobType::Append { - validated_batch.limit_max_bytes(APPEND_BLOB_MAX_BLOCK_BYTES)? + if batch.max_bytes.is_none() + || batch.max_bytes == BulkSizeBasedDefaultBatchSettings::MAX_BYTES + { + batch.max_bytes = Some(APPEND_BLOB_MAX_BLOCK_BYTES); + } + batch + .validate()? + .limit_max_bytes(APPEND_BLOB_MAX_BLOCK_BYTES)? } else { - validated_batch + batch.validate()? }; let batcher_settings = validated_batch.into_batcher_settings()?; diff --git a/src/sinks/azure_blob/test.rs b/src/sinks/azure_blob/test.rs index 65d5d3ec9d358..90f90b596c3e7 100644 --- a/src/sinks/azure_blob/test.rs +++ b/src/sinks/azure_blob/test.rs @@ -599,12 +599,18 @@ fn azure_blob_build_request_append_blob_defaults() { let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = request_metadata_builder.build(&payload); + + // Capture the date window around `build_request`, which formats the key with its own + // `Utc::now()`. Comparing against a single later `Utc::now()` would flake if the test + // crossed a UTC midnight between the two calls, so accept either side of the boundary. + let before = Utc::now().format("%Y-%m-%d").to_string(); let request = request_options.build_request(metadata, request_metadata, payload); + let after = Utc::now().format("%Y-%m-%d").to_string(); - let expected_date = Utc::now().format("%Y-%m-%d").to_string(); - assert_eq!( - request.metadata.partition_key, - format!("blob/{expected_date}.log") + let key = &request.metadata.partition_key; + assert!( + *key == format!("blob/{before}.log") || *key == format!("blob/{after}.log"), + "partition_key {key:?} did not match the expected daily key for {before} or {after}" ); assert_eq!(request.blob_type, AzureBlobType::Append); } @@ -981,3 +987,29 @@ async fn azure_blob_append_blob_explicit_oversized_batch_fails_at_startup() { "expected a max_bytes batch limit error, got: {msg}" ); } + +#[tokio::test] +async fn azure_blob_append_blob_partial_batch_without_max_bytes_succeeds() { + // A `[batch]` table that sets another field but omits `max_bytes` still triggers the + // per-field serde default (the 10 MB bulk default). Append mode must treat that inherited + // default as "unset" and fall back to the 4 MiB append limit, rather than failing at startup. + let config: AzureBlobSinkConfig = toml::from_str( + r#" + connection_string = "AccountName=mylogstorage" + container_name = "my-logs" + blob_type = "append" + + [encoding] + codec = "json" + + [batch] + timeout_secs = 5 + "#, + ) + .unwrap_or_else(|e| panic!("Config parsing failed: {e:?}")); + + let cx = SinkContext::default(); + let _ = config.build(cx).await.unwrap_or_else(|e| { + panic!("build should succeed when [batch] omits max_bytes (only timeout_secs set): {e:?}") + }); +} diff --git a/website/cue/reference/components/sinks/generated/azure_blob.cue b/website/cue/reference/components/sinks/generated/azure_blob.cue index e51b949262166..34a2ea346c47a 100644 --- a/website/cue/reference/components/sinks/generated/azure_blob.cue +++ b/website/cue/reference/components/sinks/generated/azure_blob.cue @@ -270,6 +270,20 @@ generated: components: sinks: azure_blob: configuration: { Setting `batch.max_bytes` above `4194304` with `blob_type: append` is an error and Vector will fail to start. + `batch.max_bytes` is measured on the uncompressed, pre-encoding event size, while Azure + enforces the 4 MiB limit on the encoded (and, if enabled, compressed) request body. With + the default `gzip` compression the encoded body is smaller than the batched events, so the + 4 MiB batch limit leaves headroom. If you disable compression, encoding overhead (for + example JSON escaping) can push a near-limit batch over 4 MiB and Azure rejects it; lower + `batch.max_bytes` to leave headroom in that case. + + **Ordering and delivery for `append` mode**: appended blocks are persisted in the order + Azure receives the requests, so append mode pins request concurrency to 1 (unless you set a + fixed `request.concurrency`) to keep flushes to the same blob in order. As with all Vector + sinks, delivery is at-least-once: if a flush is retried after Azure already committed the + block (a rare server-side error after a successful write), the batch can be appended twice. + Set `request.retry_attempts` to `0` if you prefer at-most-once over possible duplication. + When `blob_type` is `append` and compression is enabled, each batch is compressed as an independent frame and appended to the blob. The result is a series of concatenated compressed frames. Use decompressors that support multi-stream decompression @@ -352,6 +366,10 @@ generated: components: sinks: azure_blob: configuration: { | Allowed services | Blob | | Allowed resource types | Container & Object | | Allowed permissions | Read & Create | + + When `blob_type` is `append`, the SAS token additionally needs the `Add` (or `Write`) + permission. `Read & Create` is sufficient to pass the health check and create the blob, + but every `Append Block` call fails with `403 Forbidden` without `Add`/`Write`. """ required: false type: string: examples: ["DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net", "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken", "AccountName=mylogstorage"]