diff --git a/Cargo.lock b/Cargo.lock index 6914453b3da2c..77c1e3be9764a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1793,6 +1793,7 @@ version = "52.3.0" dependencies = [ "arrow", "clap", + "criterion", "datafusion", "datafusion-common", "datafusion-proto", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index cb4a308ceb516..444b1dc89934d 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -41,6 +41,7 @@ mimalloc_extended = ["libmimalloc-sys/extended"] [dependencies] arrow = { workspace = true } clap = { version = "4.5.60", features = ["derive"] } +criterion = { workspace = true, features = ["async_tokio"] } datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } env_logger = { workspace = true } @@ -59,4 +60,41 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tokio-util = { version = "0.7.17" } [dev-dependencies] +criterion = { workspace = true, features = ["async_tokio"] } datafusion-proto = { workspace = true } + +[[bench]] +harness = false +name = "tpch" + +[[bench]] +harness = false +name = "clickbench" + +[[bench]] +harness = false +name = "tpcds" + +[[bench]] +harness = false +name = "h2o" + +[[bench]] +harness = false +name = "imdb" + +[[bench]] +harness = false +name = "sort_tpch" + +[[bench]] +harness = false +name = "hj" + +[[bench]] +harness = false +name = "nlj" + +[[bench]] +harness = false +name = "smj" diff --git a/benchmarks/benches/clickbench.rs b/benchmarks/benches/clickbench.rs new file mode 100644 index 0000000000000..6c0bc56e0a610 --- /dev/null +++ b/benchmarks/benches/clickbench.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::clickbench; +use std::path::PathBuf; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("CLICKBENCH_DATA") + .unwrap_or_else(|_| "benchmarks/data/hits.parquet".to_string()) +} + +fn queries_path() -> PathBuf { + PathBuf::from("benchmarks/queries/clickbench/queries") +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +/// SQL to create the hits view with proper EventDate casting. 
+const HITS_VIEW_DDL: &str = r#"CREATE VIEW hits AS +SELECT * EXCEPT ("EventDate"), + CAST(CAST("EventDate" AS INTEGER) AS DATE) AS "EventDate" +FROM hits_raw"#; + +fn register_hits(rt: &Runtime, ctx: &SessionContext, path: &str) { + rt.block_on(async { + ctx.register_parquet("hits_raw", path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register hits_raw: {e}")); + ctx.sql(HITS_VIEW_DDL) + .await + .unwrap() + .collect() + .await + .unwrap(); + }); +} + +fn load_queries() -> Vec<(String, Vec)> { + let query_dir = queries_path(); + let mut queries = Vec::new(); + for id in 0..=42 { + let query_path = clickbench::get_query_path(&query_dir, id); + if let Ok(Some(sql)) = clickbench::get_query_sql(&query_path) { + queries.push((format!("q{id}"), vec![sql])); + } + } + queries +} + +fn benchmark_clickbench_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let mut config = SessionConfig::from_env().unwrap(); + config.options_mut().execution.parquet.binary_as_string = true; + let ctx = SessionContext::new_with_config(config); + register_hits(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "clickbench", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_clickbench_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let mut config = SessionConfig::from_env().unwrap(); + config.options_mut().execution.parquet.binary_as_string = true; + let ctx = SessionContext::new_with_config(config); + register_hits(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks( + c, + &rt, + "clickbench_cold", + BenchMode::Cold, + &make_ctx, + &queries, + ); +} + +criterion_group!( + benches, + benchmark_clickbench_warm, + benchmark_clickbench_cold +); +criterion_main!(benches); diff --git a/benchmarks/benches/h2o.rs b/benchmarks/benches/h2o.rs new file mode 100644 index 
0000000000000..aaa189d438707 --- /dev/null +++ b/benchmarks/benches/h2o.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use std::path::Path; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("H2O_DATA").unwrap_or_else(|_| "benchmarks/data/h2o".to_string()) +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn register_csv(rt: &Runtime, ctx: &SessionContext, table_name: &str, path: &str) { + rt.block_on(async { + ctx.register_csv(table_name, path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register {table_name}: {e}")); + }); +} + +fn register_parquet(rt: &Runtime, ctx: &SessionContext, table_name: &str, path: &str) { + rt.block_on(async { + ctx.register_parquet(table_name, path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register {table_name}: {e}")); + }); +} + +fn register_data(rt: &Runtime, ctx: &SessionContext, table_name: &str, path: &str) { + let 
extension = Path::new(path) + .extension() + .and_then(|s| s.to_str()) + .unwrap_or(""); + match extension { + "csv" => register_csv(rt, ctx, table_name, path), + "parquet" => register_parquet(rt, ctx, table_name, path), + _ => panic!("Unsupported file extension: {extension}"), + } +} + +fn load_queries_from_file(path: &str) -> Vec<(String, Vec)> { + let contents = std::fs::read_to_string(path) + .unwrap_or_else(|e| panic!("Could not read query file {path}: {e}")); + contents + .split("\n\n") + .enumerate() + .filter(|(_, s)| !s.trim().is_empty()) + .map(|(i, s)| (format!("q{}", i + 1), vec![s.trim().to_string()])) + .collect() +} + +struct H2OVariant { + group_name: &'static str, + queries_file: &'static str, + setup: fn(&Runtime, &SessionContext, &str), +} + +fn setup_groupby(rt: &Runtime, ctx: &SessionContext, base: &str) { + register_data(rt, ctx, "x", &format!("{base}/G1_1e7_1e7_100_0.csv")); +} + +fn setup_join(rt: &Runtime, ctx: &SessionContext, base: &str) { + register_data(rt, ctx, "x", &format!("{base}/J1_1e7_NA_0.csv")); + register_data(rt, ctx, "small", &format!("{base}/J1_1e7_1e1_0.csv")); + register_data(rt, ctx, "medium", &format!("{base}/J1_1e7_1e4_0.csv")); + register_data(rt, ctx, "large", &format!("{base}/J1_1e7_1e7_NA.csv")); +} + +fn setup_window(rt: &Runtime, ctx: &SessionContext, base: &str) { + register_data(rt, ctx, "large", &format!("{base}/J1_1e7_1e7_NA.csv")); +} + +const VARIANTS: &[H2OVariant] = &[ + H2OVariant { + group_name: "h2o_groupby", + queries_file: "benchmarks/queries/h2o/groupby.sql", + setup: setup_groupby, + }, + H2OVariant { + group_name: "h2o_join", + queries_file: "benchmarks/queries/h2o/join.sql", + setup: setup_join, + }, + H2OVariant { + group_name: "h2o_window", + queries_file: "benchmarks/queries/h2o/window.sql", + setup: setup_window, + }, +]; + +fn benchmark_h2o_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let base = data_path(); + for variant in VARIANTS { + if 
!Path::new(variant.queries_file).exists() { + continue; + } + let queries = load_queries_from_file(variant.queries_file); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + (variant.setup)(&rt, &ctx, &base); + ctx + }; + run_sql_benchmarks( + c, + &rt, + variant.group_name, + BenchMode::Warm, + &make_ctx, + &queries, + ); + } +} + +fn benchmark_h2o_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let base = data_path(); + for variant in VARIANTS { + if !Path::new(variant.queries_file).exists() { + continue; + } + let queries = load_queries_from_file(variant.queries_file); + let group_name = format!("{}_cold", variant.group_name); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + (variant.setup)(&rt, &ctx, &base); + ctx + }; + run_sql_benchmarks(c, &rt, &group_name, BenchMode::Cold, &make_ctx, &queries); + } +} + +criterion_group!(benches, benchmark_h2o_warm, benchmark_h2o_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/hj.rs b/benchmarks/benches/hj.rs new file mode 100644 index 0000000000000..036ad84ff43c6 --- /dev/null +++ b/benchmarks/benches/hj.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::hj::HASH_QUERIES; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data/tpch_sf1".to_string()) +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn register_tables(rt: &Runtime, ctx: &SessionContext, path: &str) { + for table in &["lineitem", "supplier", "nation", "customer"] { + let table_path = format!("{path}/{table}"); + rt.block_on(async { + ctx.register_parquet(*table, &table_path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register table {table}: {e}")); + }); + } +} + +fn load_queries() -> Vec<(String, Vec)> { + HASH_QUERIES + .iter() + .enumerate() + .map(|(i, q)| { + let name = format!( + "q{}_d{}_h{}_{}x{}", + i + 1, + q.density, + q.prob_hit, + q.build_size, + q.probe_size + ); + (name, vec![q.sql.to_string()]) + }) + .collect() +} + +fn benchmark_hj_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "hj", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_hj_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, 
"hj_cold", BenchMode::Cold, &make_ctx, &queries); +} + +criterion_group!(benches, benchmark_hj_warm, benchmark_hj_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/imdb.rs b/benchmarks/benches/imdb.rs new file mode 100644 index 0000000000000..29fa030ef818d --- /dev/null +++ b/benchmarks/benches/imdb.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::imdb::{ + IMDB_QUERY_END_ID, IMDB_QUERY_START_ID, IMDB_TABLES, get_query_sql, + map_query_id_to_str, +}; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("IMDB_DATA").unwrap_or_else(|_| "benchmarks/data/imdb".to_string()) +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn register_tables(rt: &Runtime, ctx: &SessionContext, path: &str) { + for table in IMDB_TABLES { + let table_path = format!("{path}/{table}.parquet"); + rt.block_on(async { + ctx.register_parquet(*table, &table_path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register table {table}: {e}")); + }); + } +} + +fn load_queries() -> Vec<(String, Vec)> { + (IMDB_QUERY_START_ID..=IMDB_QUERY_END_ID) + .filter_map(|id| { + let query_str = map_query_id_to_str(id); + let sqls = get_query_sql(query_str).ok()?; + Some((format!("q{query_str}"), sqls)) + }) + .collect() +} + +fn benchmark_imdb_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "imdb", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_imdb_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "imdb_cold", BenchMode::Cold, &make_ctx, &queries); +} + 
+criterion_group!(benches, benchmark_imdb_warm, benchmark_imdb_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/nlj.rs b/benchmarks/benches/nlj.rs new file mode 100644 index 0000000000000..76dca68512a19 --- /dev/null +++ b/benchmarks/benches/nlj.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::nlj::NLJ_QUERIES; +use tokio::runtime::Runtime; + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn load_queries() -> Vec<(String, Vec)> { + NLJ_QUERIES + .iter() + .enumerate() + .map(|(i, sql)| (format!("q{}", i + 1), vec![sql.trim().to_string()])) + .collect() +} + +fn benchmark_nlj_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + SessionContext::new_with_config(config) + }; + run_sql_benchmarks(c, &rt, "nlj", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_nlj_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + SessionContext::new_with_config(config) + }; + run_sql_benchmarks(c, &rt, "nlj_cold", BenchMode::Cold, &make_ctx, &queries); +} + +criterion_group!(benches, benchmark_nlj_warm, benchmark_nlj_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/smj.rs b/benchmarks/benches/smj.rs new file mode 100644 index 0000000000000..68246a49adaf3 --- /dev/null +++ b/benchmarks/benches/smj.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::smj::SMJ_QUERIES; +use tokio::runtime::Runtime; + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn load_queries() -> Vec<(String, Vec)> { + SMJ_QUERIES + .iter() + .enumerate() + .map(|(i, sql)| (format!("q{}", i + 1), vec![sql.trim().to_string()])) + .collect() +} + +fn benchmark_smj_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let make_ctx = || { + let mut config = SessionConfig::from_env().unwrap(); + config = config.set_bool("datafusion.optimizer.prefer_hash_join", false); + SessionContext::new_with_config(config) + }; + run_sql_benchmarks(c, &rt, "smj", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_smj_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let make_ctx = || { + let mut config = SessionConfig::from_env().unwrap(); + config = config.set_bool("datafusion.optimizer.prefer_hash_join", false); + SessionContext::new_with_config(config) + }; + run_sql_benchmarks(c, &rt, "smj_cold", BenchMode::Cold, &make_ctx, &queries); +} + +criterion_group!(benches, benchmark_smj_warm, benchmark_smj_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/sort_tpch.rs b/benchmarks/benches/sort_tpch.rs new file mode 100644 index 
0000000000000..4649e4556cd5f --- /dev/null +++ b/benchmarks/benches/sort_tpch.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::sort_tpch::RunOpt; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data/tpch_sf1".to_string()) +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn register_lineitem(rt: &Runtime, ctx: &SessionContext, path: &str) { + let table_path = format!("{path}/lineitem"); + rt.block_on(async { + ctx.register_parquet("lineitem", &table_path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register lineitem: {e}")); + }); +} + +fn load_queries() -> Vec<(String, Vec)> { + RunOpt::SORT_QUERIES + .iter() + .enumerate() + .map(|(i, sql)| (format!("q{}", i + 1), vec![sql.trim().to_string()])) + .collect() +} + +fn benchmark_sort_tpch_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let 
queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_lineitem(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "sort_tpch", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_sort_tpch_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_lineitem(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks( + c, + &rt, + "sort_tpch_cold", + BenchMode::Cold, + &make_ctx, + &queries, + ); +} + +criterion_group!(benches, benchmark_sort_tpch_warm, benchmark_sort_tpch_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/tpcds.rs b/benchmarks/benches/tpcds.rs new file mode 100644 index 0000000000000..d8d5442bada12 --- /dev/null +++ b/benchmarks/benches/tpcds.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::tpcds::{ + TPCDS_QUERY_END_ID, TPCDS_QUERY_START_ID, TPCDS_TABLES, get_query_sql, +}; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("TPCDS_DATA") + .unwrap_or_else(|_| "benchmarks/data/tpcds_sf1".to_string()) +} + +fn query_path() -> String { + std::env::var("TPCDS_QUERY_PATH") + .unwrap_or_else(|_| "datafusion/core/tests/tpc-ds".to_string()) +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn register_tables(rt: &Runtime, ctx: &SessionContext, path: &str) { + for table in TPCDS_TABLES { + let table_path = format!("{path}/{table}.parquet"); + rt.block_on(async { + ctx.register_parquet(*table, &table_path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register table {table}: {e}")); + }); + } +} + +fn load_queries() -> Vec<(String, Vec)> { + let qpath = query_path(); + (TPCDS_QUERY_START_ID..=TPCDS_QUERY_END_ID) + .filter_map(|id| { + let sqls = get_query_sql(&qpath, id).ok()?; + Some((format!("q{id}"), sqls)) + }) + .collect() +} + +fn benchmark_tpcds_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "tpcds", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_tpcds_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, 
&path); + ctx + }; + run_sql_benchmarks(c, &rt, "tpcds_cold", BenchMode::Cold, &make_ctx, &queries); +} + +criterion_group!(benches, benchmark_tpcds_warm, benchmark_tpcds_cold); +criterion_main!(benches); diff --git a/benchmarks/benches/tpch.rs b/benchmarks/benches/tpch.rs new file mode 100644 index 0000000000000..cd03143bad1d1 --- /dev/null +++ b/benchmarks/benches/tpch.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use criterion::{criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks}; +use datafusion_benchmarks::tpch::{ + TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql, +}; +use tokio::runtime::Runtime; + +fn data_path() -> String { + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data/tpch_sf1".to_string()) +} + +fn make_runtime() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +fn register_tables(rt: &Runtime, ctx: &SessionContext, path: &str) { + for table in TPCH_TABLES { + let table_path = format!("{path}/{table}"); + rt.block_on(async { + ctx.register_parquet(*table, &table_path, Default::default()) + .await + .unwrap_or_else(|e| panic!("Failed to register table {table}: {e}")); + }); + } +} + +fn load_queries() -> Vec<(String, Vec)> { + (TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID) + .filter_map(|id| { + let sqls = get_query_sql(id).ok()?; + Some((format!("q{id}"), sqls)) + }) + .collect() +} + +fn benchmark_tpch_warm(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "tpch", BenchMode::Warm, &make_ctx, &queries); +} + +fn benchmark_tpch_cold(c: &mut criterion::Criterion) { + let rt = make_runtime(); + let queries = load_queries(); + let path = data_path(); + let make_ctx = || { + let config = SessionConfig::from_env().unwrap(); + let ctx = SessionContext::new_with_config(config); + register_tables(&rt, &ctx, &path); + ctx + }; + run_sql_benchmarks(c, &rt, "tpch_cold", BenchMode::Cold, &make_ctx, &queries); +} + +criterion_group!(benches, benchmark_tpch_warm, benchmark_tpch_cold); +criterion_main!(benches); 
diff --git a/benchmarks/src/benchmark_harness.rs b/benchmarks/src/benchmark_harness.rs new file mode 100644 index 0000000000000..b39b952fdedb9 --- /dev/null +++ b/benchmarks/src/benchmark_harness.rs @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared benchmark harness for criterion-based SQL benchmarks. + +use criterion::{BenchmarkGroup, SamplingMode, measurement::WallTime}; +use datafusion::prelude::SessionContext; +use datafusion_common::instant::Instant; +use std::time::Duration; +use tokio::runtime::Runtime; + +/// Controls whether benchmarks reuse a single [`SessionContext`] or create a +/// fresh one for every measurement iteration. +/// +/// Both modes produce **one criterion benchmark per query** — the difference is +/// only in how the context (and therefore caches, registered tables, etc.) is +/// managed. +/// +/// # Warm +/// +/// A single [`SessionContext`] is created once via `make_ctx`, and then every +/// query is executed once as a warmup pass (to populate OS page cache, parquet +/// metadata caches, etc.). Criterion then measures each query individually, +/// reusing the same context across all iterations. This is closest to +/// steady-state production performance. 
+///
+/// # Cold
+///
+/// For each measurement iteration, a **fresh** [`SessionContext`] is created via
+/// `make_ctx`. The context creation and table registration happen *outside* the
+/// timed region (via [`criterion::Bencher::iter_custom`]), so only the query
+/// execution itself is measured — but there are no warm caches. This captures
+/// first-query latency without penalizing for setup cost.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BenchMode {
+    /// Reuse one pre-warmed [`SessionContext`] for every iteration.
+    Warm,
+    /// Create a fresh [`SessionContext`] for every iteration.
+    Cold,
+}
+
+/// Execute a sequence of SQL statements against `ctx`, collecting and
+/// discarding all result batches.
+///
+/// Most queries consist of a single SQL string, but some (e.g. TPC-H Q15)
+/// comprise multiple statements (CREATE TEMP VIEW, SELECT, DROP VIEW) that must
+/// run together as an atomic unit. Each element of `sqls` is executed in order.
+///
+/// # Panics
+///
+/// Panics if any statement fails to plan or to execute. The panic message
+/// includes the offending SQL text and the underlying error so that a failing
+/// benchmark can be diagnosed from its output.
+pub async fn execute_sqls(ctx: &SessionContext, sqls: &[String]) {
+    for sql in sqls {
+        let df = ctx
+            .sql(sql)
+            .await
+            .unwrap_or_else(|e| panic!("failed to plan SQL `{sql}`: {e}"));
+        df.collect()
+            .await
+            .unwrap_or_else(|e| panic!("failed to execute SQL `{sql}`: {e}"));
+    }
+}
+
+/// Register a group of SQL queries as individual criterion benchmarks, using
+/// either [`BenchMode::Warm`] or [`BenchMode::Cold`] execution strategy.
+///
+/// # How queries map to benchmarks
+///
+/// Each entry in `queries` becomes its **own** criterion benchmark function.
+/// For example, given `group_name = "tpch"` and queries
+/// `[("q1", [...]), ("q2", [...]), ...]`, criterion will report separate
+/// timings for `tpch/q1`, `tpch/q2`, etc. This is important for per-query
+/// regression detection in CI (e.g. CodSpeed).
+///
+/// The `Vec<String>` in each query entry is the list of SQL statements that
+/// make up that single logical query (usually just one statement; see
+/// [`execute_sqls`] for why some queries have multiple).
+///
+/// # Arguments
+///
+/// * `c` — The criterion context, passed through from the benchmark function.
+/// * `rt` — A tokio [`Runtime`] used to drive async query execution. The same
+///   runtime is reused across all queries in the group.
+/// * `group_name` — Criterion benchmark group name. Appears as the prefix in
+///   benchmark output (e.g. `"tpch"` → `tpch/q1`). Convention: use the suite
+///   name for warm mode, and `"{suite}_cold"` for cold mode.
+/// * `mode` — Whether to run in warm or cold mode (see [`BenchMode`]).
+/// * `make_ctx` — A closure that creates a fully configured [`SessionContext`]
+///   with all necessary tables registered. Called once in warm mode, or once
+///   per iteration in cold mode. Must be safe to call from a synchronous
+///   context (use `rt.block_on(...)` internally if async registration is
+///   needed). It is always invoked from synchronous code, never from inside a
+///   future running on `rt`.
+/// * `queries` — The list of `(benchmark_name, sql_statements)` pairs. Each
+///   pair becomes one criterion benchmark.
+///
+/// # Panics
+///
+/// Panics if `make_ctx` panics or if any SQL statement fails (see
+/// [`execute_sqls`]).
+///
+/// # Criterion tuning
+///
+/// All benchmarks in the group share the same tuning parameters (see
+/// [`configure_group`]):
+/// - **`SamplingMode::Flat`** — takes exactly `sample_size` measurements
+///   rather than auto-scaling iteration count. Necessary because these are
+///   long-running benchmarks (milliseconds to seconds per query).
+/// - **`sample_size(10)`** — the minimum criterion allows; 10 measurements
+///   per query is sufficient for stable statistics on queries of this duration.
+/// - **`measurement_time(30s)`** — generous time budget so that criterion
+///   doesn't cut measurements short for slower queries.
+/// - **`warm_up_time(5s)`** — criterion's built-in warmup phase (separate from
+///   the explicit warmup pass in [`BenchMode::Warm`]).
+pub fn run_sql_benchmarks(
+    c: &mut criterion::Criterion,
+    rt: &Runtime,
+    group_name: &str,
+    mode: BenchMode,
+    make_ctx: &dyn Fn() -> SessionContext,
+    queries: &[(String, Vec<String>)],
+) {
+    let mut group = c.benchmark_group(group_name);
+    configure_group(&mut group);
+
+    match mode {
+        BenchMode::Warm => {
+            // Create one context for the entire group and run each query once
+            // to warm caches (OS page cache, parquet metadata, etc.)
+            let ctx = make_ctx();
+            for (_, sqls) in queries {
+                rt.block_on(execute_sqls(&ctx, sqls));
+            }
+            // Now register each query as its own criterion benchmark, all
+            // sharing the same pre-warmed context.
+            for (name, sqls) in queries {
+                group.bench_function(name, |b| {
+                    b.to_async(rt).iter(|| execute_sqls(&ctx, sqls));
+                });
+            }
+        }
+        BenchMode::Cold => {
+            // Each query gets its own criterion benchmark. Within each
+            // benchmark, every iteration creates a fresh context. We use
+            // `iter_custom` so that only the query execution is timed — the
+            // context creation and table registration are excluded.
+            //
+            // The loop is deliberately driven from synchronous code with
+            // `rt.block_on` rather than through `b.to_async(rt)`: `make_ctx`
+            // is allowed to call `rt.block_on(...)` itself, and tokio panics
+            // when `block_on` is called from inside a future that is already
+            // running on the runtime.
+            for (name, sqls) in queries {
+                group.bench_function(name, |b| {
+                    b.iter_custom(|iters| {
+                        let mut total = Duration::ZERO;
+                        for _ in 0..iters {
+                            let ctx = make_ctx();
+                            let start = Instant::now();
+                            rt.block_on(execute_sqls(&ctx, sqls));
+                            total += start.elapsed();
+                        }
+                        total
+                    });
+                });
+            }
+        }
+    }
+    group.finish();
+}
+
+/// Apply criterion tuning parameters appropriate for long-running SQL
+/// benchmarks (milliseconds to seconds per query).
+///
+/// These settings are shared by all benchmark groups created by this harness:
+///
+/// - **`SamplingMode::Flat`**: Don't auto-detect iteration count. Flat mode
+///   runs exactly `sample_size` iterations, which is correct for benchmarks
+///   where a single iteration already takes a meaningful amount of time.
+///   (The default `Auto` mode would try to pack many iterations into each
+///   sample, which is wasteful for multi-millisecond queries.)
+///
+/// - **`sample_size(10)`**: The minimum criterion allows. Ten measurements
+///   is enough for stable statistics on queries that take tens of milliseconds
+///   or more, and keeps total benchmark runtime manageable.
+///
+/// - **`measurement_time(30s)`**: Total time budget per benchmark function.
+///   Generous enough that slow queries (1-2 seconds) still get their full
+///   10 samples.
+///
+/// - **`warm_up_time(5s)`**: Criterion's own warmup phase, which runs before
+///   measurement begins. This is in addition to any explicit warmup done in
+///   [`BenchMode::Warm`].
+fn configure_group(group: &mut BenchmarkGroup<'_, WallTime>) {
+    group.sampling_mode(SamplingMode::Flat);
+    group.sample_size(10);
+    group.measurement_time(Duration::from_secs(30));
+    group.warm_up_time(Duration::from_secs(5));
+}
diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs
index 6eb828a3aedf8..cab7a67b2d2e9 100644
--- a/benchmarks/src/hj.rs
+++ b/benchmarks/src/hj.rs
@@ -53,16 +53,16 @@ pub struct RunOpt {
     output_path: Option,
 }
 
-struct HashJoinQuery {
-    sql: &'static str,
-    density: f64,
-    prob_hit: f64,
-    build_size: &'static str,
-    probe_size: &'static str,
+pub struct HashJoinQuery {
+    pub sql: &'static str,
+    pub density: f64,
+    pub prob_hit: f64,
+    pub build_size: &'static str,
+    pub probe_size: &'static str,
 }
 
 /// Inline SQL queries for Hash Join benchmarks
-const HASH_QUERIES: &[HashJoinQuery] = &[
+pub const HASH_QUERIES: &[HashJoinQuery] = &[
     // Q1: Very Small Build Side (Dense)
     // Build Side: nation (25 rows) | Probe Side: customer (1.5M rows)
     HashJoinQuery {
diff --git a/benchmarks/src/imdb/mod.rs b/benchmarks/src/imdb/mod.rs
index 87462bc3e81ba..e2f9f4d42555b 100644
--- a/benchmarks/src/imdb/mod.rs
+++ b/benchmarks/src/imdb/mod.rs
@@ -27,7 +27,7 @@ pub use convert::ConvertOpt;
 use std::fs;
 
 mod run;
-pub use run::RunOpt;
+pub use run::{RunOpt, map_query_id_to_str};
 
 // we have 21 tables in the IMDB dataset
 pub const IMDB_TABLES: &[&str] = &[
diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs
index 29ca5249aa5b3..7757649212dd3 100644
--- a/benchmarks/src/imdb/run.rs
+++ b/benchmarks/src/imdb/run.rs
@@ -98,7 +98,7 @@ pub struct RunOpt {
     hash_join_buffering_capacity: usize,
 }
 
-fn map_query_id_to_str(query_id: usize) -> &'static str {
+pub fn map_query_id_to_str(query_id: usize) -> &'static str {
     match query_id {
         // 1
         1 => "1a",
diff --git
a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index a3bc221840ada..fb41e3f84a289 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -16,6 +16,7 @@ // under the License. //! DataFusion benchmark runner +pub mod benchmark_harness; pub mod cancellation; pub mod clickbench; pub mod h2o; diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index ade8c0f7789bc..a594f097b1b53 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -61,7 +61,7 @@ pub struct RunOpt { /// Each query's comment includes: /// - Left (build) side row count × Right (probe) side row count /// - Join predicate selectivity (1% means the output size is 1% * input size) -const NLJ_QUERIES: &[&str] = &[ +pub const NLJ_QUERIES: &[&str] = &[ // Q1: INNER 10K x 10K | LOW 0.1% r#" SELECT * diff --git a/benchmarks/src/smj.rs b/benchmarks/src/smj.rs index b420ef1d64c60..80a25d01662b2 100644 --- a/benchmarks/src/smj.rs +++ b/benchmarks/src/smj.rs @@ -59,7 +59,7 @@ pub struct RunOpt { /// - Left row count × Right row count /// - Key cardinality (rows per key) /// - Filter selectivity (if applicable) -const SMJ_QUERIES: &[&str] = &[ +pub const SMJ_QUERIES: &[&str] = &[ // Q1: INNER 100K x 100K | 1:1 r#" WITH t1_sorted AS ( diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 806f1f6c33d0f..55a62d6f5eef3 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -99,7 +99,7 @@ impl RunOpt { /// Payload Columns: /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column) /// - Wide variant: all columns except for possible key columns (12 columns) - const SORT_QUERIES: [&'static str; 11] = [ + pub const SORT_QUERIES: [&'static str; 11] = [ // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column r#" SELECT l_linenumber, l_partkey diff --git a/benchmarks/src/tpcds/mod.rs b/benchmarks/src/tpcds/mod.rs index 4829eb9fd348a..3948abdac8d01 100644 --- a/benchmarks/src/tpcds/mod.rs +++ b/benchmarks/src/tpcds/mod.rs @@ -16,4 
+16,6 @@ // under the License. mod run; -pub use run::RunOpt; +pub use run::{ + RunOpt, TPCDS_QUERY_END_ID, TPCDS_QUERY_START_ID, TPCDS_TABLES, get_query_sql, +};