Implement more of the API (#710)
Co-authored-by: Richard Pringle <[email protected]>
rkuris and richardpringle authored Aug 27, 2024
1 parent e6ab2f6 commit c7c87ac
Showing 11 changed files with 231 additions and 312 deletions.
107 changes: 44 additions & 63 deletions firewood/benches/hashops.rs
@@ -4,7 +4,9 @@
// hash benchmarks; run with 'cargo bench'

use criterion::{criterion_group, criterion_main, profiler::Profiler, BatchSize, Criterion};
use firewood::db::{BatchOp, DbConfig};
use firewood::merkle::Merkle;
use firewood::v2::api::{Db as _, Proposal as _};
use pprof::ProfilerGuard;
use rand::{distributions::Alphanumeric, rngs::StdRng, Rng, SeedableRng};
use std::sync::Arc;
@@ -52,23 +54,6 @@ impl Profiler for FlamegraphProfiler {
}
}

// TODO danlaine use or remove
// fn bench_trie_hash(criterion: &mut Criterion) {
// let mut to = [1u8; TRIE_HASH_LEN];
// let mut store = InMemLinearStore::new(TRIE_HASH_LEN as u64, 0u8);
// store.write(0, &*ZERO_HASH).expect("write should succeed");

// #[allow(clippy::unwrap_used)]
// criterion
// .benchmark_group("TrieHash")
// .bench_function("dehydrate", |b| {
// b.iter(|| ZERO_HASH.serialize(&mut to).unwrap());
// })
// .bench_function("hydrate", |b| {
// b.iter(|| TrieHash::deserialize(0, &store).unwrap());
// });
// }

// This benchmark peeks into the merkle layer and times how long it takes
// to insert NKEYS with a key length of KEYSIZE
#[allow(clippy::unwrap_used)]
@@ -107,57 +92,53 @@ fn bench_merkle<const NKEYS: usize, const KEYSIZE: usize>(criterion: &mut Criterion) {
});
}
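The elided body of bench_merkle presumably generates random keys and times their insertion into a fresh trie. The following is a rough sketch of that shape, not the diff's actual content; empty_merkle() is a hypothetical stand-in for whatever in-memory store setup the real benchmark performs.

fn bench_merkle_sketch<const NKEYS: usize, const KEYSIZE: usize>(criterion: &mut Criterion) {
    let mut rng = StdRng::seed_from_u64(1234);

    criterion
        .benchmark_group("Merkle")
        .bench_function("insert", |b| {
            b.iter_batched(
                || {
                    // Untimed setup: NKEYS random keys of KEYSIZE bytes each.
                    std::iter::repeat_with(|| {
                        (&mut rng)
                            .sample_iter(&Alphanumeric)
                            .take(KEYSIZE)
                            .collect::<Vec<u8>>()
                    })
                    .take(NKEYS)
                    .collect::<Vec<_>>()
                },
                |keys| {
                    // Timed routine: insert every key into a fresh trie.
                    let mut merkle = empty_merkle(); // hypothetical helper
                    for key in keys {
                        merkle.insert(&key, Box::from(&b"v"[..])).unwrap();
                    }
                },
                BatchSize::SmallInput,
            );
        });
}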

// This benchmark does the same thing as bench_merkle except it uses the revision manager
// TODO: Enable again once the revision manager is stable
// fn _bench_db<const N: usize>(criterion: &mut Criterion) {
// const KEY_LEN: usize = 4;
// let mut rng = StdRng::seed_from_u64(1234);

// #[allow(clippy::unwrap_used)]
// criterion
// .benchmark_group("Db")
// .sample_size(30)
// .bench_function("commit", |b| {
// b.to_async(tokio::runtime::Runtime::new().unwrap())
// .iter_batched(
// || {
// let batch_ops: Vec<_> = repeat_with(|| {
// (&mut rng)
// .sample_iter(&Alphanumeric)
// .take(KEY_LEN)
// .collect()
// })
// .map(|key: Vec<_>| BatchOp::Put {
// key,
// value: vec![b'v'],
// })
// .take(N)
// .collect();
// batch_ops
// },
// |batch_ops| async {
// let db_path = std::env::temp_dir();
// let db_path = db_path.join("benchmark_db");
// let cfg = DbConfig::builder();

// #[allow(clippy::unwrap_used)]
// let db = firewood::db::Db::new(db_path, cfg.clone().truncate(true).build())
// .await
// .unwrap();

// #[allow(clippy::unwrap_used)]
// db.propose(batch_ops).await.unwrap().commit().await.unwrap()
// },
// BatchSize::SmallInput,
// );
// });
// }
#[allow(clippy::unwrap_used)]
fn bench_db<const N: usize>(criterion: &mut Criterion) {
const KEY_LEN: usize = 4;
let mut rng = StdRng::seed_from_u64(1234);

criterion
.benchmark_group("Db")
.sample_size(30)
.bench_function("commit", |b| {
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
let batch_ops: Vec<_> = repeat_with(|| {
(&mut rng)
.sample_iter(&Alphanumeric)
.take(KEY_LEN)
.collect()
})
.map(|key: Vec<_>| BatchOp::Put {
key,
value: vec![b'v'],
})
.take(N)
.collect();
batch_ops
},
|batch_ops| async {
let db_path = std::env::temp_dir();
let db_path = db_path.join("benchmark_db");
let cfg = DbConfig::builder();

let db = firewood::db::Db::new(db_path, cfg.clone().truncate(true).build())
.await
.unwrap();

db.propose(batch_ops).await.unwrap().commit().await.unwrap()
},
BatchSize::SmallInput,
);
});
}
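Note the iter_batched split in the new bench_db: building the batch of Put operations is untimed setup, and only the async closure that opens the database with truncate(true), proposes, and commits is measured.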

criterion_group! {
name = benches;
config = Criterion::default().with_profiler(FlamegraphProfiler::Init(100));
// targets = bench_trie_hash, bench_merkle::<3, 32>, bench_db::<100>
targets = bench_merkle::<3, 4>, bench_merkle::<3, 32>
targets = bench_merkle::<3, 4>, bench_merkle::<3, 32>, bench_db::<100>
}

criterion_main!(benches);
2 changes: 1 addition & 1 deletion firewood/examples/insert.rs
@@ -49,7 +49,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let args = Args::parse();

let mut db = Db::new("rev_db", cfg)
let db = Db::new("rev_db", cfg)
.await
.expect("db initiation should succeed");

183 changes: 51 additions & 132 deletions firewood/src/db.rs
@@ -19,9 +19,6 @@ use std::sync::{Arc, RwLock};
use storage::{Committed, FileBacked, HashedNodeReader, ImmutableProposal, NodeStore, TrieHash};
use typed_builder::TypedBuilder;

// TODO use or remove
const _VERSION_STR: &[u8; 16] = b"firewood v0.1\0\0\0";

#[derive(Debug)]
#[non_exhaustive]
pub enum DbError {
@@ -69,9 +66,10 @@ impl api::DbView for HistoricalRev {

async fn single_key_proof<K: api::KeyType>(
&self,
_key: K,
) -> Result<Option<Proof<ProofNode>>, api::Error> {
todo!()
key: K,
) -> Result<Proof<ProofNode>, api::Error> {
let merkle = Merkle::from(self);
merkle.prove(key.as_ref()).map_err(api::Error::from)
}

async fn range_proof<K: api::KeyType, V>(
@@ -91,110 +89,6 @@
}
}
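With single_key_proof now returning a Proof rather than an Option, a caller holding a historical revision gets a proof in one step. A hedged usage sketch, assuming the v2 api::Db root_hash and revision accessors; the key is illustrative:

    let root = db.root_hash().await?.expect("non-empty database");
    let rev = db.revision(root).await?;
    let proof = rev.single_key_proof(b"some-key").await?;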

// impl<T: NodeReader> HistoricalRev<T> {
// pub fn stream(&self) -> MerkleKeyValueStream<'_, T> {
// todo!()
// }

// pub fn stream_from(&self, _start_key: &[u8]) -> MerkleKeyValueStream<'_, T> {
// todo!()
// }

// /// Get root hash of the generic key-value storage.
// pub fn kv_root_hash(&self) -> Result<TrieHash, DbError> {
// todo!()
// }

// /// Get a value associated with a key.
// pub fn get(&self, _key: &[u8]) -> Option<Vec<u8>> {
// todo!()
// }

// /// Dump the Trie of the generic key-value storage.
// pub fn dump(&self, _w: &mut dyn Write) -> Result<(), DbError> {
// todo!()
// }

// pub fn prove(&self, _key: &[u8]) -> Result<Proof<ProofNode>, MerkleError> {
// todo!()
// }

// /// Verifies a range proof is valid for a set of keys.
// pub fn verify_range_proof<V: AsRef<[u8]>>(
// &self,
// _proof: &Proof<impl Hashable>,
// _first_key: &[u8],
// _last_key: &[u8],
// _keys: Vec<&[u8]>,
// _values: Vec<V>,
// ) -> Result<bool, ProofError> {
// todo!()
// }
// }

/// TODO danlaine: implement
// pub struct Proposal<T> {
// _proposal: T,
// }

// #[async_trait]
// impl<T: NodeWriter> api::Proposal for Proposal<T> {
// type Proposal = Proposal<T>;

// async fn commit(self: Arc<Self>) -> Result<(), api::Error> {
// todo!()
// }

// async fn propose<K: api::KeyType, V: api::ValueType>(
// self: Arc<Self>,
// _data: api::Batch<K, V>,
// ) -> Result<Arc<Self::Proposal>, api::Error> {
// todo!()
// }
// }

// #[async_trait]
// impl<T: NodeReader> api::DbView for Proposal<T> {
// type Stream<'a> = MerkleKeyValueStream<'a, T> where T: 'a;

// async fn root_hash(&self) -> Result<api::HashKey, api::Error> {
// todo!()
// }

// async fn val<K>(&self, _key: K) -> Result<Option<Vec<u8>>, api::Error>
// where
// K: api::KeyType,
// {
// todo!()
// }

// async fn single_key_proof<K>(&self, _key: K) -> Result<Option<Proof<ProofNode>>, api::Error>
// where
// K: api::KeyType,
// {
// todo!()
// }

// async fn range_proof<K, V>(
// &self,
// _first_key: Option<K>,
// _last_key: Option<K>,
// _limit: Option<usize>,
// ) -> Result<Option<api::RangeProof<Vec<u8>, Vec<u8>, ProofNode>>, api::Error>
// where
// K: api::KeyType,
// {
// todo!();
// }

// fn iter_option<K: KeyType>(
// &self,
// _first_key: Option<K>,
// ) -> Result<Self::Stream<'_>, api::Error> {
// todo!()
// }
// }

/// Database configuration.
#[derive(Clone, TypedBuilder, Debug)]
pub struct DbConfig {
@@ -238,7 +132,7 @@ where
}

async fn propose<'p, K: KeyType, V: ValueType>(
&'p mut self,
&'p self,
batch: api::Batch<K, V>,
) -> Result<Arc<Self::Proposal<'p>>, api::Error>
where
@@ -293,17 +187,17 @@ impl Db {
Ok(db)
}

/// Create a proposal.
// pub fn new_proposal<K: KeyType, V: ValueType>(
// &self,
// _data: Batch<K, V>,
// ) -> Result<Proposal<ProposedMutable>, DbError> {
// todo!()
// }

/// Dump the Trie of the latest revision.
pub fn dump(&self, _w: &mut dyn Write) -> Result<(), DbError> {
todo!()
pub fn dump(&self, w: &mut dyn Write) -> Result<(), DbError> {
let latest_rev_nodestore = self
.manager
.read()
.expect("poisoned lock")
.current_revision();
let merkle = Merkle::from(latest_rev_nodestore);
// TODO: This should be a stream
let output = merkle.dump().map_err(DbError::Merkle)?;
write!(w, "{}", output).map_err(DbError::IO)
}
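A sketch of calling the new dump, assuming w may be any std::io::Write implementor such as an in-memory buffer:

    let mut buf: Vec<u8> = Vec::new();
    db.dump(&mut buf)?;
    print!("{}", String::from_utf8_lossy(&buf));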

pub fn metrics(&self) -> Arc<DbMetrics> {
@@ -322,18 +216,17 @@ impl<'a> api::DbView for Proposal<'a> {
type Stream<'b> = MerkleKeyValueStream<'b, NodeStore<Arc<ImmutableProposal>, FileBacked>> where Self: 'b;

async fn root_hash(&self) -> Result<Option<api::HashKey>, api::Error> {
todo!()
self.nodestore.root_hash().map_err(api::Error::from)
}

async fn val<K: KeyType>(&self, _key: K) -> Result<Option<Box<[u8]>>, api::Error> {
todo!()
async fn val<K: KeyType>(&self, key: K) -> Result<Option<Box<[u8]>>, api::Error> {
let merkle = Merkle::from(self.nodestore.clone());
merkle.get_value(key.as_ref()).map_err(api::Error::from)
}

async fn single_key_proof<K: KeyType>(
&self,
_key: K,
) -> Result<Option<Proof<ProofNode>>, api::Error> {
todo!()
async fn single_key_proof<K: KeyType>(&self, key: K) -> Result<Proof<ProofNode>, api::Error> {
let merkle = Merkle::from(self.nodestore.clone());
merkle.prove(key.as_ref()).map_err(api::Error::from)
}
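Because these views wrap the proposal's nodestore in a Merkle, an uncommitted proposal is directly readable. A hedged sketch, with names illustrative:

    let proposal = db.propose(batch).await?;
    let value = proposal.val(b"key").await?; // Option<Box<[u8]>>
    let proof = proposal.single_key_proof(b"key").await?;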

async fn range_proof<K: KeyType, V>(
@@ -359,9 +252,35 @@ impl<'a> api::Proposal for Proposal<'a> {

async fn propose<K: KeyType, V: ValueType>(
self: Arc<Self>,
_data: api::Batch<K, V>,
batch: api::Batch<K, V>,
) -> Result<Arc<Self::Proposal>, api::Error> {
todo!()
let parent = self.nodestore.clone();
let proposal = NodeStore::new(parent)?;
let mut merkle = Merkle::from(proposal);
for op in batch {
match op {
BatchOp::Put { key, value } => {
merkle.insert(key.as_ref(), value.as_ref().into())?;
}
BatchOp::Delete { key } => {
merkle.remove(key.as_ref())?;
}
}
}
let nodestore = merkle.into_inner();
let immutable: Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>> =
Arc::new(nodestore.into());
self.db
.manager
.write()
.expect("poisoned lock")
.add_proposal(immutable.clone());

Ok(Self::Proposal {
nodestore: immutable,
db: self.db,
}
.into())
}
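Since propose takes self: Arc<Self> and registers the new immutable nodestore with the manager, proposals can be layered before anything is committed. A sketch, with the commit ordering an assumption rather than something this diff shows:

    let p1 = db.propose(batch_a).await?;
    let p2 = p1.clone().propose(batch_b).await?; // builds on p1's uncommitted state
    p1.commit().await?;
    p2.commit().await?;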

async fn commit(self: Arc<Self>) -> Result<(), api::Error> {
@@ -391,7 +310,7 @@ mod test {

#[tokio::test]
async fn test_cloned_proposal_error() {
let mut db = testdb().await;
let db = testdb().await;
let proposal = db
.propose::<Vec<u8>, Vec<u8>>(Default::default())
.await