diff --git a/changelog.d/source_chunk_size_config.feature.md b/changelog.d/source_chunk_size_config.feature.md new file mode 100644 index 0000000000000..2e4fd8a9e8921 --- /dev/null +++ b/changelog.d/source_chunk_size_config.feature.md @@ -0,0 +1,2 @@ +Add `--chunk-size` / `VECTOR_CHUNK_SIZE` to configure the source sender batch size and source output buffer base capacity (defaults to 1000 events). +authors: sakateka diff --git a/lib/vector-core/src/source_sender/builder.rs b/lib/vector-core/src/source_sender/builder.rs index 7f9f5d522497a..d0533f085e503 100644 --- a/lib/vector-core/src/source_sender/builder.rs +++ b/lib/vector-core/src/source_sender/builder.rs @@ -5,8 +5,8 @@ use vector_common::histogram; use vector_common::internal_event::DEFAULT_OUTPUT; use super::{ - CHUNK_SIZE, LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, - SourceSender, SourceSenderItem, + LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, SourceSender, + SourceSenderItem, chunk_size, }; use crate::config::{ComponentKey, OutputId, SourceOutput}; @@ -22,7 +22,7 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { Self { - buf_size: CHUNK_SIZE, + buf_size: chunk_size(), default_output: None, named_outputs: Default::default(), output_metrics: OutputMetrics::new( diff --git a/lib/vector-core/src/source_sender/mod.rs b/lib/vector-core/src/source_sender/mod.rs index 24eb23688c0a1..d188fd1822ffe 100644 --- a/lib/vector-core/src/source_sender/mod.rs +++ b/lib/vector-core/src/source_sender/mod.rs @@ -17,7 +17,30 @@ pub use errors::SendError; use output::{Output, OutputMetrics}; pub use sender::{SourceSender, SourceSenderItem}; -pub const CHUNK_SIZE: usize = 1000; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Default number of events batched per source send, and the base used for source output buffer +/// sizing. Used when the chunk size has not been configured at startup. 
+pub const DEFAULT_CHUNK_SIZE: usize = 1000; + +static CHUNK_SIZE: AtomicUsize = AtomicUsize::new(0); + +/// Returns the configured source sender chunk size, or [`DEFAULT_CHUNK_SIZE`] if unset. +#[must_use] +pub fn chunk_size() -> usize { + match CHUNK_SIZE.load(Ordering::Relaxed) { + 0 => DEFAULT_CHUNK_SIZE, + size => size, + } +} + +/// Sets the process-wide source sender chunk size. Must be called at most once, before the +/// topology is built. Panics if `size` is zero or if called more than once. +pub fn set_chunk_size(size: usize) { + assert_ne!(size, 0, "chunk_size must be non-zero"); + let swapped = CHUNK_SIZE.compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed); + assert!(swapped.is_ok(), "double chunk_size initialization"); +} #[cfg(any(test, feature = "test"))] const TEST_BUFFER_SIZE: usize = 100; diff --git a/lib/vector-core/src/source_sender/output.rs b/lib/vector-core/src/source_sender/output.rs index 4593f64a3b7a9..144b724304aba 100644 --- a/lib/vector-core/src/source_sender/output.rs +++ b/lib/vector-core/src/source_sender/output.rs @@ -24,7 +24,7 @@ use vector_common::{ }; use vrl::value::Value; -use super::{CHUNK_SIZE, SendError, SourceSenderItem}; +use super::{SendError, SourceSenderItem, chunk_size}; use crate::{ EstimatedJsonEncodedSizeOf, config::{OutputId, log_schema}, @@ -246,7 +246,7 @@ impl Output { S: Stream + Unpin, E: Into + ByteSizeOf, { - let mut stream = events.ready_chunks(CHUNK_SIZE); + let mut stream = events.ready_chunks(chunk_size()); while let Some(events) = stream.next().await { self.send_batch(events).await?; } @@ -270,7 +270,7 @@ impl Output { let mut unsent_event_count = UnsentEventCount::new(events.len()); let send_batch_start = Instant::now(); - for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { + for events in array::events_into_arrays(events, Some(chunk_size())) { self.send_inner(events, &mut unsent_event_count, reference) .await .inspect_err(|error| { diff --git a/lib/vector-core/src/source_sender/tests.rs b/lib/vector-core/src/source_sender/tests.rs index 
6ca446c23b7ad..a5c4d8ffd8de0 100644 --- a/lib/vector-core/src/source_sender/tests.rs +++ b/lib/vector-core/src/source_sender/tests.rs @@ -142,7 +142,7 @@ async fn emits_component_discarded_events_total_for_send_batch() { let (mut sender, _recv) = SourceSender::new_test_sender_with_options(1, None); let expected_drop = 100; - let events: Vec = (0..(CHUNK_SIZE + expected_drop)) + let events: Vec = (0..(chunk_size() + expected_drop)) .map(|_| { Event::Metric(Metric::new( "name", @@ -152,7 +152,7 @@ async fn emits_component_discarded_events_total_for_send_batch() { }) .collect(); - // `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion. + // `chunk_size()` events will be sent into buffer but then the future will not be polled to completion. let res = timeout( std::time::Duration::from_millis(100), sender.send_batch(events), diff --git a/src/app.rs b/src/app.rs index 565937d3dc21d..24733617060c5 100644 --- a/src/app.rs +++ b/src/app.rs @@ -229,7 +229,7 @@ impl Application { ); } - let runtime = build_runtime(opts.root.threads, "vector-worker")?; + let runtime = build_runtime(opts.root.threads, opts.root.chunk_size, "vector-worker")?; // Signal handler for OS and provider messages. 
let mut signals = SignalPair::new(&runtime); @@ -541,7 +541,11 @@ fn get_log_levels(default: &str) -> String { .unwrap_or_else(|_| default.into()) } -pub fn build_runtime(threads: Option, thread_name: &str) -> Result { +pub fn build_runtime( + threads: Option, + chunk_size: Option, + thread_name: &str, +) -> Result { let mut rt_builder = runtime::Builder::new_multi_thread(); rt_builder.max_blocking_threads(20_000); rt_builder.enable_all().thread_name(thread_name); @@ -556,7 +560,16 @@ pub fn build_runtime(threads: Option, thread_name: &str) -> Result, + /// Number of events batched per source send and used as the base for source output buffer sizing + /// (source output buffer capacity is this value multiplied by the number of worker threads) + #[arg(long, env = "VECTOR_CHUNK_SIZE")] + pub chunk_size: Option, + /// Enable more detailed internal logging. Repeat to increase level. Overridden by `--quiet`. #[arg(short, long, action = ArgAction::Count)] pub verbose: u8, diff --git a/src/topology/builder.rs b/src/topology/builder.rs index d325d1fc070d3..c4d65c54475ab 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -31,7 +31,7 @@ use vector_lib::{ internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered}, latency::LatencyRecorder, schema::Definition, - source_sender::{CHUNK_SIZE, SourceSenderItem}, + source_sender::{SourceSenderItem, chunk_size}, transform::update_runtime_schema_definition, }; use vector_lib::{gauge, internal_event::GaugeName}; @@ -72,9 +72,8 @@ static ENRICHMENT_TABLES_LOAD_LOCK: LazyLock> = LazyLock::new(Asy static METRICS_STORAGE: LazyLock = LazyLock::new(MetricsStorage::default); pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock = - LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE); + LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * chunk_size()); -const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap(); pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = 
NonZeroUsize::new(100).unwrap(); static TRANSFORM_CONCURRENCY_LIMIT: LazyLock = LazyLock::new(|| { @@ -1299,8 +1298,10 @@ impl Runner { .into_stream() .filter(move |events| ready(filter_events_type(events, self.input_type))); + let ready_array_capacity = + NonZeroUsize::new(chunk_size() * 4).expect("chunk size is non-zero"); let mut input_rx = - super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY); + super::ready_arrays::ReadyArrays::with_capacity(input_rx, ready_array_capacity); let mut in_flight = FuturesOrdered::new(); let mut shutting_down = false; diff --git a/website/cue/reference/cli.cue b/website/cue/reference/cli.cue index b570bd823aa77..355cb35a606b3 100644 --- a/website/cue/reference/cli.cue +++ b/website/cue/reference/cli.cue @@ -192,6 +192,12 @@ cli: { type: "integer" env_var: "VECTOR_THREADS" } + "chunk-size": { + description: env_vars.VECTOR_CHUNK_SIZE.description + default: env_vars.VECTOR_CHUNK_SIZE.type.uint.default + type: "integer" + env_var: "VECTOR_CHUNK_SIZE" + } "internal-log-rate-limit": { _short: "i" description: env_vars.VECTOR_INTERNAL_LOG_RATE_LIMIT.description @@ -654,6 +660,15 @@ cli: { unit: null } } + VECTOR_CHUNK_SIZE: { + description: """ + The number of events batched per source send and used as the base for source output buffer sizing. + """ + type: uint: { + default: 1000 + unit: "events" + } + } VECTOR_WATCH_CONFIG: { description: "Watch for changes in the configuration file and reload accordingly" type: bool: default: false