Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/source_chunk_size_config.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add `--chunk-size` / `VECTOR_CHUNK_SIZE` to configure the source sender batch size and source output buffer base capacity (defaults to 1000 events).
authors: sakateka
6 changes: 3 additions & 3 deletions lib/vector-core/src/source_sender/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use vector_common::histogram;
use vector_common::internal_event::DEFAULT_OUTPUT;

use super::{
CHUNK_SIZE, LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME,
SourceSender, SourceSenderItem,
LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, SourceSender,
SourceSenderItem, chunk_size,
};
use crate::config::{ComponentKey, OutputId, SourceOutput};

Expand All @@ -22,7 +22,7 @@ pub struct Builder {
impl Default for Builder {
fn default() -> Self {
Self {
buf_size: CHUNK_SIZE,
buf_size: chunk_size(),
default_output: None,
named_outputs: Default::default(),
output_metrics: OutputMetrics::new(
Expand Down
25 changes: 24 additions & 1 deletion lib/vector-core/src/source_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,30 @@ pub use errors::SendError;
use output::{Output, OutputMetrics};
pub use sender::{SourceSender, SourceSenderItem};

pub const CHUNK_SIZE: usize = 1000;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Default number of events batched per source send, and the base used for source output buffer
/// sizing. Used when the chunk size has not been configured at startup.
pub const DEFAULT_CHUNK_SIZE: usize = 1000;

static CHUNK_SIZE: AtomicUsize = AtomicUsize::new(0);

/// Returns the configured source sender chunk size, or [`DEFAULT_CHUNK_SIZE`] if unset.
#[must_use]
pub fn chunk_size() -> usize {
match CHUNK_SIZE.load(Ordering::Relaxed) {
0 => DEFAULT_CHUNK_SIZE,
size => size,
}
}

/// Sets the process-wide source sender chunk size. Must be called at most once, before the
/// topology is built. Panics if called more than once.
pub fn set_chunk_size(size: usize) {
CHUNK_SIZE
.compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed)
.unwrap_or_else(|_| panic!("double chunk_size initialization"));
}

#[cfg(any(test, feature = "test"))]
const TEST_BUFFER_SIZE: usize = 100;
Expand Down
6 changes: 3 additions & 3 deletions lib/vector-core/src/source_sender/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use vector_common::{
};
use vrl::value::Value;

use super::{CHUNK_SIZE, SendError, SourceSenderItem};
use super::{SendError, SourceSenderItem, chunk_size};
use crate::{
EstimatedJsonEncodedSizeOf,
config::{OutputId, log_schema},
Expand Down Expand Up @@ -246,7 +246,7 @@ impl Output {
S: Stream<Item = E> + Unpin,
E: Into<Event> + ByteSizeOf,
{
let mut stream = events.ready_chunks(CHUNK_SIZE);
let mut stream = events.ready_chunks(chunk_size());
while let Some(events) = stream.next().await {
self.send_batch(events).await?;
}
Expand All @@ -270,7 +270,7 @@ impl Output {
let mut unsent_event_count = UnsentEventCount::new(events.len());
let send_batch_start = Instant::now();

for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
for events in array::events_into_arrays(events, Some(chunk_size())) {
self.send_inner(events, &mut unsent_event_count, reference)
.await
.inspect_err(|error| {
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/source_sender/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn emits_component_discarded_events_total_for_send_batch() {
let (mut sender, _recv) = SourceSender::new_test_sender_with_options(1, None);

let expected_drop = 100;
let events: Vec<Event> = (0..(CHUNK_SIZE + expected_drop))
let events: Vec<Event> = (0..(chunk_size() + expected_drop))
.map(|_| {
Event::Metric(Metric::new(
"name",
Expand All @@ -152,7 +152,7 @@ async fn emits_component_discarded_events_total_for_send_batch() {
})
.collect();

// `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion.
// `chunk_size()` events will be sent into buffer but then the future will not be polled to completion.
let res = timeout(
std::time::Duration::from_millis(100),
sender.send_batch(events),
Expand Down
19 changes: 16 additions & 3 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Application {
);
}

let runtime = build_runtime(opts.root.threads, "vector-worker")?;
let runtime = build_runtime(opts.root.threads, opts.root.chunk_size, "vector-worker")?;

// Signal handler for OS and provider messages.
let mut signals = SignalPair::new(&runtime);
Expand Down Expand Up @@ -541,7 +541,11 @@ fn get_log_levels(default: &str) -> String {
.unwrap_or_else(|_| default.into())
}

pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
pub fn build_runtime(
threads: Option<usize>,
chunk_size: Option<NonZeroUsize>,
thread_name: &str,
) -> Result<Runtime, ExitCode> {
let mut rt_builder = runtime::Builder::new_multi_thread();
rt_builder.max_blocking_threads(20_000);
rt_builder.enable_all().thread_name(thread_name);
Expand All @@ -556,7 +560,16 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtim
.unwrap_or_else(|_| panic!("double thread initialization"));
rt_builder.worker_threads(threads);

debug!(message = "Building runtime.", worker_threads = threads);
let chunk_size = chunk_size
.map(NonZeroUsize::get)
.unwrap_or(vector_lib::source_sender::DEFAULT_CHUNK_SIZE);
vector_lib::source_sender::set_chunk_size(chunk_size);

debug!(
message = "Building runtime.",
worker_threads = threads,
chunk_size
);
Ok(rt_builder.build().expect("Unable to create async runtime"))
}

Expand Down
10 changes: 9 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#![allow(missing_docs)]

use std::{num::NonZeroU64, path::PathBuf};
use std::{
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
};

use clap::{ArgAction, CommandFactory, FromArgMatches, Parser};

Expand Down Expand Up @@ -129,6 +132,11 @@ pub struct RootOpts {
#[arg(short, long, env = "VECTOR_THREADS")]
pub threads: Option<usize>,

/// Number of events batched per source send and used as the base for source output buffer sizing
/// (source output buffer capacity is this value multiplied by the number of worker threads)
#[arg(long, env = "VECTOR_CHUNK_SIZE")]
pub chunk_size: Option<NonZeroUsize>,

/// Enable more detailed internal logging. Repeat to increase level. Overridden by `--quiet`.
#[arg(short, long, action = ArgAction::Count)]
pub verbose: u8,
Expand Down
9 changes: 5 additions & 4 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use vector_lib::{
internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
latency::LatencyRecorder,
schema::Definition,
source_sender::{CHUNK_SIZE, SourceSenderItem},
source_sender::{SourceSenderItem, chunk_size},
transform::update_runtime_schema_definition,
};
use vector_lib::{gauge, internal_event::GaugeName};
Expand Down Expand Up @@ -72,9 +72,8 @@ static ENRICHMENT_TABLES_LOAD_LOCK: LazyLock<AsyncMutex<()>> = LazyLock::new(Asy
static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);

pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);
LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * chunk_size());
Comment thread
sakateka marked this conversation as resolved.

const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
Expand Down Expand Up @@ -1299,8 +1298,10 @@ impl Runner {
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

let ready_array_capacity =
NonZeroUsize::new(chunk_size() * 4).expect("chunk size is non-zero");
let mut input_rx =
super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);
super::ready_arrays::ReadyArrays::with_capacity(input_rx, ready_array_capacity);

let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;
Expand Down
15 changes: 15 additions & 0 deletions website/cue/reference/cli.cue
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ cli: {
type: "integer"
env_var: "VECTOR_THREADS"
}
"chunk-size": {
description: env_vars.VECTOR_CHUNK_SIZE.description
default: env_vars.VECTOR_CHUNK_SIZE.type.uint.default
type: "integer"
env_var: "VECTOR_CHUNK_SIZE"
}
"internal-log-rate-limit": {
_short: "i"
description: env_vars.VECTOR_INTERNAL_LOG_RATE_LIMIT.description
Expand Down Expand Up @@ -654,6 +660,15 @@ cli: {
unit: null
}
}
VECTOR_CHUNK_SIZE: {
description: """
The number of events batched per source send and used as the base for source output buffer sizing.
"""
type: uint: {
default: 1000
unit: "events"
}
}
VECTOR_WATCH_CONFIG: {
description: "Watch for changes in the configuration file and reload accordingly"
type: bool: default: false
Expand Down
Loading