Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions changelog.d/19397_azure_blob_append_blob.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
The `azure_blob` sink now supports `blob_type: append`, which writes data as Azure Append Blobs.
Unlike the default block blob mode that creates a new uniquely-named blob per batch, append mode
reuses a stable blob name and extends it on each flush — ideal for continuous log streaming
where you want a single growing file per time window.

When `blob_type` is set to `append`, `blob_append_uuid` defaults to `false` and `blob_time_format`
defaults to `%Y-%m-%d` (daily rotation). Both can still be overridden explicitly.
The Azure hard limit of 4 MiB per `append_block` call is enforced at startup via `batch.max_bytes`.

authors: danielku15
122 changes: 104 additions & 18 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use crate::{
Healthcheck, VectorSink,
azure_common::{
self, config::AzureAuthentication, config::AzureBlobRetryLogic,
config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink,
config::AzureBlobTlsConfig, config::AzureBlobType, service::AzureBlobService,
sink::AzureBlobSink,
},
util::{
BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
SinkBatchSettings, TowerRequestConfig, partitioner::KeyPartitioner,
service::TowerRequestConfigDefaults,
},
},
template::Template,
Expand Down Expand Up @@ -60,6 +62,10 @@ pub struct AzureBlobSinkConfig {
/// | Allowed services | Blob |
/// | Allowed resource types | Container & Object |
/// | Allowed permissions | Read & Create |
///
/// When `blob_type` is `append`, the SAS token additionally needs the `Add` (or `Write`)
/// permission. `Read & Create` is sufficient to pass the health check and create the blob,
/// but every `Append Block` call fails with `403 Forbidden` without `Add`/`Write`.
#[configurable(metadata(
docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \
resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \
Expand Down Expand Up @@ -107,19 +113,23 @@ pub struct AzureBlobSinkConfig {

/// The timestamp format for the time component of the blob key.
///
/// By default, blob keys are appended with a timestamp that reflects when the blob are sent to
/// Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining
/// Blob keys are appended with a timestamp that reflects when the blob is sent to
/// Azure Blob Storage. The resulting blob key is functionally equivalent to joining
/// the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
///
/// This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
/// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
/// timestamps in seconds since the Unix epoch.
/// 20:34:44 GMT+0000, with the `blob_time_format` set to `%s`, which renders timestamps in
/// seconds since the Unix epoch.
///
/// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
/// languages.
///
/// When set to an empty string, no timestamp is appended to the blob prefix.
///
/// The default value depends on `blob_type`:
/// - `block`: `%s` (Unix epoch seconds) — each batch gets a unique timestamp.
/// - `append`: `%Y-%m-%d` (ISO date) — batches within the same day share the same blob.
///
/// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
#[configurable(metadata(docs::syntax_override = "strftime"))]
pub blob_time_format: Option<String>,
Expand All @@ -128,13 +138,51 @@ pub struct AzureBlobSinkConfig {
///
/// The UUID is appended to the timestamp portion of the object key, such that if the blob key
/// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
/// in an blob key that looks like
/// in a blob key that looks like
/// `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
///
/// This ensures there are no name collisions, and can be useful in high-volume workloads where
/// blob keys must be unique.
/// The default value depends on `blob_type`:
/// - `block`: `true` — guarantees unique blob names across concurrent writers.
/// - `append`: `false` — multiple batches must share the same blob name to append to it.
/// Set to `true` only if you intentionally want each flush to target a distinct append blob.
pub blob_append_uuid: Option<bool>,

/// The type of blob to use when writing to Azure Blob Storage.
///
/// - `block` (default): a new uniquely-named blob per batch.
/// `blob_append_uuid` defaults to `true`; `blob_time_format` defaults to `%s`.
/// - `append`: each batch appends to the same blob.
/// `blob_append_uuid` defaults to `false`; `blob_time_format` defaults to `%Y-%m-%d`.
/// Multiple batches within the same time window write to the same blob.
Comment on lines +154 to +156

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Document SAS permissions for appends

For SAS-authenticated configs, the existing connection-string guidance only tells users to grant Read/Create permissions, which can pass healthcheck and blob creation but is insufficient for Append Block. This new append option should document that SAS tokens also need Add or Write permission; otherwise users following the component docs get 403s on every flush in append mode.

Useful? React with 👍 / 👎.

///
/// **Batch size limit for `append` mode**: Azure limits each `append_block` call to 4 MiB
/// (4,194,304 bytes). `batch.max_bytes` automatically defaults to `4194304` when
/// `blob_type` is `append` and the setting is not explicitly configured.
/// Setting `batch.max_bytes` above `4194304` with `blob_type: append` is an error and
/// Vector will fail to start.
///
/// `batch.max_bytes` is measured on the uncompressed, pre-encoding event size, while Azure
/// enforces the 4 MiB limit on the encoded (and, if enabled, compressed) request body. With
/// the default `gzip` compression the encoded body is smaller than the batched events, so the
/// 4 MiB batch limit leaves headroom. If you disable compression, encoding overhead (for
/// example JSON escaping) can push a near-limit batch over 4 MiB and Azure rejects it; lower
/// `batch.max_bytes` to leave headroom in that case.
///
/// **Ordering and delivery for `append` mode**: appended blocks are persisted in the order
/// Azure receives the requests, so append mode pins request concurrency to 1 (unless you set a
/// fixed `request.concurrency`) to keep flushes to the same blob in order. As with all Vector
/// sinks, delivery is at-least-once: if a flush is retried after Azure already committed the
/// block (a rare server-side error after a successful write), the batch can be appended twice.
/// Set `request.retry_attempts` to `0` if you prefer at-most-once over possible duplication.
///
/// When `blob_type` is `append` and compression is enabled, each batch is compressed as an
/// independent frame and appended to the blob. The result is a series of concatenated
/// compressed frames. Use decompressors that support multi-stream decompression
/// (e.g., `gunzip`, `zstd -d`).
#[configurable(derived)]
#[serde(default)]
pub blob_type: AzureBlobType,

#[serde(flatten)]
pub encoding: EncodingConfigWithFraming,

Expand Down Expand Up @@ -184,6 +232,7 @@ impl GenerateConfig for AzureBlobSinkConfig {
blob_prefix: default_blob_prefix(),
blob_time_format: Some(String::from("%s")),
blob_append_uuid: Some(true),
blob_type: AzureBlobType::Block,
encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
compression: Compression::gzip_default(),
batch: BatchConfig::default(),
Expand Down Expand Up @@ -270,25 +319,61 @@ impl SinkConfig for AzureBlobSinkConfig {
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
const DEFAULT_APPEND_BLOB_TIME_FORMAT: &str = "%Y-%m-%d";
const DEFAULT_APPEND_BLOB_APPEND_UUID: bool = false;
const APPEND_BLOB_MAX_BLOCK_BYTES: usize = 4 * 1024 * 1024;

impl AzureBlobSinkConfig {
pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
let request_limits = self.request.into_settings();
let mut request_limits = self.request.into_settings();
// Append blobs must be written in order: Azure orders appended blocks by the order the
// service receives them, not by event order. With the default adaptive concurrency, two
// flushes targeting the same blob can be in flight at once and land out of order. Pin
// concurrency to 1 for append mode unless the user explicitly chose a fixed value.
// (Same approach the loki sink uses for its order-sensitive modes.)
if self.blob_type == AzureBlobType::Append && request_limits.concurrency.is_none() {
request_limits.concurrency = Some(1);
}
let service = ServiceBuilder::new()
.settings(request_limits, AzureBlobRetryLogic)
.service(AzureBlobService::new(client));

// Configure our partitioning/batching.
let batcher_settings = self.batch.into_batcher_settings()?;

// Sinks that enforce a hard per-request byte limit give their `BatchConfig` a type-level
// default `MAX_BYTES` equal to that limit (see `gcp_pubsub`, `aws_kinesis`), so
// `validate()?.limit_max_bytes()?` only ever rejects *explicit* over-configuration. Block
// and append share one `batch` field here, so append inherits block's 10 MB bulk default,
// which exceeds Azure's 4 MiB per-append limit. Restore the "default == limit" property for
// append before validating, so an omitted (or partially-specified) `[batch]` table uses the
// append limit while an explicit larger value is still rejected at startup.
let mut batch = self.batch;
let validated_batch = if self.blob_type == AzureBlobType::Append {
if batch.max_bytes.is_none()
|| batch.max_bytes == BulkSizeBasedDefaultBatchSettings::MAX_BYTES
{
batch.max_bytes = Some(APPEND_BLOB_MAX_BLOCK_BYTES);
}
batch
.validate()?
.limit_max_bytes(APPEND_BLOB_MAX_BLOCK_BYTES)?
} else {
batch.validate()?
};
let batcher_settings = validated_batch.into_batcher_settings()?;

let (default_append_uuid, default_time_format) = match self.blob_type {
AzureBlobType::Block => (DEFAULT_FILENAME_APPEND_UUID, DEFAULT_FILENAME_TIME_FORMAT),
AzureBlobType::Append => (
DEFAULT_APPEND_BLOB_APPEND_UUID,
DEFAULT_APPEND_BLOB_TIME_FORMAT,
),
};
let blob_time_format = self
.blob_time_format
.as_ref()
.cloned()
.unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
let blob_append_uuid = self
.blob_append_uuid
.unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
.as_deref()
.unwrap_or(default_time_format)
.to_string();
let blob_append_uuid = self.blob_append_uuid.unwrap_or(default_append_uuid);

let transformer = self.encoding.transformer();
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
Expand All @@ -298,6 +383,7 @@ impl AzureBlobSinkConfig {
container_name: self.container_name.clone(),
blob_time_format,
blob_append_uuid,
blob_type: self.blob_type,
encoder: (transformer, encoder),
compression: self.compression,
};
Expand Down
Loading
Loading