From 9818f68dfcd92e13a794896b2aa1c89717a36274 Mon Sep 17 00:00:00 2001
From: Tomer Filiba
Date: Mon, 5 Aug 2024 00:12:21 +0300
Subject: [PATCH] Add examples, document APIs

---
 .github/workflows/rust.yml   |   6 +-
 README.md                    |  49 ++++--
 examples/multithreaded.rs    |  57 +++++++
 examples/simple.rs           |  35 ++++
 src/hashing.rs               |   4 +
 src/lib.rs                   |  12 +-
 src/shard.rs                 |   6 +
 src/store.rs                 | 105 ++++++++----
 tests/common/mod.rs          |  16 ++
 tests/test_loading.rs        |  76 +++++++++
 tests/test_logic.rs          | 111 ++++++++++++
 tests/test_modify_inplace.rs |  41 +++++
 tests/test_mutlithreading.rs |  78 +++++++++
 tests/test_pre_split.rs      |  33 ++++
 tests/tests.rs               | 320 -----------------------------------
 15 files changed, 577 insertions(+), 372 deletions(-)
 create mode 100644 examples/multithreaded.rs
 create mode 100644 examples/simple.rs
 create mode 100644 tests/common/mod.rs
 create mode 100644 tests/test_loading.rs
 create mode 100644 tests/test_logic.rs
 create mode 100644 tests/test_modify_inplace.rs
 create mode 100644 tests/test_mutlithreading.rs
 create mode 100644 tests/test_pre_split.rs
 delete mode 100644 tests/tests.rs

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index c324310..05ba72d 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -19,4 +19,8 @@ jobs:
     - name: Build
       run: cargo build --verbose
     - name: Run tests
-      run: cargo test --release --verbose
+      run: cargo test --release --verbose -- --nocapture
+    - name: Run simple example
+      run: cargo run --example simple
+    - name: Run multithreaded example
+      run: cargo run --example multithreaded
diff --git a/README.md b/README.md
index 74fd086..486f3b2 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,40 @@
 A pure Rust implementation of a fast, persistent, in-process key-value store that relies on a novel
 sharding mechanism.
 
+## Example
+```rust
+use vicky_store::{Config, Result, VickyStore};
+
+let db = VickyStore::open("/tmp/vicky-dir", Config::default())?;
+
+db.insert("mykey", "myval")?;
+assert_eq!(db.get("mykey")?, Some("myval".into()));
+
+assert_eq!(db.get("yourkey")?, None);
+
+assert_eq!(db.iter().count(), 1);
+
+for res in db.iter() {
+    let (k, v) = res?;
+    assert_eq!(k, "mykey".into());
+    assert_eq!(v, "myval".into());
+}
+
+db.remove("mykey")?;
+assert_eq!(db.iter().count(), 0);
+```
+
+## Design Goals
+* Fast and efficient
+* Low memory footprint
+* No heavy/unbounded merges
+* No Write-Ahead Log (WAL) or journalling of any kind
+* Crash safe: you may lose the latest operations, but never be in an inconsistent state
+* Splitting/compaction happens per-shard, so there's no global locking
+* Suitable for both write-heavy and read-heavy workloads
+* Concurrent by design (multiple threads getting/setting/removing keys at the same time)
+* The backing store is assumed to be an SSD, so it's not optimized for HDDs
+
+## Algorithm
 The algorithm is straightforward:
 * A key is hashed, producing 64 bits of hash. The most significant 16 bits are taken to be the
   "shard selector", followed by 16 bits of "row selector", followed by 32 bits of "signature".
@@ -12,7 +46,7 @@ The algorithm is straightforward:
   the keys are divided according to their shard selector. This process repeats as needed.
 * Inside a shard, we have a header table made of rows, each being an array of signatures. The row
   selector selects the key's row, and within the row we use SIMD operations for matching the
   signature very quickly. This
-  part of the file is kept mmap'ed.
+  part of the file is kept `mmap`ed.
 * Once we find the correct entry, we get its data offset in the file and read it.
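+
+To make the selector arithmetic concrete, here is a minimal sketch (illustrative only, not the
+crate's actual code) of how the 64-bit hash is carved up:
+```rust
+fn split_hash(hash: u64) -> (u16, u16, u32) {
+    let shard_selector = (hash >> 48) as u16; // most significant 16 bits
+    let row_selector = (hash >> 32) as u16; // next 16 bits
+    let signature = hash as u32; // least significant 32 bits
+    (shard_selector, row_selector, signature)
+}
+```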
 The default parameters (chosen by simulations) are shards with 64 rows, each with 512 entries. The chances
@@ -25,20 +59,11 @@ server, followed by the normal sharding mechanism described above.
 
 ## Notes
 * The file format is not yet stable
-
-## Design Goals
-* Fast and efficient
-* Low memory footprint
-* No heavy/unbounded merges
-* No Write-Ahead Log (WAL) or journalling of any kind
-* Splitting/compaction happens per-shard, so there's no global locking
-* Suitable for both write-heavy/read-heavy workloads
-* Concurrent by design (multiple threads getting/setting/removing keys at the same time)
-* The backing store is taken to be an SSD, thus it's not optimized for HDDs
+* Requires nightly (for `simd_itertools` and BTree cursors), uses very little `unsafe` (required due to `mmap`)
 
 ## Roadmap
 * Add TTL to keys (easy, but will screw up accounting)
 * Add key-prefixes which can be used to implement grouping of keys into "families", i.e. lightweight
   indexing by storing their parted-hash with an "anti collision" in a modifiable entry
 * Distributed protocol based on file locks (meant to run on a shared network folder)
-
+* Add some schema-like features, maybe using rkyv
diff --git a/examples/multithreaded.rs b/examples/multithreaded.rs
new file mode 100644
index 0000000..b3fe04b
--- /dev/null
+++ b/examples/multithreaded.rs
@@ -0,0 +1,57 @@
+use core::str;
+use std::{sync::Arc, time::Duration};
+
+use vicky_store::{Config, Result, VickyStore};
+
+fn main() -> Result<()> {
+    let db = Arc::new(VickyStore::open("/tmp/vicky-dir-mt", Config::default())?);
+
+    // clear the DB in case we left something there before. In real-life scenarios you would
+    // probably not clear the DB every time
+    db.clear()?;
+
+    // clone db and spawn thread 1
+    let db1 = db.clone();
+    let h1 = std::thread::spawn(move || -> Result<()> {
+        for i in 0..100 {
+            db1.insert(&format!("key{i}"), "thread 1")?;
+            std::thread::sleep(Duration::from_millis(1));
+        }
+        Ok(())
+    });
+
+    // clone db and spawn thread 2
+    let db2 = db.clone();
+    let h2 = std::thread::spawn(move || -> Result<()> {
+        for i in 0..100 {
+            db2.insert(&format!("key{i}"), "thread 2")?;
+            std::thread::sleep(Duration::from_millis(1));
+        }
+        Ok(())
+    });
+
+    h1.join().unwrap()?;
+    h2.join().unwrap()?;
+
+    for res in db.iter() {
+        let (k, v) = res?;
+        println!(
+            "{} = {}",
+            str::from_utf8(&k).unwrap(),
+            str::from_utf8(&v).unwrap()
+        );
+    }
+
+    // key35 = thread 1
+    // key41 = thread 1
+    // key52 = thread 2
+    // key59 = thread 2
+    // key48 = thread 2
+    // key85 = thread 2
+    // key91 = thread 2
+    // key26 = thread 1
+    // key31 = thread 1
+    // ...
+
+    Ok(())
+}
diff --git a/examples/simple.rs b/examples/simple.rs
new file mode 100644
index 0000000..1b7ee2b
--- /dev/null
+++ b/examples/simple.rs
@@ -0,0 +1,35 @@
+use core::str;
+
+use vicky_store::{Config, Result, VickyStore};
+
+fn main() -> Result<()> {
+    let db = VickyStore::open("/tmp/vicky-dir", Config::default())?;
+
+    // clear the DB in case we left something there before. In real-life scenarios you would
+    // probably not clear the DB every time
+    db.clear()?;
+
+    println!("{:?}", db.get("mykey")?); // None
+
+    db.insert("mykey", "myval")?;
+    println!("{:?}", db.get("mykey")?); // Some([109, 121, 118, 97, 108])
+
+    println!("{:?}", db.remove("mykey")?); // Some([109, 121, 118, 97, 108])
+    println!("{:?}", db.remove("mykey")?); // None
+
+    println!("{:?}", db.get("mykey")?); // None
+
+    for i in 0..10 {
+        db.insert(&format!("mykey{i}"), &format!("myval{i}"))?;
+    }
+    for res in db.iter() {
+        let (k, v) = res?;
+        println!(
+            "{} = {}",
+            str::from_utf8(&k).unwrap(),
+            str::from_utf8(&v).unwrap()
+        );
+    }
+
+    Ok(())
+}
diff --git a/src/hashing.rs b/src/hashing.rs
index 7678658..e084a60 100644
--- a/src/hashing.rs
+++ b/src/hashing.rs
@@ -7,9 +7,13 @@ use crate::{Error, Result};
 #[derive(Debug, Clone, Copy)]
 pub struct SecretKey([u8; 16]);
 
+/// A struct that represents a "nonce" for seeding the hash function (keyed hash).
+/// Keeping it secret is only meaningful if you're concerned with DoS attacks
 impl SecretKey {
     pub const LEN: usize = size_of::<Self>();
 
+    /// Construct a SecretKey from the given byte buffer (must be 16 bytes in length)
+    ///
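+    /// An illustrative sketch (not from the crate's docs):
+    /// ```ignore
+    /// let key = SecretKey::new(b"exactly16bytes!!")?;
+    /// ```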
     pub fn new<B: AsRef<[u8]> + ?Sized>(key: &B) -> Result<Self> {
         let key = key.as_ref();
         if key.len() != Self::LEN {
diff --git a/src/lib.rs b/src/lib.rs
index cf4ce3c..2fe4fbd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,10 +5,7 @@ mod shard;
 mod store;
 
 pub use hashing::SecretKey;
-use std::{
-    fmt::{Display, Formatter},
-    path::PathBuf,
-};
+use std::fmt::{Display, Formatter};
 pub use store::{Stats, VickyStore};
 
 #[derive(Debug)]
@@ -42,19 +39,18 @@ impl std::error::Error for Error {}
 
 pub type Result<T> = std::result::Result<T, Error>;
 
+/// The configuration options for VickyStore. Comes with sane defaults, feel free to use them
 #[derive(Debug, Clone)]
 pub struct Config {
-    pub dir_path: PathBuf,
-    pub max_shard_size: u32,
+    pub max_shard_size: u32, // we don't want huge shards, because splitting would be expensive
     pub min_compaction_threashold: u32, // should be ~10% of max_shard_size
-    pub secret_key: SecretKey,
+    pub secret_key: SecretKey, // just some entropy, not so important unless you fear DoS
     pub expected_number_of_keys: usize, // hint for creating the number of shards accordingly
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
-            dir_path: PathBuf::new(),
             max_shard_size: 64 * 1024 * 1024,
             min_compaction_threashold: 8 * 1024 * 1024,
             secret_key: SecretKey::new(b"kOYLu0xvq2WtzcKJ").unwrap(),
diff --git a/src/shard.rs b/src/shard.rs
index 275127a..fd367aa 100644
--- a/src/shard.rs
+++ b/src/shard.rs
@@ -155,6 +155,12 @@ impl Shard {
         })
     }
 
+    pub(crate) fn flush(&self) -> Result<()> {
+        //self.mmap.flush()?;
+        self.file.sync_data()?;
+        Ok(())
+    }
+
     #[inline]
     fn extract_offset_and_size(offset_and_size: u64) -> (usize, usize, u64) {
         let klen = (offset_and_size >> 48) as usize;
diff --git a/src/store.rs b/src/store.rs
index f8c91c6..d3e5300 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,6 +1,7 @@
 use std::{
     collections::BTreeMap,
     ops::Bound,
+    path::{Path, PathBuf},
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc, RwLock,
@@ -15,6 +16,7 @@ use crate::{
 };
 use crate::{shard::EntryRef, Config, Result};
 
+/// Stats from VickyStore, mainly useful for debugging
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub struct Stats {
     pub num_entries: usize,
@@ -22,15 +24,20 @@ pub struct Stats {
     pub num_compactions: usize,
 }
 
+/// The VickyStore object. Note that it's fully thread-safe, so it can be shared between threads
+/// using `Arc`.
 pub struct VickyStore {
     shards: RwLock<BTreeMap<u32, Shard>>,
     config: Arc<Config>,
+    dir_path: PathBuf,
     // stats
     num_entries: AtomicUsize,
     num_compactions: AtomicUsize,
     num_splits: AtomicUsize,
 }
 
+/// An iterator over a VickyStore. Note that it's safe to modify (insert/delete) keys while iterating,
+/// but the results of the iteration may or may not include these changes. This is considered a
+/// well-defined behavior of the store.
 pub struct VickyStoreIterator<'a> {
     db: &'a VickyStore,
     shard_idx: u32,
@@ -48,12 +55,15 @@ impl<'a> VickyStoreIterator<'a> {
         }
     }
 
+    /// Returns the cookie of the next item in the store. This can be used later to construct an iterator
+    /// that starts at the given point.
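+    ///
+    /// An illustrative sketch (not from the crate's docs):
+    /// ```ignore
+    /// let mut iter = db.iter();
+    /// let _ = iter.next();
+    /// let cookie = iter.cookie();
+    /// // later, resume from the saved position:
+    /// for res in db.iter_from_cookie(cookie) { /* ... */ }
+    /// ```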
     pub fn cookie(&self) -> u64 {
         ((self.shard_idx as u64 & 0xffff) << 32)
             | ((self.row_idx as u64 & 0xffff) << 16)
             | (self.entry_idx as u64 & 0xffff)
     }
 
+    /// Constructs an iterator starting at the given cookie
     pub fn from_cookie(db: &'a VickyStore, cookie: u64) -> Self {
         Self {
             db,
@@ -102,19 +112,23 @@ impl VickyStore {
     const END_OF_SHARDS: u32 = 1u32 << 16;
 
-    pub fn open(config: Config) -> Result<Self> {
+    /// Opens or creates a new VickyStore.
+    /// * dir_path - the directory where shards will be kept
+    /// * config - the configuration options for the store
+    pub fn open(dir_path: impl AsRef<Path>, config: Config) -> Result<Self> {
         let mut shards: BTreeMap<u32, Shard> = BTreeMap::new();
         let config = Arc::new(config);
+        let dir_path: PathBuf = dir_path.as_ref().into();
 
-        std::fs::create_dir_all(&config.dir_path)?;
-        Self::load_existing_dir(config.clone(), &mut shards)?;
-
+        std::fs::create_dir_all(&dir_path)?;
+        Self::load_existing_dir(&dir_path, &config, &mut shards)?;
         if shards.is_empty() {
-            Self::create_first_shards(&config, &mut shards)?
+            Self::create_first_shards(&dir_path, &config, &mut shards)?;
         }
 
         Ok(Self {
             config,
+            dir_path,
             shards: RwLock::new(shards),
             num_entries: 0.into(),
             num_compactions: 0.into(),
@@ -122,8 +136,12 @@
         })
     }
 
-    fn load_existing_dir(config: Arc<Config>, shards: &mut BTreeMap<u32, Shard>) -> Result<()> {
-        for res in std::fs::read_dir(&config.dir_path)? {
+    fn load_existing_dir(
+        dir_path: &PathBuf,
+        config: &Arc<Config>,
+        shards: &mut BTreeMap<u32, Shard>,
+    ) -> Result<()> {
+        for res in std::fs::read_dir(&dir_path)? {
             let entry = res?;
             let filename = entry.file_name();
             let Some(filename) = filename.to_str() else {
@@ -163,7 +181,7 @@ impl VickyStore {
                 continue;
             } else {
                 // remove existing one
-                std::fs::remove_file(config.dir_path.join(format!(
+                std::fs::remove_file(dir_path.join(format!(
                     "shard_{:04x}-{:04x}",
                     existing.span.start, existing.span.end
                 )))?;
@@ -193,7 +211,7 @@ impl VickyStore {
             }
         }
         for (start, end) in to_remove {
-            let bottomfile = config.dir_path.join(format!("shard_{start:04x}-{end:04x}"));
+            let bottomfile = dir_path.join(format!("shard_{start:04x}-{end:04x}"));
             std::fs::remove_file(bottomfile)?;
             shards.remove(&end);
         }
@@ -201,7 +219,11 @@ impl VickyStore {
         Ok(())
     }
 
-    fn create_first_shards(config: &Arc<Config>, shards: &mut BTreeMap<u32, Shard>) -> Result<()> {
+    fn create_first_shards(
+        dir_path: &PathBuf,
+        config: &Arc<Config>,
+        shards: &mut BTreeMap<u32, Shard>,
+    ) -> Result<()> {
         let shards_needed = (config.expected_number_of_keys / Shard::EXPECTED_CAPACITY).max(1);
         let step = Self::END_OF_SHARDS / 2u32.pow(shards_needed.ilog2());
@@ -211,9 +233,7 @@ impl VickyStore {
         shards.insert(
             Self::END_OF_SHARDS,
             Shard::open(
-                config
-                    .dir_path
-                    .join(format!("shard_{:04x}-{:04x}", start, end)),
+                dir_path.join(format!("shard_{:04x}-{:04x}", start, end)),
                 0..Self::END_OF_SHARDS,
                 false,
                 config.clone(),
@@ -225,6 +245,8 @@ impl VickyStore {
         Ok(())
     }
 
+    /// Gets the value of a key from the store. If the key does not exist, `None` will be returned.
+    /// The data is fully-owned, no references are returned.
     pub fn get<B: AsRef<[u8]> + ?Sized>(&self, key: &B) -> Result<Option<Vec<u8>>> {
         let key = key.as_ref();
         let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
@@ -253,11 +275,11 @@ impl VickyStore {
         }
 
         let removed_shard = guard.remove(&shard_end).unwrap();
-        let orig_filename = self.config.dir_path.join(format!(
+        let orig_filename = self.dir_path.join(format!(
             "shard_{:04x}-{:04x}",
             removed_shard.span.start, removed_shard.span.end
         ));
-        let tmpfile = self.config.dir_path.join(format!(
+        let tmpfile = self.dir_path.join(format!(
             "compact_{:04x}-{:04x}",
             removed_shard.span.start, removed_shard.span.end
         ));
@@ -294,11 +316,9 @@ impl VickyStore {
         let removed_shard = guard.remove(&shard_end).unwrap();
 
         let bottomfile = self
-            .config
             .dir_path
             .join(format!("bottom_{:04x}-{:04x}", shard_start, midpoint));
         let topfile = self
-            .config
             .dir_path
             .join(format!("top_{:04x}-{:04x}", midpoint, shard_end));
@@ -333,19 +353,16 @@ impl VickyStore {
         // delete the partial ones
         std::fs::rename(
             bottomfile,
-            self.config
-                .dir_path
+            self.dir_path
                 .join(format!("shard_{:04x}-{:04x}", shard_start, midpoint)),
         )?;
         std::fs::rename(
             topfile,
-            self.config
-                .dir_path
+            self.dir_path
                 .join(format!("shard_{:04x}-{:04x}", midpoint, shard_end)),
         )?;
         std::fs::remove_file(
-            self.config
-                .dir_path
+            self.dir_path
                 .join(format!("shard_{:04x}-{:04x}", shard_start, shard_end)),
         )
         .unwrap();
@@ -400,6 +417,23 @@ impl VickyStore {
         }
     }
 
+    /// Attempts to sync all in-memory changes of all shards to disk. Concurrent changes are allowed while
+    /// flushing, and may result in a partially-synced store. Use sparingly, as this is a costly operation.
+    pub fn flush(&self) -> Result<()> {
+        let guard = self.shards.read().unwrap();
+        for (_, shard) in guard.iter() {
+            shard.flush()?;
+        }
+        Ok(())
+    }
+
+    /// Inserts a key-value pair, creating it or replacing an existing pair. Note that if the program
+    /// crashes during or right after this operation, or if the operating system fails to flush the page
+    /// cache, you may lose some data. However, you will still be in a consistent state, seeing a previous
+    /// version of the data.
+    ///
+    /// While this method is O(1) amortized, every so often it will trigger either a shard compaction or a
+    /// shard split, which requires rewriting the whole shard.
     pub fn insert<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
         &self,
         key: &B1,
@@ -413,12 +447,13 @@ impl VickyStore {
         self._insert(ph, entry)
     }
 
-    // Modify an existing entry in-place, instead of creating a version. Note that the key must exist
-    // and `patch.len() + patch_offset` must be less than or equal to the current value's length.
-    // This method is guaranteed to never trigger a split or a compaction
-    //
-    // This is not crash-safe as it overwrite existing data, and thus may produce inconsistent results
-    // on crashes (part old data, part new data)
+    /// Modifies an existing entry in-place, instead of creating a version. Note that the key must exist
+    /// and `patch.len() + patch_offset` must be less than or equal to the current value's length.
+    ///
+    /// This operation is NOT crash-safe as it overwrites existing data, and thus may produce inconsistent
+    /// results on crashes (reading part old data, part new data).
+    ///
+    /// This method will never trigger a shard split or a shard compaction.
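+    ///
+    /// An illustrative sketch (values are raw bytes):
+    /// ```ignore
+    /// db.insert("key", "0123456789")?;
+    /// db.modify_inplace("key", "AB", 3)?;
+    /// assert_eq!(db.get("key")?, Some("012AB56789".into()));
+    /// ```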
     pub fn modify_inplace<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
         &self,
         key: &B1,
@@ -438,6 +473,8 @@ impl VickyStore {
             .modify_inplace(ph, key, patch, patch_offset)
     }
 
+    /// Removes a key-value pair from the store, returning `None` if the key did not exist,
+    /// or `Some(old_value)` if it did.
    pub fn remove<B: AsRef<[u8]> + ?Sized>(&self, key: &B) -> Result<Option<Vec<u8>>> {
         let key = key.as_ref();
         let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
@@ -456,6 +493,8 @@ impl VickyStore {
         Ok(val)
     }
 
+    /// Returns some stats, useful for debugging. Note that stats are local to the VickyStore instance and
+    /// are not persisted, so closing and opening the store will reset the stats.
     pub fn stats(&self) -> Stats {
         Stats {
             num_entries: self.num_entries.load(Ordering::Acquire),
@@ -464,18 +503,22 @@ impl VickyStore {
         }
     }
 
+    /// Returns an iterator over the whole store
     pub fn iter(&self) -> VickyStoreIterator {
         VickyStoreIterator::new(self)
     }
 
+    /// Returns an iterator starting from the specified cookie (obtained from `cookie()` of a
+    /// previously created `VickyStoreIterator`)
     pub fn iter_from_cookie(&self, cookie: u64) -> VickyStoreIterator {
         VickyStoreIterator::from_cookie(self, cookie)
     }
 
+    /// Clears the store (erasing all keys)
     pub fn clear(&self) -> Result<()> {
         let mut guard = self.shards.write().unwrap();
-        for res in std::fs::read_dir(&self.config.dir_path)? {
+        for res in std::fs::read_dir(&self.dir_path)? {
             let entry = res?;
             let filename = entry.file_name();
             let Some(filename) = filename.to_str() else {
@@ -501,7 +544,7 @@ impl VickyStore {
         self.num_splits.store(0, Ordering::Relaxed);
         guard.clear();
 
-        Self::create_first_shards(&self.config, &mut guard)?;
+        Self::create_first_shards(&self.dir_path, &self.config, &mut guard)?;
 
         Ok(())
     }
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
new file mode 100644
index 0000000..70ebedb
--- /dev/null
+++ b/tests/common/mod.rs
@@ -0,0 +1,16 @@
+use rand::random;
+use vicky_store::Result;
+
+pub fn run_in_tempdir(f: impl FnOnce(&str) -> Result<()>) -> Result<()> {
+    let rand: u64 = random();
+    let dir = format!("/tmp/vicky-{rand}");
+    _ = std::fs::remove_dir_all(&dir);
+
+    f(&dir)?;
+
+    _ = std::fs::remove_dir_all(&dir);
+    Ok(())
+}
+
+#[allow(dead_code)]
+pub const LONG_VAL: &str = "a very long valueeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee";
diff --git a/tests/test_loading.rs b/tests/test_loading.rs
new file mode 100644
index 0000000..161269d
--- /dev/null
+++ b/tests/test_loading.rs
@@ -0,0 +1,76 @@
+mod common;
+
+use std::sync::Arc;
+
+use vicky_store::{Config, Result, VickyStore};
+
+use crate::common::{run_in_tempdir, LONG_VAL};
+
+#[test]
+fn test_loading() -> Result<()> {
+    run_in_tempdir(|dir| {
+        let config = Config {
+            max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
+            min_compaction_threashold: 10 * 1024,
+            ..Default::default()
+        };
+
+        {
+            let db = Arc::new(VickyStore::open(dir, config.clone())?);
+
+            for i in 0..1000 {
+                db.insert(&format!("unique key {i}"), LONG_VAL)?;
+            }
+
+            assert!(db.stats().num_splits > 1);
+            assert_eq!(db.iter().count(), 1000);
+        }
+
+        {
+            let db = Arc::new(VickyStore::open(dir, config.clone())?);
+
+            assert_eq!(db.iter().count(), 1000);
+
+            for res in db.iter() {
+                let (key, val) = res?;
+                assert_eq!(val, LONG_VAL.as_bytes());
+                assert!(key.starts_with(b"unique key "));
+            }
+        }
+
+        {
+            let existing = std::fs::read_dir(dir)?
+                .map(|res| res.unwrap().file_name().to_str().unwrap().to_string())
+                .filter(|name| name.starts_with("shard_"))
+                .collect::<Vec<_>>();
+
+            std::fs::write(format!("{dir}/top_1234-5678"), "xxxx")?;
+            std::fs::write(format!("{dir}/bottom_1234-5678"), "xxxx")?;
+            std::fs::write(format!("{dir}/compact_1234-5678"), "xxxx")?;
+
+            let (_, span) = existing[0].split_once("_").unwrap();
+            let (start, end) = span.split_once("-").unwrap();
+            let start = u32::from_str_radix(start, 16).unwrap();
+            let end = u32::from_str_radix(end, 16).unwrap();
+            let mid = (start + end) / 2;
+            std::fs::write(format!("{dir}/shard_{start:04x}-{mid:04x}"), "xxxx")?;
+            std::fs::write(format!("{dir}/shard_{mid:04x}-{end:04x}"), "xxxx")?;
+
+            let db = Arc::new(VickyStore::open(dir, config)?);
+
+            assert!(!std::fs::exists(format!("{dir}/top_1234-5678"))?);
+            assert!(!std::fs::exists(format!("{dir}/bottom_1234-5678"))?);
+            assert!(!std::fs::exists(format!("{dir}/compact_1234-5678"))?);
+            assert!(!std::fs::exists(format!(
+                "{dir}/shard_{start:04x}-{mid:04x}"
+            ))?);
+            assert!(!std::fs::exists(format!(
+                "{dir}/shard_{mid:04x}-{end:04x}"
+            ))?);
+
+            assert_eq!(db.iter().count(), 1000);
+        }
+
+        Ok(())
+    })
+}
diff --git a/tests/test_logic.rs b/tests/test_logic.rs
new file mode 100644
index 0000000..edc4126
--- /dev/null
+++ b/tests/test_logic.rs
@@ -0,0 +1,111 @@
+mod common;
+
+use std::{collections::HashSet, sync::Arc};
+
+use vicky_store::{Config, Result, Stats, VickyStore};
+
+use crate::common::{run_in_tempdir, LONG_VAL};
+
+#[test]
+fn test_logic() -> Result<()> {
+    run_in_tempdir(|dir| {
+        let db = Arc::new(VickyStore::open(
+            dir,
+            Config {
+                max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
+                min_compaction_threashold: 10 * 1024,
+                ..Default::default()
+            },
+        )?);
+
+        assert!(db.get("my name")?.is_none());
+        db.insert("my_name", "inigo montoya")?;
+        db.insert("your_name", "dread pirate robert")?;
+
+        assert_eq!(db.get("my_name")?, Some("inigo montoya".into()));
+        assert_eq!(db.get("your_name")?, Some("dread pirate robert".into()));
+        db.insert("your_name", "vizzini")?;
+        assert_eq!(db.get("your_name")?, Some("vizzini".into()));
+        assert_eq!(db.remove("my_name")?, Some("inigo montoya".into()));
+        assert!(db.remove("my_name")?.is_none());
+        assert!(db.get("my name")?.is_none());
+
+        let stats = db.stats();
+        assert_eq!(stats.num_entries, 1);
+        assert_eq!(stats.num_compactions, 0);
+        assert_eq!(stats.num_splits, 0);
+
+        for _ in 0..1000 {
+            db.insert(
+                "a very long keyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy",
+                LONG_VAL,
+            )?;
+            assert!(db
+                .remove("a very long keyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy")?
+                .is_some());
+        }
+
+        let stats = db.stats();
+        assert_eq!(stats.num_entries, 1);
+        assert!(stats.num_compactions >= 2);
+        assert_eq!(stats.num_splits, 0);
+
+        for i in 0..1000 {
+            db.insert(&format!("unique key {i}"), LONG_VAL)?;
+        }
+
+        let stats2 = db.stats();
+        assert_eq!(stats2.num_entries, 1001);
+        assert_eq!(stats2.num_compactions, stats.num_compactions);
+        assert!(stats2.num_splits > stats.num_splits);
+
+        assert_eq!(db.get("your_name")?, Some("vizzini".into()));
+        db.clear()?;
+        assert_eq!(db.get("your_name")?, None);
+
+        assert_eq!(
+            db.stats(),
+            Stats {
+                num_compactions: 0,
+                num_entries: 0,
+                num_splits: 0
+            }
+        );
+
+        for i in 0..1000 {
+            db.insert(&format!("unique key {i}"), LONG_VAL)?;
+        }
+
+        let mut all_keys = HashSet::new();
+
+        for res in db.iter() {
+            let (key, val) = res?;
+            assert_eq!(val, LONG_VAL.as_bytes());
+            assert!(key.starts_with(b"unique key "));
+            all_keys.insert(key);
+        }
+
+        assert_eq!(all_keys.len(), 1000);
+
+        all_keys.clear();
+
+        let cookie = {
+            let mut iter1 = db.iter();
+            for _ in 0..100 {
+                let res = iter1.next().unwrap();
+                let (key, _) = res?;
+                all_keys.insert(key);
+            }
+            iter1.cookie()
+        };
+
+        for res in db.iter_from_cookie(cookie) {
+            let (key, _) = res?;
+            all_keys.insert(key);
+        }
+
+        assert_eq!(all_keys.len(), 1000);
+
+        Ok(())
+    })
+}
diff --git a/tests/test_modify_inplace.rs b/tests/test_modify_inplace.rs
new file mode 100644
index 0000000..2062ca3
--- /dev/null
+++ b/tests/test_modify_inplace.rs
@@ -0,0 +1,41 @@
+mod common;
+
+use std::sync::Arc;
+
+use vicky_store::{Config, Error, Result, VickyStore};
+
+use crate::common::run_in_tempdir;
+
+#[test]
+fn test_modify_inplace() -> Result<()> {
+    run_in_tempdir(|dir| {
+        let db = Arc::new(VickyStore::open(
+            dir,
+            Config {
+                max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
+                min_compaction_threashold: 10 * 1024,
+                ..Default::default()
+            },
+        )?);
+
+        db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;
+
+        assert!(matches!(
+            db.modify_inplace("zzz", "bbb", 7),
+            Err(Error::KeyNotFound)
+        ));
+
+        assert!(matches!(
+            db.modify_inplace("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", 7),
+            Err(Error::ValueTooLong)
+        ));
+
+        db.modify_inplace("aaa", "bbb", 7)?;
+        assert_eq!(
+            db.get("aaa")?,
+            Some("aaaaaaabbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into())
+        );
+
+        Ok(())
+    })
+}
diff --git a/tests/test_mutlithreading.rs b/tests/test_mutlithreading.rs
new file mode 100644
index 0000000..f43111b
--- /dev/null
+++ b/tests/test_mutlithreading.rs
@@ -0,0 +1,78 @@
+mod common;
+
+use std::sync::{atomic::AtomicUsize, Arc};
+
+use rand::random;
+use vicky_store::{Config, Result, VickyStore};
+
+use crate::common::run_in_tempdir;
+
+#[test]
+fn test_multithreaded() -> Result<()> {
+    run_in_tempdir(|dir| {
+        for attempt in 0..10 {
+            let db = Arc::new(VickyStore::open(
+                dir,
+                Config {
+                    max_shard_size: 20 * 1024,
+                    min_compaction_threashold: 10 * 1024,
+                    ..Default::default()
+                },
+            )?);
+
+            const NUM_ITEMS: usize = 10_000;
+            let succ_gets = Arc::new(AtomicUsize::new(0));
+            let succ_removals = Arc::new(AtomicUsize::new(0));
+
+            let mut thds = Vec::new();
+            for thid in 0..50 {
+                let db = db.clone();
+                let succ_gets = succ_gets.clone();
+                let succ_removals = succ_removals.clone();
+                let handle = std::thread::spawn(move || -> Result<()> {
+                    let value = format!("data{thid}");
+                    for i in 0..NUM_ITEMS {
+                        let key = format!("key{i}");
+                        db.insert(&key, &value)?;
+
+                        if random::<f32>() > 0.8 {
+                            if db.remove(&key)?.is_some() {
+                                succ_removals.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                            }
+                        } else {
+                            let val2 = db.get(&key)?;
+                            if let Some(val2) = val2 {
+                                assert!(val2.starts_with(b"data"));
+                                succ_gets.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                            }
+                        }
+                    }
+                    Ok(())
+                });
+                //handle.join().unwrap().unwrap();
+                thds.push(handle);
+            }
+
+            for thd in thds {
+                thd.join().unwrap()?;
+            }
+
+            let gets = succ_gets.load(std::sync::atomic::Ordering::SeqCst);
+            let removals = succ_removals.load(std::sync::atomic::Ordering::SeqCst);
+
+            let stats = db.stats();
+            println!(
+                "[{attempt}] {stats:?} gets={gets} removals={removals} diff={}",
+                gets - removals
+            );
+
+            assert_eq!(db.iter().count(), stats.num_entries);
+            assert!(
+                stats.num_entries >= (NUM_ITEMS * 7) / 10
+                    && stats.num_entries <= (NUM_ITEMS * 9) / 10
+            );
+            db.clear()?;
+        }
+        Ok(())
+    })
+}
diff --git a/tests/test_pre_split.rs b/tests/test_pre_split.rs
new file mode 100644
index 0000000..6f13d4a
--- /dev/null
+++ b/tests/test_pre_split.rs
@@ -0,0 +1,33 @@
+mod common;
+
+use std::sync::Arc;
+
+use vicky_store::{Config, Result, VickyStore};
+
+use crate::common::run_in_tempdir;
+
+#[test]
+fn test_pre_split() -> Result<()> {
+    run_in_tempdir(|dir| {
+        let db = Arc::new(VickyStore::open(
+            dir,
+            Config {
+                max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
+                min_compaction_threashold: 10 * 1024,
+                expected_number_of_keys: 1_000_000,
+                ..Default::default()
+            },
+        )?);
+
+        db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;
+
+        let files = std::fs::read_dir(&dir)?
+            .map(|res| res.unwrap().file_name().to_string_lossy().to_string())
+            .filter(|filename| filename.starts_with("shard_"))
+            .collect::<Vec<_>>();
+
+        assert_eq!(files.len(), 32);
+
+        Ok(())
+    })
+}
diff --git a/tests/tests.rs b/tests/tests.rs
deleted file mode 100644
index 55447e4..0000000
--- a/tests/tests.rs
+++ /dev/null
@@ -1,320 +0,0 @@
-use std::{
-    collections::HashSet,
-    sync::{atomic::AtomicUsize, Arc},
-};
-
-use rand::random;
-use vicky_store::{Config, Error, Result, SecretKey, Stats, VickyStore};
-
-fn run_in_tempdir(f: impl FnOnce(&str) -> Result<()>) -> Result<()> {
-    let rand: u64 = random();
-    let dir = format!("/tmp/vicky-{rand}");
-    _ = std::fs::remove_dir_all(&dir);
-
-    f(&dir)?;
-
-    _ = std::fs::remove_dir_all(&dir);
-    Ok(())
-}
-
-const LONG_VAL: &str = "a very long valueeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee";
-
-#[test]
-fn test_logic() -> Result<()> {
-    run_in_tempdir(|dir| {
-        let db = Arc::new(VickyStore::open(Config {
-            dir_path: dir.into(),
-            max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
-            min_compaction_threashold: 10 * 1024,
-            secret_key: SecretKey::new("very very secret")?,
-            ..Default::default()
-        })?);
-
-        assert!(db.get("my name")?.is_none());
-        db.insert("my_name", "inigo montoya")?;
-        db.insert("your_name", "dread pirate robert")?;
-
-        assert_eq!(db.get("my_name")?, Some("inigo montoya".into()));
-        assert_eq!(db.get("your_name")?, Some("dread pirate robert".into()));
-        db.insert("your_name", "vizzini")?;
-        assert_eq!(db.get("your_name")?, Some("vizzini".into()));
-        assert_eq!(db.remove("my_name")?, Some("inigo montoya".into()));
-        assert!(db.remove("my_name")?.is_none());
-        assert!(db.get("my name")?.is_none());
-
-        let stats = db.stats();
-        assert_eq!(stats.num_entries, 1);
-        assert_eq!(stats.num_compactions, 0);
-        assert_eq!(stats.num_splits, 0);
-
-        for _ in 0..1000 {
-            db.insert(
-                "a very long keyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy",
-                LONG_VAL,
-            )?;
-            assert!(db
-                .remove("a very long keyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy")?
-                .is_some());
-        }
-
-        let stats = db.stats();
-        assert_eq!(stats.num_entries, 1);
-        assert!(stats.num_compactions >= 2);
-        assert_eq!(stats.num_splits, 0);
-
-        for i in 0..1000 {
-            db.insert(&format!("unique key {i}"), LONG_VAL)?;
-        }
-
-        let stats2 = db.stats();
-        assert_eq!(stats2.num_entries, 1001);
-        assert_eq!(stats2.num_compactions, stats.num_compactions);
-        assert!(stats2.num_splits > stats.num_splits);
-
-        assert_eq!(db.get("your_name")?, Some("vizzini".into()));
-        db.clear()?;
-        assert_eq!(db.get("your_name")?, None);
-
-        assert_eq!(
-            db.stats(),
-            Stats {
-                num_compactions: 0,
-                num_entries: 0,
-                num_splits: 0
-            }
-        );
-
-        for i in 0..1000 {
-            db.insert(&format!("unique key {i}"), LONG_VAL)?;
-        }
-
-        let mut all_keys = HashSet::new();
-
-        for res in db.iter() {
-            let (key, val) = res?;
-            assert_eq!(val, LONG_VAL.as_bytes());
-            assert!(key.starts_with(b"unique key "));
-            all_keys.insert(key);
-        }
-
-        assert_eq!(all_keys.len(), 1000);
-
-        all_keys.clear();
-
-        let cookie = {
-            let mut iter1 = db.iter();
-            for _ in 0..100 {
-                let res = iter1.next().unwrap();
-                let (key, _) = res?;
-                all_keys.insert(key);
-            }
-            iter1.cookie()
-        };
-
-        for res in db.iter_from_cookie(cookie) {
-            let (key, _) = res?;
-            all_keys.insert(key);
-        }
-
-        assert_eq!(all_keys.len(), 1000);
-
-        Ok(())
-    })
-}
-
-#[test]
-fn test_loading() -> Result<()> {
-    run_in_tempdir(|dir| {
-        let config = Config {
-            dir_path: dir.into(),
-            max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
-            min_compaction_threashold: 10 * 1024,
-            secret_key: SecretKey::new("very very secret")?,
-            ..Default::default()
-        };
-
-        {
-            let db = Arc::new(VickyStore::open(config.clone())?);
-
-            for i in 0..1000 {
-                db.insert(&format!("unique key {i}"), LONG_VAL)?;
-            }
-
-            assert!(db.stats().num_splits > 1);
-            assert_eq!(db.iter().count(), 1000);
-        }
-
-        {
-            let db = Arc::new(VickyStore::open(config.clone())?);
-
-            assert_eq!(db.iter().count(), 1000);
-
-            for res in db.iter() {
-                let (key, val) = res?;
-                assert_eq!(val, LONG_VAL.as_bytes());
-                assert!(key.starts_with(b"unique key "));
-            }
-        }
-
-        {
-            let existing = std::fs::read_dir(dir)?
-                .map(|res| res.unwrap().file_name().to_str().unwrap().to_string())
-                .filter(|name| name.starts_with("shard_"))
-                .collect::<Vec<_>>();
-
-            std::fs::write(format!("{dir}/top_1234-5678"), "xxxx")?;
-            std::fs::write(format!("{dir}/bottom_1234-5678"), "xxxx")?;
-            std::fs::write(format!("{dir}/compact_1234-5678"), "xxxx")?;
-
-            let (_, span) = existing[0].split_once("_").unwrap();
-            let (start, end) = span.split_once("-").unwrap();
-            let start = u32::from_str_radix(start, 16).unwrap();
-            let end = u32::from_str_radix(end, 16).unwrap();
-            let mid = (start + end) / 2;
-            std::fs::write(format!("{dir}/shard_{start:04x}-{mid:04x}"), "xxxx")?;
-            std::fs::write(format!("{dir}/shard_{mid:04x}-{end:04x}"), "xxxx")?;
-
-            let db = Arc::new(VickyStore::open(config)?);
-
-            assert!(!std::fs::exists(format!("{dir}/top_1234-5678"))?);
-            assert!(!std::fs::exists(format!("{dir}/bottom_1234-5678"))?);
-            assert!(!std::fs::exists(format!("{dir}/compact_1234-5678"))?);
-            assert!(!std::fs::exists(format!(
-                "{dir}/shard_{start:04x}-{mid:04x}"
-            ))?);
-            assert!(!std::fs::exists(format!(
-                "{dir}/shard_{mid:04x}-{end:04x}"
-            ))?);
-
-            assert_eq!(db.iter().count(), 1000);
-        }
-
-        Ok(())
-    })
-}
-
-#[test]
-fn test_multithreaded() -> Result<()> {
-    run_in_tempdir(|dir| {
-        for attempt in 0..10 {
-            let db = Arc::new(VickyStore::open(Config {
-                dir_path: dir.into(),
-                max_shard_size: 20 * 1024,
-                min_compaction_threashold: 10 * 1024,
-                secret_key: SecretKey::new("very very secret")?,
-                ..Default::default()
-            })?);
-
-            const NUM_ITEMS: usize = 10_000;
-            let succ_gets = Arc::new(AtomicUsize::new(0));
-            let succ_removals = Arc::new(AtomicUsize::new(0));
-
-            let mut thds = Vec::new();
-            for thid in 0..50 {
-                let db = db.clone();
-                let succ_gets = succ_gets.clone();
-                let succ_removals = succ_removals.clone();
-                let handle = std::thread::spawn(move || -> Result<()> {
-                    let value = format!("data{thid}");
-                    for i in 0..NUM_ITEMS {
-                        let key = format!("key{i}");
-                        db.insert(&key, &value)?;
-
-                        if random::<f32>() > 0.8 {
-                            if db.remove(&key)?.is_some() {
-                                succ_removals.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-                            }
-                        } else {
-                            let val2 = db.get(&key)?;
-                            if let Some(val2) = val2 {
-                                assert!(val2.starts_with(b"data"));
-                                succ_gets.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-                            }
-                        }
-                    }
-                    Ok(())
-                });
-                //handle.join().unwrap().unwrap();
-                thds.push(handle);
-            }
-
-            for thd in thds {
-                thd.join().unwrap()?;
-            }
-
-            let gets = succ_gets.load(std::sync::atomic::Ordering::SeqCst);
-            let removals = succ_removals.load(std::sync::atomic::Ordering::SeqCst);
-
-            let stats = db.stats();
-            println!(
-                "[{attempt}] {stats:?} gets={gets} removals={removals} diff={}",
-                gets - removals
-            );
-
-            assert_eq!(db.iter().count(), stats.num_entries);
-            assert!(
-                stats.num_entries >= (NUM_ITEMS * 7) / 10
-                    && stats.num_entries <= (NUM_ITEMS * 9) / 10
-            );
-            db.clear()?;
-        }
-        Ok(())
-    })
-}
-
-#[test]
-fn test_modify_inplace() -> Result<()> {
-    run_in_tempdir(|dir| {
-        let db = Arc::new(VickyStore::open(Config {
-            dir_path: dir.into(),
-            max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
-            min_compaction_threashold: 10 * 1024,
-            secret_key: SecretKey::new("very very secret")?,
-            ..Default::default()
-        })?);
-
-        db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;
-
-        assert!(matches!(
-            db.modify_inplace("zzz", "bbb", 7),
-            Err(Error::KeyNotFound)
-        ));
-
-        assert!(matches!(
-            db.modify_inplace("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", 7),
-            Err(Error::ValueTooLong)
-        ));
-
-        db.modify_inplace("aaa", "bbb", 7)?;
-        assert_eq!(
-            db.get("aaa")?,
-            Some("aaaaaaabbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into())
-        );
-
-        Ok(())
-    })
-}
-
-#[test]
-fn test_pre_split() -> Result<()> {
-    run_in_tempdir(|dir| {
-        let db = Arc::new(VickyStore::open(Config {
-            dir_path: dir.into(),
-            max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
-            min_compaction_threashold: 10 * 1024,
-            secret_key: SecretKey::new("very very secret")?,
-            expected_number_of_keys: 1_000_000,
-        })?);
-
-        db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;
-
-        let files = std::fs::read_dir(&dir)?
-            .map(|res| res.unwrap().file_name().to_string_lossy().to_string())
-            .filter(|filename| filename.starts_with("shard_"))
-            .collect::<Vec<_>>();
-
-        assert_eq!(files.len(), 32);
-
-        Ok(())
-    })
-}