diff --git a/src/collections.rs b/src/collections.rs index c668f5c..1b6c306 100644 --- a/src/collections.rs +++ b/src/collections.rs @@ -109,6 +109,39 @@ macro_rules! corrupted_unless { }; } +/// in order to make linked-lists crash-safe, the algorithm will be as follows: +/// +/// insert: +/// * check for existence of item, if it exists, it's already a member of the list +/// * if it does not exist, go to list.tail. this element must exist. +/// * start walking from list.tail over the next elements until we find a valid item +/// * this is the true tail of the list. if curr->next is valid but missing, we consider +/// curr to be true end as well. +/// * make curr->next = new item's key +/// * insert new item (prev pointing to curr) +/// * set list.tail = new item, len+=1 +/// +/// this is safe because if we crash at any point, the list is still valid, and +/// the accounting will be fixed by the next insert (patching len and tail) +/// +/// removal: +/// * check if the element exists. if not, no-op +/// * if the element is the only item in the list, remove the list, and then remove the item. +/// * if the element is the first in the (non-empty) list: +/// * point list.head to element->next, set len-=1 +/// * point the new first element.prev = INVALID +/// * remove the element +/// * if the element is the last in the (non-empty) list: +/// * point list.tail to element->prev, set len-=1 +/// * point the new last element.next = INVALID +/// * remove the element +/// * if the element is a middle element: +/// * point element->prev->next to element->next -- now the element will not be traversed by iteration +/// * point element->next->prev to element->prev -- now the element is completely disconnected +/// * set list.len -= 1 -- THIS IS NOT CRASH SAFE. 
better remove the len altogether +/// * remove the element +/// + impl VickyStore { fn make_coll_key(&self, coll_key: &[u8]) -> (PartedHash, Vec) { let mut full_key = coll_key.to_owned(); @@ -124,7 +157,7 @@ impl VickyStore { } fn lock_collection(&self, coll_ph: PartedHash) -> MutexGuard<()> { - self.keyed_locks[coll_ph.signature() as usize % self.keyed_locks.len()] + self.keyed_locks[(coll_ph.signature() & self.keyed_locks_mask) as usize] .lock() .unwrap() } diff --git a/src/lib.rs b/src/lib.rs index de9216f..ed0e471 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,7 @@ pub struct Config { pub hash_seed: HashSeed, // just some entropy, not so important unless you fear DoS pub expected_number_of_keys: usize, // hint for creating number of shards accordingly) pub merge_small_shards: bool, // whether or not to merge small shards when items are removed + pub max_concurrent_collection_ops: u32, // number of keyed locks for concurrent collection ops } impl Default for Config { @@ -64,6 +65,7 @@ impl Default for Config { hash_seed: HashSeed::new(b"kOYLu0xvq2WtzcKJ").unwrap(), expected_number_of_keys: 0, merge_small_shards: false, + max_concurrent_collection_ops: 64, } } } diff --git a/src/store.rs b/src/store.rs index 7f17f8f..f30af1c 100644 --- a/src/store.rs +++ b/src/store.rs @@ -38,6 +38,7 @@ pub struct VickyStore { pub(crate) num_compactions: AtomicUsize, pub(crate) num_splits: AtomicUsize, // locks for complicated operations + pub(crate) keyed_locks_mask: u32, pub(crate) keyed_locks: Vec>, } @@ -143,8 +144,13 @@ impl VickyStore { Self::create_first_shards(&dir_path, &config, &mut shards)?; } + let mut num_keyed_locks = config.max_concurrent_collection_ops.max(4); + if !num_keyed_locks.is_power_of_two() { + num_keyed_locks = 1 << (num_keyed_locks.ilog2() + 1); + } + let mut keyed_locks = vec![]; - for _ in 0..1024 { + for _ in 0..num_keyed_locks { keyed_locks.push(Mutex::new(())); } @@ -155,6 +161,7 @@ impl VickyStore { num_entries: 0.into(), num_compactions: 
0.into(), num_splits: 0.into(), + keyed_locks_mask: num_keyed_locks - 1, keyed_locks, }) } diff --git a/vicky-crasher/src/main.rs b/vicky-crasher/src/main.rs index 1fcf1cb..0f5afec 100644 --- a/vicky-crasher/src/main.rs +++ b/vicky-crasher/src/main.rs @@ -1,3 +1,4 @@ +use std::ops::Range; use std::time::Duration; use rand::Rng; @@ -5,7 +6,7 @@ use vicky_store::{Config, Result, VickyStore}; const TARGET: u32 = 1_000_000; -fn child_func() -> Result<()> { +fn child_inserts() -> Result<()> { // our job is to create 1M entries while being killed by our evil parent let store = VickyStore::open("dbdir", Config::default())?; @@ -33,18 +34,46 @@ fn child_func() -> Result<()> { Ok(()) } -fn main() -> Result<()> { - _ = std::fs::remove_dir_all("dbdir"); +fn child_removals() -> Result<()> { + // our job is to create 1M entries while being killed by our evil parent + + let store = VickyStore::open("dbdir", Config::default())?; + let lowest_bytes = store.get("lowest")?.unwrap_or(vec![0, 0, 0, 0]); + let lowest = u32::from_le_bytes([ + lowest_bytes[0], + lowest_bytes[1], + lowest_bytes[2], + lowest_bytes[3], + ]); + + if lowest == TARGET - 1 { + println!("child finished (already at {lowest})"); + return Ok(()); + } + + println!("child starting at {lowest}"); + + for i in lowest..TARGET { + store.remove(&i.to_le_bytes())?; + store.set("lowest", &i.to_le_bytes())?; + } + println!("child finished"); + + Ok(()) +} +fn parent_run(mut child_func: impl FnMut() -> Result<()>, sleep: Range) -> Result<()> { for i in 0.. 
{ let pid = unsafe { libc::fork() }; assert!(pid >= 0); if pid == 0 { - child_func()?; - return Ok(()); + let res = child_func(); + unsafe { libc::exit(if res.is_err() { 1 } else { 0 }) }; } else { // parent - std::thread::sleep(Duration::from_millis(rand::thread_rng().gen_range(10..300))); + std::thread::sleep(Duration::from_millis( + rand::thread_rng().gen_range(sleep.clone()), + )); let mut status = 0i32; let rc = unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) }; if rc == 0 { @@ -60,6 +89,13 @@ fn main() -> Result<()> { } } } + Ok(()) +} + +fn main() -> Result<()> { + _ = std::fs::remove_dir_all("dbdir"); + + parent_run(child_inserts, 10..300)?; println!("Parent starts validating the DB..."); @@ -80,5 +116,17 @@ fn main() -> Result<()> { println!("DB validated successfully"); + parent_run(child_removals, 10..30)?; + + println!("Parent starts validating the DB..."); + + assert_eq!( + store.remove("lowest")?, + Some((TARGET - 1).to_le_bytes().to_vec()) + ); + assert_eq!(store.iter().count(), 0); + + println!("DB validated successfully"); + Ok(()) }