Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ mimalloc_extended = ["libmimalloc-sys/extended"]

[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
bytes = "1"
chrono = { workspace = true }
clap = { version = "4.5.60", features = ["derive"] }
core_affinity = "0.8"
datafusion = { workspace = true, default-features = true }
datafusion-common = { workspace = true, default-features = true }
env_logger = { workspace = true }
Expand All @@ -56,6 +60,7 @@ serde = { version = "1.0.228", features = ["derive"] }
serde_json = { workspace = true }
snmalloc-rs = { version = "0.3", optional = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
url = { workspace = true }
tokio-util = { version = "0.7.17" }

[dev-dependencies]
Expand Down
42 changes: 26 additions & 16 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}
PIN_THREADS=${PIN_THREADS:-false}

# Build the --pin-threads flag if enabled
PIN_THREADS_FLAG=""
if [ "${PIN_THREADS}" = "true" ]; then
PIN_THREADS_FLAG="--pin-threads"
fi

usage() {
echo "
Expand Down Expand Up @@ -141,6 +148,7 @@ CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
PIN_THREADS Pin each tokio worker thread to a distinct CPU core (default false)
DATAFUSION_* Set the given datafusion configuration
"
exit 1
Expand Down Expand Up @@ -189,6 +197,7 @@ main() {
echo "DATA_DIR: ${DATA_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "PIN_THREADS: ${PIN_THREADS}"
echo "***************************"
case "$BENCHMARK" in
all)
Expand Down Expand Up @@ -371,6 +380,7 @@ main() {
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "PIN_THREADS: ${PIN_THREADS}"
echo "***************************"

# navigate to the appropriate directory
Expand Down Expand Up @@ -655,7 +665,7 @@ run_tpch() {
echo "Running tpch benchmark..."

FORMAT=$2
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the tpch in memory (needs tpch parquet data)
Expand All @@ -671,7 +681,7 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the tpcds benchmark
Expand All @@ -691,7 +701,7 @@ run_tpcds() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpcds benchmark..."

debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the compile profile benchmark helper
Expand All @@ -713,7 +723,7 @@ run_cancellation() {
RESULTS_FILE="${RESULTS_DIR}/cancellation.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running cancellation benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
}


Expand Down Expand Up @@ -767,15 +777,15 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the clickbench benchmark with the partitioned parquet dataset (100 files)
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}


Expand All @@ -784,7 +794,7 @@ run_clickbench_pushdown() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}


Expand All @@ -793,7 +803,7 @@ run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors)
Expand Down Expand Up @@ -975,7 +985,7 @@ run_h2o() {
QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql"

# Run the benchmark using the dynamically constructed file path and query file
debug_run $CARGO_COMMAND --bin dfbench -- h2o \
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} h2o \
--iterations 3 \
--path "${H2O_DIR}/${FILE_NAME}" \
--queries-path "${QUERY_FILE}" \
Expand Down Expand Up @@ -1027,7 +1037,7 @@ h2o_runner() {
# Set the query file name based on the RUN_Type
QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql"

debug_run $CARGO_COMMAND --bin dfbench -- h2o \
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} h2o \
--iterations 3 \
--join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \
--queries-path "${QUERY_FILE}" \
Expand Down Expand Up @@ -1073,7 +1083,7 @@ run_sort_tpch() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort tpch benchmark..."

debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the sort tpch integration benchmark with limit 100 (topk)
Expand All @@ -1083,15 +1093,15 @@ run_topk_tpch() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running topk tpch benchmark..."

$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
$CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
}

# Runs the nlj benchmark
run_nlj() {
RESULTS_FILE="${RESULTS_DIR}/nlj.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running nlj benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the hj benchmark
Expand All @@ -1100,15 +1110,15 @@ run_hj() {
RESULTS_FILE="${RESULTS_DIR}/hj.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running hj benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the smj benchmark
run_smj() {
RESULTS_FILE="${RESULTS_DIR}/smj.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running smj benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
}


Expand Down Expand Up @@ -1243,7 +1253,7 @@ run_clickbench_sorted() {

# Run benchmark with prefer_existing_sort configuration
# This allows DataFusion to optimize away redundant sorts while maintaining parallelism
debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
debug_run $CARGO_COMMAND --bin dfbench -- ${PIN_THREADS_FLAG} clickbench \
--iterations 5 \
--path "${DATA_DIR}/hits_sorted.parquet" \
--queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
Expand Down
Loading
Loading