Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mimalloc_extended = ["libmimalloc-sys/extended"]
[dependencies]
arrow = { workspace = true }
clap = { version = "4.5.60", features = ["derive"] }
criterion = { workspace = true, features = ["async_tokio"] }
datafusion = { workspace = true, default-features = true }
datafusion-common = { workspace = true, default-features = true }
env_logger = { workspace = true }
Expand All @@ -59,4 +60,41 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tokio-util = { version = "0.7.17" }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }
datafusion-proto = { workspace = true }

[[bench]]
harness = false
name = "tpch"

[[bench]]
harness = false
name = "clickbench"

[[bench]]
harness = false
name = "tpcds"

[[bench]]
harness = false
name = "h2o"

[[bench]]
harness = false
name = "imdb"

[[bench]]
harness = false
name = "sort_tpch"

[[bench]]
harness = false
name = "hj"

[[bench]]
harness = false
name = "nlj"

[[bench]]
harness = false
name = "smj"
113 changes: 113 additions & 0 deletions benchmarks/benches/clickbench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::{criterion_group, criterion_main};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks};
use datafusion_benchmarks::clickbench;
use std::path::PathBuf;
use tokio::runtime::Runtime;

/// Returns the location of the ClickBench `hits` parquet data.
///
/// The value of the `CLICKBENCH_DATA` environment variable is used when it
/// can be read; otherwise the relative path `benchmarks/data/hits.parquet`
/// is returned.
fn data_path() -> String {
    match std::env::var("CLICKBENCH_DATA") {
        Ok(configured) => configured,
        Err(_) => "benchmarks/data/hits.parquet".to_string(),
    }
}

/// Returns the (relative) directory that holds the ClickBench query files.
fn queries_path() -> PathBuf {
    "benchmarks/queries/clickbench/queries".into()
}

/// Builds the multi-threaded Tokio runtime used to drive the benchmarks.
///
/// All runtime drivers are enabled via `enable_all`.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn make_runtime() -> Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder.build().unwrap()
}

/// SQL that defines the `hits` view on top of the `hits_raw` table.
///
/// The view exposes every column of `hits_raw` except `"EventDate"`, and adds
/// `"EventDate"` back as the original value cast to `INTEGER` and then to
/// `DATE`.
// NOTE(review): presumably the parquet file stores `EventDate` as an integer
// day count rather than a date type — confirm against the data set.
const HITS_VIEW_DDL: &str = r#"CREATE VIEW hits AS
SELECT * EXCEPT ("EventDate"),
CAST(CAST("EventDate" AS INTEGER) AS DATE) AS "EventDate"
FROM hits_raw"#;

/// Registers the ClickBench data in `ctx` and creates the `hits` view.
///
/// The parquet data at `path` is registered as table `hits_raw`, then
/// `HITS_VIEW_DDL` is executed to define the `hits` view on top of it. Both
/// steps are driven to completion on `rt`.
///
/// # Panics
///
/// Panics if the parquet registration fails, or if planning or executing the
/// view DDL fails.
fn register_hits(rt: &Runtime, ctx: &SessionContext, path: &str) {
    let setup = async {
        let registered = ctx
            .register_parquet("hits_raw", path, Default::default())
            .await;
        if let Err(e) = registered {
            panic!("Failed to register hits_raw: {e}");
        }

        // The DDL only takes effect once the resulting DataFrame is executed.
        let view = ctx.sql(HITS_VIEW_DDL).await.unwrap();
        view.collect().await.unwrap();
    };
    rt.block_on(setup);
}

/// Loads the ClickBench queries with ids `0..=42` from `queries_path()`.
///
/// Each query that is found is returned as `("q<id>", vec![sql])`. Ids for
/// which `clickbench::get_query_sql` does not yield `Ok(Some(_))` are left
/// out of the result without any diagnostic.
// NOTE(review): read errors are dropped silently here, so a wrong working
// directory results in an empty query list — confirm this is intended.
fn load_queries() -> Vec<(String, Vec<String>)> {
    let query_dir = queries_path();
    (0..=42)
        .filter_map(|id| {
            let query_path = clickbench::get_query_path(&query_dir, id);
            match clickbench::get_query_sql(&query_path) {
                Ok(Some(sql)) => Some((format!("q{id}"), vec![sql])),
                _ => None,
            }
        })
        .collect()
}

/// Registers the ClickBench queries with Criterion in `BenchMode::Warm`
/// under the group name `clickbench`.
///
/// The context factory handed to `run_sql_benchmarks` builds a session from
/// the environment-derived configuration with parquet `binary_as_string`
/// enabled, and registers the `hits` data in it.
fn benchmark_clickbench_warm(c: &mut criterion::Criterion) {
    let runtime = make_runtime();
    let sql_queries = load_queries();
    let parquet_path = data_path();

    let build_context = || {
        let mut session_config = SessionConfig::from_env().unwrap();
        session_config
            .options_mut()
            .execution
            .parquet
            .binary_as_string = true;
        let session = SessionContext::new_with_config(session_config);
        register_hits(&runtime, &session, &parquet_path);
        session
    };

    run_sql_benchmarks(
        c,
        &runtime,
        "clickbench",
        BenchMode::Warm,
        &build_context,
        &sql_queries,
    );
}

/// Registers the ClickBench queries with Criterion in `BenchMode::Cold`
/// under the group name `clickbench_cold`.
///
/// The context factory handed to `run_sql_benchmarks` builds a session from
/// the environment-derived configuration with parquet `binary_as_string`
/// enabled, and registers the `hits` data in it.
fn benchmark_clickbench_cold(c: &mut criterion::Criterion) {
    let runtime = make_runtime();
    let sql_queries = load_queries();
    let parquet_path = data_path();

    let build_context = || {
        let mut session_config = SessionConfig::from_env().unwrap();
        session_config
            .options_mut()
            .execution
            .parquet
            .binary_as_string = true;
        let session = SessionContext::new_with_config(session_config);
        register_hits(&runtime, &session, &parquet_path);
        session
    };

    run_sql_benchmarks(
        c,
        &runtime,
        "clickbench_cold",
        BenchMode::Cold,
        &build_context,
        &sql_queries,
    );
}

// Criterion entry point: the `benches` group runs the warm benchmarks first
// and the cold benchmarks second, and `criterion_main!` generates `main`
// (the bench target is declared with `harness = false`).
criterion_group!(
benches,
benchmark_clickbench_warm,
benchmark_clickbench_cold
);
criterion_main!(benches);
158 changes: 158 additions & 0 deletions benchmarks/benches/h2o.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::{criterion_group, criterion_main};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_benchmarks::benchmark_harness::{BenchMode, run_sql_benchmarks};
use std::path::Path;
use tokio::runtime::Runtime;

/// Returns the base directory of the H2O benchmark data files.
///
/// The value of the `H2O_DATA` environment variable is used when it can be
/// read; otherwise the relative path `benchmarks/data/h2o` is returned.
fn data_path() -> String {
    match std::env::var("H2O_DATA") {
        Ok(configured) => configured,
        Err(_) => "benchmarks/data/h2o".to_string(),
    }
}

/// Builds the multi-threaded Tokio runtime used to drive the benchmarks.
///
/// All runtime drivers are enabled via `enable_all`.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn make_runtime() -> Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder.build().unwrap()
}

/// Registers the CSV data at `path` in `ctx` under `table_name`, using the
/// default read options and blocking on `rt` until registration completes.
///
/// # Panics
///
/// Panics with the table name and the underlying error if registration fails.
fn register_csv(rt: &Runtime, ctx: &SessionContext, table_name: &str, path: &str) {
    let registration = ctx.register_csv(table_name, path, Default::default());
    if let Err(e) = rt.block_on(registration) {
        panic!("Failed to register {table_name}: {e}");
    }
}

/// Registers the parquet data at `path` in `ctx` under `table_name`, using
/// the default read options and blocking on `rt` until registration completes.
///
/// # Panics
///
/// Panics with the table name and the underlying error if registration fails.
fn register_parquet(rt: &Runtime, ctx: &SessionContext, table_name: &str, path: &str) {
    let outcome = rt.block_on(ctx.register_parquet(table_name, path, Default::default()));
    match outcome {
        Ok(_) => {}
        Err(e) => panic!("Failed to register {table_name}: {e}"),
    }
}

/// Registers the file at `path` as table `table_name`, choosing the reader
/// from the file extension.
///
/// `csv` files go through `register_csv` and `parquet` files through
/// `register_parquet`. The extension comparison is case-sensitive.
///
/// # Panics
///
/// Panics if the extension is anything else (a missing or non-UTF-8
/// extension is treated as the empty string), or if the registration fails.
fn register_data(rt: &Runtime, ctx: &SessionContext, table_name: &str, path: &str) {
    let extension = match Path::new(path).extension() {
        Some(raw) => raw.to_str().unwrap_or(""),
        None => "",
    };

    if extension == "csv" {
        register_csv(rt, ctx, table_name, path);
    } else if extension == "parquet" {
        register_parquet(rt, ctx, table_name, path);
    } else {
        panic!("Unsupported file extension: {extension}");
    }
}

/// Loads the benchmark queries stored in the SQL file at `path`.
///
/// The file holds one query per paragraph: queries are separated by blank
/// lines. Every non-empty query is trimmed and returned as
/// `("q<N>", vec![sql])`, where `N` counts the queries actually found,
/// starting at 1. Leading, trailing, or repeated blank lines therefore never
/// leave gaps in the numbering, and files with Windows (`\r\n`) line endings
/// are split the same way as Unix ones.
///
/// # Panics
///
/// Panics if the file cannot be read.
fn load_queries_from_file(path: &str) -> Vec<(String, Vec<String>)> {
    let contents = std::fs::read_to_string(path)
        .unwrap_or_else(|e| panic!("Could not read query file {path}: {e}"));
    // Normalize line endings so that "\r\n\r\n" is recognized as a blank line.
    let contents = contents.replace("\r\n", "\n");
    contents
        .split("\n\n")
        .map(str::trim)
        // Drop empty chunks *before* numbering; enumerating first would let
        // extra blank lines consume query ids.
        .filter(|sql| !sql.is_empty())
        .enumerate()
        .map(|(i, sql)| (format!("q{}", i + 1), vec![sql.to_string()]))
        .collect()
}

/// Describes one H2O benchmark group: its name, its query file, and the
/// function that registers the tables its queries read.
struct H2OVariant {
/// Benchmark group name handed to `run_sql_benchmarks`; the cold benchmarks
/// append `_cold` to it.
group_name: &'static str,
/// Path of the SQL file read by `load_queries_from_file`.
queries_file: &'static str,
/// Registers the variant's tables; called with the runtime, a freshly
/// created session context, and the base data directory from `data_path()`.
setup: fn(&Runtime, &SessionContext, &str),
}

/// Registers the table used by the group-by queries: `x`, read from
/// `G1_1e7_1e7_100_0.csv` inside the `base` directory.
fn setup_groupby(rt: &Runtime, ctx: &SessionContext, base: &str) {
    let csv_path = format!("{base}/G1_1e7_1e7_100_0.csv");
    register_data(rt, ctx, "x", &csv_path);
}

/// Registers the four tables used by the join queries (`x`, `small`,
/// `medium`, `large`), each read from its CSV file inside the `base`
/// directory, in that order.
fn setup_join(rt: &Runtime, ctx: &SessionContext, base: &str) {
    let tables = [
        ("x", "J1_1e7_NA_0.csv"),
        ("small", "J1_1e7_1e1_0.csv"),
        ("medium", "J1_1e7_1e4_0.csv"),
        ("large", "J1_1e7_1e7_NA.csv"),
    ];
    for (table_name, file_name) in tables {
        register_data(rt, ctx, table_name, &format!("{base}/{file_name}"));
    }
}

/// Registers the table used by the window queries: `large`, read from
/// `J1_1e7_1e7_NA.csv` inside the `base` directory.
fn setup_window(rt: &Runtime, ctx: &SessionContext, base: &str) {
    let csv_path = format!("{base}/J1_1e7_1e7_NA.csv");
    register_data(rt, ctx, "large", &csv_path);
}

/// The H2O benchmark groups (group-by, join, window), in the order in which
/// the warm and cold benchmark functions iterate over them.
///
/// Each entry pairs a group name with its query file and the setup function
/// that registers the tables those queries read. The query file paths are
/// relative, so they resolve against the process working directory.
const VARIANTS: &[H2OVariant] = &[
H2OVariant {
group_name: "h2o_groupby",
queries_file: "benchmarks/queries/h2o/groupby.sql",
setup: setup_groupby,
},
H2OVariant {
group_name: "h2o_join",
queries_file: "benchmarks/queries/h2o/join.sql",
setup: setup_join,
},
H2OVariant {
group_name: "h2o_window",
queries_file: "benchmarks/queries/h2o/window.sql",
setup: setup_window,
},
];

/// Registers the H2O benchmark groups with Criterion in `BenchMode::Warm`.
///
/// For every entry in `VARIANTS`, the queries are loaded from the variant's
/// query file and handed to `run_sql_benchmarks` under the variant's
/// `group_name`. The context factory builds a session from the
/// environment-derived configuration and runs the variant's `setup` against
/// the data directory returned by `data_path()`.
///
/// A variant whose query file does not exist is skipped. A warning is printed
/// to stderr in that case, because the query paths are relative and a wrong
/// working directory would otherwise produce an empty run with no diagnostic.
fn benchmark_h2o_warm(c: &mut criterion::Criterion) {
    let rt = make_runtime();
    let base = data_path();
    for variant in VARIANTS {
        if !Path::new(variant.queries_file).exists() {
            eprintln!(
                "Skipping benchmark group {}: query file {} not found",
                variant.group_name, variant.queries_file
            );
            continue;
        }
        let queries = load_queries_from_file(variant.queries_file);
        let make_ctx = || {
            let config = SessionConfig::from_env().unwrap();
            let ctx = SessionContext::new_with_config(config);
            (variant.setup)(&rt, &ctx, &base);
            ctx
        };
        run_sql_benchmarks(
            c,
            &rt,
            variant.group_name,
            BenchMode::Warm,
            &make_ctx,
            &queries,
        );
    }
}

/// Registers the H2O benchmark groups with Criterion in `BenchMode::Cold`.
///
/// For every entry in `VARIANTS`, the queries are loaded from the variant's
/// query file and handed to `run_sql_benchmarks` under the variant's
/// `group_name` with a `_cold` suffix. The context factory builds a session
/// from the environment-derived configuration and runs the variant's `setup`
/// against the data directory returned by `data_path()`.
///
/// A variant whose query file does not exist is skipped. A warning is printed
/// to stderr in that case, because the query paths are relative and a wrong
/// working directory would otherwise produce an empty run with no diagnostic.
fn benchmark_h2o_cold(c: &mut criterion::Criterion) {
    let rt = make_runtime();
    let base = data_path();
    for variant in VARIANTS {
        let group_name = format!("{}_cold", variant.group_name);
        if !Path::new(variant.queries_file).exists() {
            eprintln!(
                "Skipping benchmark group {}: query file {} not found",
                group_name, variant.queries_file
            );
            continue;
        }
        let queries = load_queries_from_file(variant.queries_file);
        let make_ctx = || {
            let config = SessionConfig::from_env().unwrap();
            let ctx = SessionContext::new_with_config(config);
            (variant.setup)(&rt, &ctx, &base);
            ctx
        };
        run_sql_benchmarks(c, &rt, &group_name, BenchMode::Cold, &make_ctx, &queries);
    }
}

// Criterion entry point: the `benches` group runs the warm benchmarks first
// and the cold benchmarks second, and `criterion_main!` generates `main`
// (the bench target is declared with `harness = false`).
criterion_group!(benches, benchmark_h2o_warm, benchmark_h2o_cold);
criterion_main!(benches);
Loading
Loading