Add benchmark for an insert/seek workflow (in addition to the insert/scan)
rdaum committed Jan 7, 2024
1 parent 0bfae23 commit 3fd7730
Showing 1 changed file with 121 additions and 58 deletions.
179 changes: 121 additions & 58 deletions crates/db/benches/tb_single_thread.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

// The tokio Runtime imported below is used to drive Criterion's async benchmarks.
use moor_db::tuplebox::{RelationId, Transaction};
use moor_db::tuplebox::RelationId;
use moor_values::util::{BitArray, Bitset64};
use tokio::runtime::Runtime;

@@ -57,79 +57,138 @@ fn load_history() -> Vec<History> {
events.collect::<Vec<_>>()
}

// Run the list-append workload on a single thread.
async fn list_append_workload(
db: Arc<TupleBox>,
events: &Vec<History>,
processes: &mut BitArray<Arc<Transaction>, 64, Bitset64<1>>,
) {
for e in events {
match e.r#type {
Type::invoke => {
// Start a transaction.
let tx = Arc::new(db.clone().start_tx());
assert!(
!processes.check(e.process as usize),
"T{} already exists committed",
e.process
);
processes.set(e.process as usize, tx.clone());
// Execute the actions
for ev in &e.value {
match ev {
Value::append(_, register, value) => {
// Insert the value into the relation.
let relation = RelationId(*register as usize);
tx.clone()
.relation(relation)
.await
.insert_tuple(from_val(*value), from_val(*value))
.await
.unwrap();
}
Value::r(_, register, _) => {
let relation = RelationId(*register as usize);

// Full-scan.
tx.relation(relation)
.await
.predicate_scan(&|_| true)
.await
.unwrap();
async fn list_append_scan_workload(iters: u64, events: &Vec<History>) -> Duration {
let mut cumulative = Duration::new(0, 0);
for _ in 0..iters {
// We create a brand new db for each iteration, so we have a clean slate.
let db = test_db().await;

// Where the currently running transactions are tracked.
let mut processes: BitArray<_, 256, Bitset64<8>> = BitArray::new();

let start = Instant::now();

black_box(for e in events {
match e.r#type {
Type::invoke => {
// Start a transaction.
let tx = Arc::new(db.clone().start_tx());
assert!(
!processes.check(e.process as usize),
"T{} already exists committed",
e.process
);
processes.set(e.process as usize, tx.clone());
// Execute the actions
for ev in &e.value {
match ev {
Value::append(_, register, value) => {
// Insert the value into the relation.
let relation = RelationId(*register as usize);
tx.clone()
.relation(relation)
.await
.insert_tuple(from_val(*value), from_val(*value))
.await
.unwrap();
}
Value::r(_, register, _) => {
let relation = RelationId(*register as usize);

// Full-scan.
tx.relation(relation)
.await
.predicate_scan(&|_| true)
.await
.unwrap();
}
}
}
}
Type::ok => {
let tx = processes.erase(e.process as usize).unwrap();
tx.commit().await.unwrap();
}
Type::fail => {
let tx = processes.erase(e.process as usize).unwrap();
tx.rollback().await.unwrap();
}
}
Type::ok => {
let tx = processes.erase(e.process as usize).unwrap();
tx.commit().await.unwrap();
}
Type::fail => {
let tx = processes.erase(e.process as usize).unwrap();
tx.rollback().await.unwrap();
}
}
});
cumulative += start.elapsed();
}
cumulative
}
async fn do_insert_workload(iters: u64, events: &Vec<History>) -> Duration {

/// Same as above, but performs individual tuple lookups (seeks) instead of a predicate scan, to measure the point-lookup path.
async fn list_append_seek_workload(iters: u64, events: &Vec<History>) -> Duration {
let mut cumulative = Duration::new(0, 0);
for _ in 0..iters {
// We create a brand new db for each iteration, so we have a clean slate.
let db = test_db().await;

// Where to track the transactions running.
let mut processes = BitArray::new();

let mut processes: BitArray<_, 256, Bitset64<8>> = BitArray::new();
let start = Instant::now();
list_append_workload(db, events, &mut processes).await;
black_box(());

black_box(for e in events {
match e.r#type {
Type::invoke => {
// Start a transaction.
let tx = Arc::new(db.clone().start_tx());
assert!(
!processes.check(e.process as usize),
"T{} already exists committed",
e.process
);
processes.set(e.process as usize, tx.clone());
// Execute the actions
for ev in &e.value {
match ev {
Value::append(_, register, value) => {
// Insert the value into the relation.
let relation = RelationId(*register as usize);
tx.clone()
.relation(relation)
.await
.insert_tuple(from_val(*value), from_val(*value))
.await
.unwrap();
}
Value::r(_, register, Some(tuples)) => {
let relation = RelationId(*register as usize);

for t in tuples {
tx.relation(relation)
.await
.seek_by_domain(from_val(*t))
.await
.unwrap();
}
}
Value::r(_, _, None) => {
continue;
}
}
}
}
Type::ok => {
let tx = processes.erase(e.process as usize).unwrap();
tx.commit().await.unwrap();
}
Type::fail => {
let tx = processes.erase(e.process as usize).unwrap();
tx.rollback().await.unwrap();
}
}
});
cumulative += start.elapsed();
}
cumulative
}

// Measure the # of commit/rollbacks per second using the list-append Jepsen workload.
pub fn workload_commits(c: &mut Criterion) {
pub fn throughput_bench(c: &mut Criterion) {
let events = load_history();

// Count the # of commit/rollback (unique transactions) in the workload.
@@ -140,12 +199,16 @@ pub fn workload_commits(c: &mut Criterion) {
group.sample_size(1000);
group.measurement_time(Duration::from_secs(10));
group.throughput(criterion::Throughput::Elements(tx_count as u64));
group.bench_function("commit_rate", |b| {
group.bench_function("list_append_scan", |b| {
b.to_async(&rt)
.iter_custom(|iters| list_append_scan_workload(iters, &events));
});
group.bench_function("list_append_seek", |b| {
b.to_async(&rt)
.iter_custom(|iters| do_insert_workload(iters, &events));
.iter_custom(|iters| list_append_seek_workload(iters, &events));
});
group.finish();
}

criterion_group!(benches, workload_commits);
criterion_group!(benches, throughput_bench);
criterion_main!(benches);
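For context on what the two benchmarks exercise: both share the same insert path, but for each read event the scan workload walks the whole relation with predicate_scan, while the new seek workload looks up each recorded tuple with seek_by_domain. Below is a minimal sketch of the two read paths, assuming the tuplebox transaction API exactly as it appears in this diff; test_db and from_val are the helpers defined elsewhere in this benchmark file, and the relation id and values are placeholders.

use moor_db::tuplebox::RelationId;
use std::sync::Arc;

// Sketch only; not part of the commit. Relies on the benchmark file's
// test_db() and from_val() helpers.
async fn scan_vs_seek_sketch() {
    let db = test_db().await;
    let tx = Arc::new(db.clone().start_tx());
    let relation = RelationId(0);

    // Shared write path: insert a tuple keyed by its own value.
    tx.clone()
        .relation(relation)
        .await
        .insert_tuple(from_val(1), from_val(1))
        .await
        .unwrap();

    // Scan workload read path: touch every tuple in the relation.
    tx.relation(relation)
        .await
        .predicate_scan(&|_| true)
        .await
        .unwrap();

    // Seek workload read path: point lookup of one tuple by its domain value.
    tx.relation(relation)
        .await
        .seek_by_domain(from_val(1))
        .await
        .unwrap();

    tx.commit().await.unwrap();
}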

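Assuming the Cargo bench target is named after this file, the two benchmarks can be run and filtered individually from crates/db, e.g. cargo bench --bench tb_single_thread list_append_seek. Because the group sets Throughput::Elements(tx_count), Criterion reports throughput as committed/rolled-back transactions per second alongside the raw timings.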