diff --git a/Cargo.lock b/Cargo.lock index efab800f22094..704eb16de7aeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1501,6 +1501,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "core_extensions" version = "1.5.4" @@ -1792,7 +1803,11 @@ name = "datafusion-benchmarks" version = "52.2.0" dependencies = [ "arrow", + "async-trait", + "bytes", + "chrono", "clap", + "core_affinity", "datafusion", "datafusion-common", "datafusion-proto", @@ -1810,6 +1825,7 @@ dependencies = [ "snmalloc-rs", "tokio", "tokio-util", + "url", ] [[package]] @@ -2750,7 +2766,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2894,7 +2910,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3330,6 +3346,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -4155,7 +4177,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", 
] [[package]] @@ -4239,6 +4261,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "objc2-core-foundation" version = "0.3.1" @@ -4773,7 +4805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.13.0", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -4792,7 +4824,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -4894,7 +4926,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -5261,7 +5293,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5969,7 +6001,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6927,7 +6959,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index cb4a308ceb516..956fbbe138a70 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -40,7 +40,11 @@ mimalloc_extended = ["libmimalloc-sys/extended"] [dependencies] arrow = { workspace = true } +async-trait = { workspace = true } +bytes = "1" +chrono = { workspace = true } clap = { version = "4.5.60", features = ["derive"] } 
+core_affinity = "0.8" datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } env_logger = { workspace = true } @@ -56,6 +60,7 @@ serde = { version = "1.0.228", features = ["derive"] } serde_json = { workspace = true } snmalloc-rs = { version = "0.3", optional = true } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } +url = { workspace = true } tokio-util = { version = "0.7.17" } [dev-dependencies] diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 761efa6d591a4..daa193c534802 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -42,6 +42,13 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true} +PIN_THREADS=${PIN_THREADS:-false} + +# Build the --pin-threads flag if enabled +PIN_THREADS_FLAG="" +if [ "${PIN_THREADS}" = "true" ]; then + PIN_THREADS_FLAG="--pin-threads" +fi usage() { echo " @@ -141,6 +148,7 @@ CARGO_COMMAND command that runs the benchmark binary DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) +PIN_THREADS Pin each tokio worker thread to a distinct CPU core (default false) DATAFUSION_* Set the given datafusion configuration " exit 1 @@ -189,6 +197,7 @@ main() { echo "DATA_DIR: ${DATA_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" + echo "PIN_THREADS: ${PIN_THREADS}" echo "***************************" case "$BENCHMARK" in all) @@ -371,6 +380,7 @@ main() { echo "RESULTS_DIR: ${RESULTS_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" + echo "PIN_THREADS: ${PIN_THREADS}" echo "***************************" # navigate to the appropriate directory @@ -655,7 +665,7 @@ run_tpch() { echo "Running tpch 
benchmark..." FORMAT=$2 - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the tpch in memory (needs tpch parquet data) @@ -671,7 +681,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the tpcds benchmark @@ -691,7 +701,7 @@ run_tpcds() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpcds benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the compile profile benchmark helper @@ -713,7 +723,7 @@ run_cancellation() { RESULTS_FILE="${RESULTS_DIR}/cancellation.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running cancellation benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" } @@ -767,7 +777,7 @@ run_clickbench_1() { RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the clickbench benchmark with the partitioned parquet dataset (100 files) @@ -775,7 +785,7 @@ run_clickbench_partitioned() { RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} } @@ -784,7 +794,7 @@ run_clickbench_pushdown() { RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..." 
- debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} } @@ -793,7 +803,7 @@ run_clickbench_extended() { RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) extended benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} } # Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors) @@ -975,7 +985,7 @@ run_h2o() { QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql" # Run the benchmark using the dynamically constructed file path and query file - debug_run $CARGO_COMMAND --bin dfbench -- h2o \ + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} h2o \ --iterations 3 \ --path "${H2O_DIR}/${FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ @@ -1027,7 +1037,7 @@ h2o_runner() { # Set the query file name based on the RUN_Type QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql" - debug_run $CARGO_COMMAND --bin dfbench -- h2o \ + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} h2o \ --iterations 3 \ --join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ @@ 
-1073,7 +1083,7 @@ run_sort_tpch() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running sort tpch benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the sort tpch integration benchmark with limit 100 (topk) @@ -1083,7 +1093,7 @@ run_topk_tpch() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running topk tpch benchmark..." - $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} + $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} } # Runs the nlj benchmark @@ -1091,7 +1101,7 @@ run_nlj() { RESULTS_FILE="${RESULTS_DIR}/nlj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running nlj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the hj benchmark @@ -1100,7 +1110,7 @@ run_hj() { RESULTS_FILE="${RESULTS_DIR}/hj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running hj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the smj benchmark @@ -1108,7 +1118,7 @@ run_smj() { RESULTS_FILE="${RESULTS_DIR}/smj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running smj benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} } @@ -1243,7 +1253,7 @@ run_clickbench_sorted() { # Run benchmark with prefer_existing_sort configuration # This allows DataFusion to optimize away redundant sorts while maintaining parallelism - debug_run $CARGO_COMMAND --bin dfbench -- clickbench \ + debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench \ --iterations 5 \ --path "${DATA_DIR}/hits_sorted.parquet" \ --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \ diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index 7e21890519fd1..594e1e7c17464 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -40,6 +40,10 @@ use datafusion_benchmarks::{ #[derive(Debug, Parser)] #[command(about = "benchmark command")] struct Cli { + /// Pin each tokio worker thread to a distinct CPU core for more stable benchmarks + #[arg(long = "pin-threads")] + pin_threads: bool, + #[command(subcommand)] command: Options, } @@ -58,12 +62,7 @@ enum Options { Tpcds(tpcds::RunOpt), } -// Main benchmark runner entrypoint -#[tokio::main] -pub async fn main() -> Result<()> { - env_logger::init(); - - let cli = Cli::parse(); +async fn run_command(cli: Cli) -> Result<()> { match cli.command { Options::Cancellation(opt) => opt.run().await, Options::Clickbench(opt) => opt.run().await, @@ -77,3 +76,35 @@ pub async fn main() -> Result<()> { Options::Tpcds(opt) => Box::pin(opt.run()).await, } } + +// Main benchmark runner entrypoint +pub fn main() -> Result<()> { + env_logger::init(); + + let cli = Cli::parse(); + let pin_threads = cli.pin_threads; + + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.enable_all(); + + if pin_threads { + let core_ids = core_affinity::get_core_ids().expect("failed to get core IDs"); + + // 
Spawn a dedicated IO thread per core, pinned to the same core + // as the corresponding tokio worker. + datafusion_benchmarks::same_thread_local::init_io_thread_pool(&core_ids); + datafusion_benchmarks::same_thread_local::enable_same_thread_io(); + + let core_ids = + std::sync::Arc::new(std::sync::Mutex::new(core_ids.into_iter().cycle())); + builder.on_thread_start(move || { + let core_id = core_ids.lock().unwrap().next().unwrap(); + core_affinity::set_for_current(core_id); + }); + + eprintln!("Thread pinning enabled (with per-core IO threads)"); + } + + let runtime = builder.build().expect("failed to build tokio runtime"); + runtime.block_on(run_command(cli)) +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index a3bc221840ada..77f66a5826ee8 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -22,6 +22,7 @@ pub mod h2o; pub mod hj; pub mod imdb; pub mod nlj; +pub mod same_thread_local; pub mod smj; pub mod sort_tpch; pub mod tpcds; diff --git a/benchmarks/src/same_thread_local.rs b/benchmarks/src/same_thread_local.rs new file mode 100644 index 0000000000000..17814f5221b9b --- /dev/null +++ b/benchmarks/src/same_thread_local.rs @@ -0,0 +1,445 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A [`LocalFileSystem`] wrapper that performs IO on a per-core IO thread +//! instead of dispatching to tokio's blocking thread pool. +//! +//! When partition threads are pinned to specific CPU cores, using +//! `spawn_blocking` dispatches IO to a random thread on a potentially +//! different core, losing cache locality. This module provides a dedicated +//! IO thread per core that stays pinned to the same core as the requesting +//! tokio worker, so file reads share the same L2/L3 cache. + +use std::fmt::{Debug, Display, Formatter}; +use std::fs::{File, Metadata}; +use std::io::{ErrorKind, Read, Seek, SeekFrom}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::SystemTime; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::stream::BoxStream; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, + PutPayload, PutResult, RenameOptions, Result, +}; +use tokio::sync::oneshot; + +/// Global flag: when set, `runtime_env_builder()` returns a builder with +/// [`SameThreadLocalFileSystem`] instead of the default [`LocalFileSystem`]. +static USE_SAME_THREAD_IO: AtomicBool = AtomicBool::new(false); + +/// Enable same-thread IO for all subsequently created object store registries. +pub fn enable_same_thread_io() { + USE_SAME_THREAD_IO.store(true, Ordering::Relaxed); +} + +/// Returns true if same-thread IO is enabled. 
+pub fn is_same_thread_io_enabled() -> bool { + USE_SAME_THREAD_IO.load(Ordering::Relaxed) +} + +// --------------------------------------------------------------------------- +// Per-core IO thread pool +// --------------------------------------------------------------------------- + +type IoWork = Box<dyn FnOnce() + Send + 'static>; + +/// A pool of IO threads, one per core, each pinned to its respective core. +/// Work is dispatched round-robin based on the calling thread's ID so that +/// a given tokio worker always talks to the same IO thread (the one sharing +/// its core). +struct IoThreadPool { + senders: Vec<std::sync::mpsc::Sender<IoWork>>, +} + +impl IoThreadPool { + /// Spawn one IO thread per core ID, pinned to that core. + fn new(core_ids: &[core_affinity::CoreId]) -> Self { + let mut senders = Vec::with_capacity(core_ids.len()); + + for &core_id in core_ids { + let (tx, rx) = std::sync::mpsc::channel::<IoWork>(); + std::thread::Builder::new() + .name(format!("datafusion-io-{}", core_id.id)) + .spawn(move || { + // Pin this IO thread to the same core + core_affinity::set_for_current(core_id); + // Process work items until the channel closes + while let Ok(work) = rx.recv() { + work(); + } + }) + .expect("failed to spawn IO thread"); + + senders.push(tx); + } + + Self { senders } + } + + /// Submit work to the IO thread for the current core. + /// Uses thread ID to consistently map a tokio worker to the same IO thread. + fn submit<F, T>(&self, f: F) -> oneshot::Receiver<T> + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + // Use the current thread's ID to pick an IO thread. + // This gives a stable mapping: the same tokio worker always + // goes to the same IO thread (sharing its pinned core). + let thread_id = thread_id_hash(); + let idx = thread_id % self.senders.len(); + + let work: IoWork = Box::new(move || { + let result = f(); + let _ = tx.send(result); + }); + + // If the IO thread has shut down, this will fail — but that + // only happens during process exit. 
+ let _ = self.senders[idx].send(work); + rx + } +} + +/// Fast, stable hash of the current thread's ID. +fn thread_id_hash() -> usize { + // ThreadId doesn't expose the underlying integer, but its Debug + // output is stable within a process. Using as_u64 via transmute + // is not stable. Instead use a thread-local counter assigned at + // first access, which is cheaper than hashing. + thread_local! { + static IDX: usize = next_thread_index(); + } + IDX.with(|idx| *idx) +} + +fn next_thread_index() -> usize { + use std::sync::atomic::AtomicUsize; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + COUNTER.fetch_add(1, Ordering::Relaxed) +} + +/// Global IO thread pool, initialized once. +static IO_POOL: std::sync::OnceLock<IoThreadPool> = std::sync::OnceLock::new(); + +/// Initialize the global IO thread pool with one thread per core. +/// Must be called before any `SameThreadLocalFileSystem` operations. +/// The provided core IDs should match those used for the tokio workers. +pub fn init_io_thread_pool(core_ids: &[core_affinity::CoreId]) { + IO_POOL + .set(IoThreadPool::new(core_ids)) + .ok() + .expect("IO thread pool already initialized"); +} + +fn io_pool() -> &'static IoThreadPool { + IO_POOL + .get() + .expect("IO thread pool not initialized — call init_io_thread_pool first") +} + +// --------------------------------------------------------------------------- +// ObjectStore implementation +// --------------------------------------------------------------------------- + +/// A [`LocalFileSystem`] wrapper that dispatches read IO to a dedicated +/// per-core IO thread, rather than tokio's blocking thread pool. +/// +/// Each tokio worker (pinned to core N) sends IO work to IO thread N +/// (also pinned to core N), preserving L2/L3 cache locality while +/// keeping the tokio worker free to run other async tasks. 
+#[derive(Debug)] +pub struct SameThreadLocalFileSystem { + inner: LocalFileSystem, +} + +impl SameThreadLocalFileSystem { + pub fn new() -> Self { + Self { + inner: LocalFileSystem::new(), + } + } +} + +impl Display for SameThreadLocalFileSystem { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "SameThreadLocalFileSystem({})", self.inner) + } +} + +// --------------------------------------------------------------------------- +// Sync file helpers (replicated from object_store::local since they're +// pub(crate) there) +// --------------------------------------------------------------------------- + +fn open_file(path: &PathBuf) -> Result<(File, Metadata)> { + match File::open(path).and_then(|f| Ok((f.metadata()?, f))) { + Err(e) => Err(match e.kind() { + ErrorKind::NotFound => object_store::Error::NotFound { + path: path.to_string_lossy().to_string(), + source: e.into(), + }, + _ => object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: Box::new(e), + }, + }), + Ok((metadata, file)) => { + if metadata.is_dir() { + Err(object_store::Error::NotFound { + path: path.to_string_lossy().to_string(), + source: std::io::Error::new(ErrorKind::NotFound, "is directory") + .into(), + }) + } else { + Ok((file, metadata)) + } + } + } +} + +fn convert_metadata(metadata: &Metadata, location: Path) -> ObjectMeta { + let last_modified: DateTime<Utc> = metadata + .modified() + .expect("Modified file time should be supported on this platform") + .into(); + + #[cfg(unix)] + let inode = std::os::unix::fs::MetadataExt::ino(metadata); + #[cfg(not(unix))] + let inode = 0u64; + + let size = metadata.len(); + let mtime = metadata + .modified() + .ok() + .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok()) + .unwrap_or_default() + .as_micros(); + + let e_tag = format!("{inode:x}-{mtime:x}-{size:x}"); + + ObjectMeta { + location, + last_modified, + size, + e_tag: Some(e_tag), + version: None, + } +} + +fn read_range(file: &mut File, 
path: &PathBuf, range: Range<u64>) -> Result<Bytes> { + let file_len = file + .metadata() + .map_err(|e| object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: Box::new(e), + })? + .len(); + + if range.start >= file_len { + return Err(object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: format!( + "Range start {} exceeds file length {} for {}", + range.start, + file_len, + path.display() + ) + .into(), + }); + } + + let to_read = range.end.min(file_len) - range.start; + + file.seek(SeekFrom::Start(range.start)).map_err(|e| { + object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: Box::new(e), + } + })?; + + let mut buf = Vec::with_capacity(to_read as usize); + let read = file.take(to_read).read_to_end(&mut buf).map_err(|e| { + object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: Box::new(e), + } + })? as u64; + + if read != to_read { + return Err(object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: format!( + "Out of range for {}: expected {} bytes, got {}", + path.display(), + to_read, + read + ) + .into(), + }); + } + + Ok(buf.into()) +} + +/// Reads smaller than this are run inline via `block_in_place` for best +/// L1/L2 cache locality. Larger reads are dispatched to the per-core IO +/// thread so the tokio worker stays free. 
+const INLINE_IO_THRESHOLD: u64 = 1024 * 1024; // 1 MB + +// --------------------------------------------------------------------------- +// ObjectStore trait +// --------------------------------------------------------------------------- + +#[async_trait] +impl ObjectStore for SameThreadLocalFileSystem { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result<PutResult> { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result<Box<dyn MultipartUpload>> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { + // get_opts just opens a file and stats it — always small/fast, + // so run inline for best L1 cache locality. + let location = location.clone(); + let path = self.inner.path_to_filesystem(&location)?; + tokio::task::block_in_place(move || { + let (file, metadata) = open_file(&path)?; + let meta = convert_metadata(&metadata, location); + options.check_preconditions(&meta)?; + + let range = match options.range { + Some(r) => { + r.as_range(meta.size) + .map_err(|e| object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: Box::new(e), + })? + } + None => 0..meta.size, + }; + + Ok(GetResult { + payload: GetResultPayload::File(file, path), + attributes: Attributes::default(), + range, + meta, + }) + }) + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range<u64>], + ) -> Result<Vec<Bytes>> { + let path = self.inner.path_to_filesystem(location)?; + let ranges = ranges.to_vec(); + + let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + + if total_bytes < INLINE_IO_THRESHOLD { + // Small reads: run inline on the current worker thread for + // best L1/L2 cache locality with zero coordination overhead. 
+ tokio::task::block_in_place(move || { + let (mut file, _) = open_file(&path)?; + ranges + .into_iter() + .map(|r| read_range(&mut file, &path, r)) + .collect() + }) + } else { + // Large reads: dispatch to the per-core IO thread so the + // tokio worker stays free for other partitions. + let rx = io_pool().submit(move || { + let (mut file, _) = open_file(&path)?; + ranges + .into_iter() + .map(|r| read_range(&mut file, &path, r)) + .collect() + }); + + rx.await.map_err(|_| object_store::Error::Generic { + store: "SameThreadLocalFileSystem", + source: "IO thread shut down".into(), + })? + } + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result<Path>>, + ) -> BoxStream<'static, Result<Path>> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, Result<ObjectMeta>> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { + self.inner.copy_opts(from, to, options).await + } + + async fn rename_opts( + &self, + from: &Path, + to: &Path, + options: RenameOptions, + ) -> Result<()> { + self.inner.rename_opts(from, to, options).await + } +} diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index add8ff17fbf85..214fcadb5bb6e 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -22,12 +22,15 @@ use datafusion::{ execution::{ disk_manager::DiskManagerBuilder, memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool}, + object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, runtime_env::RuntimeEnvBuilder, }, prelude::SessionConfig, }; use datafusion_common::{DataFusionError, 
Result}; +use crate::same_thread_local::{SameThreadLocalFileSystem, is_same_thread_io_enabled}; + // Common benchmark options (don't use doc comments otherwise this doc // shows up in help files) #[derive(Debug, Args, Clone)] @@ -90,6 +93,17 @@ impl CommonOpt { /// Return an appropriately configured `RuntimeEnvBuilder` pub fn runtime_env_builder(&self) -> Result<RuntimeEnvBuilder> { let mut rt_builder = RuntimeEnvBuilder::new(); + + // When same-thread IO is enabled (via --pin-threads), register a + // LocalFileSystem that does IO on the calling thread instead of + // dispatching to the blocking pool, preserving core pinning. + if is_same_thread_io_enabled() { + let registry = DefaultObjectStoreRegistry::new(); + let url = url::Url::parse("file://").expect("valid url"); + registry.register_store(&url, Arc::new(SameThreadLocalFileSystem::new())); + rt_builder = + rt_builder.with_object_store_registry(Arc::new(registry)); + } const NUM_TRACKED_CONSUMERS: usize = 5; if let Some(memory_limit) = self.memory_limit { let pool: Arc<dyn MemoryPool> = match self.mem_pool_type.as_str() {