From 83b6ab4a9e9a6448f7ef30dd7e833299d841dba1 Mon Sep 17 00:00:00 2001 From: Tomer Filiba Date: Sun, 18 Aug 2024 22:15:44 +0300 Subject: [PATCH] Move to index-based lists instead of linked-lists; add pidfile to make sure two processes don't use the same directory --- Cargo.lock | 11 + Cargo.toml | 1 + candy-crasher/src/main.rs | 18 +- candy-longliving/src/main.rs | 20 +- examples/lists.rs | 2 +- src/hashing.rs | 7 - src/lib.rs | 5 +- src/list_insert.rs | 437 -------------- src/list_remove.rs | 268 --------- src/lists.rs | 955 +++++++++++++++++++------------ src/store.rs | 12 +- src/typed.rs | 6 +- test-list-collisions/src/main.rs | 30 +- tests/test_lists.rs | 117 +--- 14 files changed, 682 insertions(+), 1207 deletions(-) delete mode 100644 src/list_insert.rs delete mode 100644 src/list_remove.rs diff --git a/Cargo.lock b/Cargo.lock index 1ea502a..692f8fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,7 @@ dependencies = [ "anyhow", "bytemuck", "databuf", + "fslock", "memmap", "parking_lot", "rand", @@ -124,6 +125,16 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "fslock" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "getrandom" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 78e7f0b..7a2c3f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1.0.86" parking_lot = "0.12.3" uuid = { version = "1.10.0" } rand = "0.8.5" +fslock = "0.2.1" [features] whitebox_testing = [] diff --git a/candy-crasher/src/main.rs b/candy-crasher/src/main.rs index 75eaaa8..f56a99a 100644 --- a/candy-crasher/src/main.rs +++ b/candy-crasher/src/main.rs @@ -144,9 +144,10 @@ fn child_list_removals() -> Result<()> { fn child_list_iterator_removals() -> Result<()> { let store = CandyStore::open("dbdir", Config::default())?; - if rand::thread_rng().gen() { + if rand::random() { + //println!("FWD"); for (i, res) in store.iter_list("xxx").enumerate() { - let (k, v) = res?.unwrap(); + let (k, v) = res?; let v2 = u32::from_le_bytes(v.try_into().unwrap()); if i == 0 { println!("FWD child starts at {v2}"); @@ -154,8 +155,9 @@ fn child_list_iterator_removals() -> Result<()> { store.remove_from_list("xxx", &k)?; } } else { + //println!("BACK"); for (i, res) in store.iter_list_backwards("xxx").enumerate() { - let (k, v) = res?.unwrap(); + let (k, v) = res?; let v2 = u32::from_le_bytes(v.try_into().unwrap()); if i == 0 { println!("BACK child starts at {v2}"); @@ -289,7 +291,7 @@ fn main() -> Result<()> { println!("DB validated successfully"); } - parent_run(shared_stuff, child_list_inserts, 10..30)?; + parent_run(shared_stuff, child_list_inserts, 10..300)?; { println!("Parent starts validating the DB..."); @@ -301,7 +303,7 @@ fn main() -> Result<()> { ); for (i, res) in store.iter_list("xxx").enumerate() { - let (k, v) = res?.unwrap(); + let (k, v) = res?; assert_eq!(k, (i as u32).to_le_bytes()); assert_eq!(v, b"yyy"); } @@ -322,7 +324,8 @@ fn main() -> Result<()> { assert_eq!(store.iter_list("xxx").count(), 0); - assert_eq!(store.iter_raw().count(), 0); + println!("leaked: {}", store.iter_raw().count()); + store.discard_list("xxx")?; println!("DB validated successfully"); } @@ -350,7 +353,7 @@ fn main() -> Result<()> { ); } - parent_run(shared_stuff, child_list_iterator_removals, 10..30)?; + parent_run(shared_stuff, child_list_iterator_removals, 10..200)?; { println!("Parent starts validating the DB..."); @@ -361,6 +364,7 @@ fn main() -> Result<()> { // we will surely leak some entries that were unlinked from the list before they were removed println!("leaked: {}", store.iter_raw().count()); + store.discard_list("xxx")?; println!("DB validated successfully"); } diff --git a/candy-longliving/src/main.rs b/candy-longliving/src/main.rs index f1ee900..f7bc1e2 100644 --- a/candy-longliving/src/main.rs +++ b/candy-longliving/src/main.rs @@ -1,4 +1,7 @@ -use std::sync::{atomic::AtomicU64, Arc}; +use std::{ + sync::{atomic::AtomicU64, Arc}, + time::Instant, +}; use candystore::{CandyStore, CandyTypedList, Config, Result}; @@ -26,14 +29,23 @@ fn main() -> Result<()> { let h = std::thread::spawn(move || { println!("started thread {thd}"); let typed = CandyTypedList::::new(db.clone()); + let listname = format!("mylist"); //format!("mylist{thd}"); + let mut t0 = Instant::now(); for i in 0..num_iters { if i % 10000 == 0 { - println!("thread {thd} at {i} {:?}", db.stats()); + let t1 = Instant::now(); + println!( + "thread {thd} at {i} {:?} rate={}us", + db.stats(), + t1.duration_since(t0).as_micros() / 10_000, + ); + t0 = t1; } - typed.set("mylist".into(), &(thd * num_iters + i), "xxx")?; + + typed.set(&listname, &(thd * num_iters + i), "xxx")?; ops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if i >= tail_length { - typed.remove("mylist".into(), &(thd * num_iters + i - tail_length))?; + typed.remove(&listname, &(thd * num_iters + i - tail_length))?; ops.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } } diff --git a/examples/lists.rs b/examples/lists.rs index 0d86585..1c27853 100644 --- a/examples/lists.rs +++ b/examples/lists.rs @@ -16,7 +16,7 @@ fn main() -> Result<()> { db.set_in_list("europe", "greece", "greek")?; for res in db.iter_list("asia") { - let (k, v) = res?.unwrap(); + let (k, v) = res?; println!( "{} => {}", String::from_utf8(k).unwrap(), diff --git a/src/hashing.rs b/src/hashing.rs index 1bf3447..b7d2573 100644 --- a/src/hashing.rs +++ b/src/hashing.rs @@ -47,9 +47,6 @@ pub(crate) const INVALID_SIG: u32 = 0; pub static mut HASH_BITS_TO_KEEP: u64 = u64::MAX; // which bits to keep from the hash - for testing collisions impl PartedHash { - pub const LEN: usize = size_of::(); - pub const INVALID: Self = Self(0); - pub fn new(seed: &HashSeed, buf: &[u8]) -> Self { Self::from_hash(SipHasher24::new_with_key(&seed.0).hash(buf)) } @@ -58,10 +55,6 @@ impl PartedHash { pub fn is_valid(&self) -> bool { self.signature() != INVALID_SIG } - #[inline] - pub fn is_invalid(&self) -> bool { - self.signature() == INVALID_SIG - } #[inline] pub fn shard_selector(&self) -> u16 { diff --git a/src/lib.rs b/src/lib.rs index 2bdcb28..96f0219 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,7 @@ //! db.set_in_list("spanish", "bye", "adios")?; //! db.set_in_list("spanish", "thanks", "gracias")?; //! -//! let items = db.iter_list("spanish").map(|res| res.unwrap().unwrap()).collect::>(); +//! let items = db.iter_list("spanish").map(|res| res.unwrap()).collect::>(); //! assert_eq!(items, vec![("bye".into(), "adios".into()), ("thanks".into(), "gracias".into())]); //! //! Ok(()) @@ -50,8 +50,6 @@ mod encodable; mod hashing; mod insertion; -mod list_insert; -mod list_remove; mod lists; mod shard; mod store; @@ -59,6 +57,7 @@ mod typed; pub use hashing::HashSeed; pub use insertion::{GetOrCreateStatus, ReplaceStatus, SetStatus}; +pub use lists::{LinkedListIterator, ListCompactionParams}; use std::fmt::{Display, Formatter}; pub use store::{CandyStore, CoarseHistogram, SizeHistogram, Stats}; pub use typed::{CandyTypedDeque, CandyTypedKey, CandyTypedList, CandyTypedStore}; diff --git a/src/list_insert.rs b/src/list_insert.rs deleted file mode 100644 index 64d5874..0000000 --- a/src/list_insert.rs +++ /dev/null @@ -1,437 +0,0 @@ -use bytemuck::{bytes_of, from_bytes}; -use databuf::config::num::LE; -use databuf::Encode; -use rand::Rng; -use uuid::Uuid; - -use crate::encodable::EncodableUuid; -use crate::hashing::PartedHash; -use crate::shard::InsertMode; -use crate::{CandyStore, GetOrCreateStatus, ReplaceStatus, Result, SetStatus}; - -use crate::lists::{ - corrupted_list, update_chain_next, update_chain_prev, Chain, FullPartedHash, InsertPosition, - InsertToListStatus, LinkedList, -}; - -impl CandyStore { - pub(crate) fn _insert_to_list( - &self, - list_key: Vec, - item_key: Vec, - mut val: Vec, - mode: InsertMode, - pos: InsertPosition, - ) -> Result { - let (list_ph, list_key) = self.make_list_key(list_key); - let (item_ph, item_key) = self.make_item_key(list_ph, item_key); - - let _guard = self._list_lock(list_ph); - - // if the item already exists, it means it belongs to this list. we just need to update the value and - // keep the existing chain part - if let Some(mut old_val) = self.get_raw(&item_key)? { - match mode { - InsertMode::GetOrCreate => { - // don't replace the existing value - old_val.truncate(old_val.len() - size_of::()); - return Ok(InsertToListStatus::ExistingValue(old_val)); - } - InsertMode::Replace(ev) => { - if ev.is_some_and(|ev| ev != &old_val[..old_val.len() - size_of::()]) { - old_val.truncate(old_val.len() - size_of::()); - return Ok(InsertToListStatus::WrongValue(old_val)); - } - // fall through - } - _ => { - // fall through - } - } - - val.extend_from_slice(&old_val[old_val.len() - size_of::()..]); - match self.replace_raw(&item_key, &val, None)? { - ReplaceStatus::DoesNotExist => { - corrupted_list!("list {list_ph} failed replacing existing item {item_key:?}"); - } - ReplaceStatus::PrevValue(mut v) => { - v.truncate(v.len() - size_of::()); - return Ok(InsertToListStatus::ExistingValue(v)); - } - ReplaceStatus::WrongValue(_) => { - unreachable!(); - } - } - } - - if matches!(mode, InsertMode::Replace(_)) { - // not allowed to create - return Ok(InsertToListStatus::DoesNotExist); - } - - let item_collidx = self._list_get_free_collidx(list_ph, item_ph)?; - let item_fph = FullPartedHash::new(item_ph, item_collidx); - - // item does not exist, and the list itself might also not exist. get or create the list - let curr_list = *from_bytes::( - &self - .get_or_create_raw( - &list_key, - bytes_of(&LinkedList::new(item_fph, item_fph)).to_vec(), - )? - .value(), - ); - - // we have the list. if the list points to this item, it means we've just created it - if curr_list.full_head() == item_fph { - if curr_list.full_tail() != item_fph { - corrupted_list!( - "list {list_ph} head ({}) != tail ({})", - curr_list.full_head(), - curr_list.full_tail(), - ); - } - // this first item needs to have prev=INVALID and next=INVALID - val.extend_from_slice(bytes_of(&Chain::new( - item_collidx, - FullPartedHash::INVALID, - FullPartedHash::INVALID, - ))); - if !self.set_raw(&item_key, &val)?.was_created() { - corrupted_list!("list {list_ph} expected to create {item_fph} {item_key:?}"); - } - val.truncate(val.len() - size_of::()); - return Ok(InsertToListStatus::CreatedNew(val)); - } - - let v = - match pos { - InsertPosition::Tail => self - ._insert_to_list_tail(list_ph, list_key, item_fph, item_key, val, curr_list)?, - InsertPosition::Head => self - ._insert_to_list_head(list_ph, list_key, item_fph, item_key, val, curr_list)?, - }; - - Ok(InsertToListStatus::CreatedNew(v)) - } - - fn _insert_to_list_head( - &self, - list_ph: PartedHash, - list_key: Vec, - item_fph: FullPartedHash, - item_key: Vec, - mut val: Vec, - curr_list: LinkedList, - ) -> Result> { - // the list already exists. start at list.head and find the true head (it's possible list. - // isn't up to date because of crashes) - let (head_fph, head_k, mut head_v) = self.find_true_head(list_ph, curr_list.full_head())?; - - // modify the current head item to point to the new item. if we crash after this, everything is okay because - // find_true_head will stop at this item - update_chain_prev(&mut head_v, item_fph); - if self.replace_raw(&head_k, &head_v, None)?.failed() { - corrupted_list!("list {list_ph} failed to point {head_k:?}->prev to {item_key:?}"); - } - - // now add item, with prev pointing to the old head. if we crash after this, find_head_tail - // will return the newly-added item as the head. - // possible optimization: only update the head every X operations, this reduces the expected - // number of IOs at the expense of more walking when inserting - let this_chain = Chain::new(item_fph.collidx, head_fph, FullPartedHash::INVALID); - val.extend_from_slice(bytes_of(&this_chain)); - if !self.set_raw(&item_key, &val)?.was_created() { - corrupted_list!("list {list_ph} tail {item_key:?} already exists"); - } - - // now update the list to point to the new tail. if we crash before it's committed, all's good - let new_list = LinkedList::new(item_fph, curr_list.full_tail()); - if self - .replace_raw(&list_key, bytes_of(&new_list), None)? - .failed() - { - corrupted_list!("list {item_fph} failed to point head to {item_key:?}"); - } - - val.truncate(val.len() - size_of::()); - Ok(val) - } - - fn _insert_to_list_tail( - &self, - list_ph: PartedHash, - list_key: Vec, - item_fph: FullPartedHash, - item_key: Vec, - mut val: Vec, - curr_list: LinkedList, - ) -> Result> { - // the list already exists. start at list.tail and find the true tail (it's possible list.tail - // isn't up to date because of crashes) - let (tail_fph, tail_k, mut tail_v) = self.find_true_tail(list_ph, curr_list.full_tail())?; - - // modify the last item to point to the new item. if we crash after this, everything is okay because - // find_true_tail will stop at this item - update_chain_next(&mut tail_v, item_fph); - - if self.replace_raw(&tail_k, &tail_v, None)?.failed() { - corrupted_list!("list {list_ph} failed to point {tail_k:?}->next to {item_key:?}"); - } - - // now add item, with prev pointing to the old tail. if we crash after this, find_true_tail - // will return the newly-added item as the tail. - // possible optimization: only update the tail every X operations, this reduces the expected - // number of IOs at the expense of more walking when inserting - let this_chain = Chain::new(item_fph.collidx, FullPartedHash::INVALID, tail_fph); - val.extend_from_slice(bytes_of(&this_chain)); - if self.set_raw(&item_key, &val)?.was_replaced() { - corrupted_list!("list {list_ph} tail {item_key:?} already exists"); - } - - // now update the list to point to the new tail. if we crash before it's committed, all's good - let new_list = LinkedList::new(curr_list.full_head(), item_fph); - if self - .replace_raw(&list_key, bytes_of(&new_list), None)? - .failed() - { - corrupted_list!("list {list_ph} failed to point tail to {item_key:?}"); - } - - val.truncate(val.len() - size_of::()); - Ok(val) - } - - /// Sets (or replaces) an item (identified by `item_key`) in a linked-list (identified by `list_key`) - - /// placing the item at the tail (end) of the list. Linked lists are created when the first item is - /// inserted to them, and removed when the last item is removed. - /// - /// If the item already exists in the list, its value is replaced but it keeps is relative position. - /// - /// See also [Self::set] - pub fn set_in_list< - B1: AsRef<[u8]> + ?Sized, - B2: AsRef<[u8]> + ?Sized, - B3: AsRef<[u8]> + ?Sized, - >( - &self, - list_key: &B1, - item_key: &B2, - val: &B3, - ) -> Result { - self.owned_set_in_list( - list_key.as_ref().to_owned(), - item_key.as_ref().to_owned(), - val.as_ref().to_owned(), - false, - ) - } - - /// Sets (or replaces) an item (identified by `item_key`) in a linked-list (identified by `list_key`) - - /// placing the item at the tail (end) of the list. If the item already exists in the list, - /// it is re-inserted at the end. - /// - /// This allows for the implementation of LRUs, where older items stay at the beginning and more - /// recent ones are at the end. - /// - /// Note: this operation is **not crash-safe**, as it removes and inserts the item. - /// - /// See also [Self::set], [Self::set_in_list] - pub fn set_in_list_promoting< - B1: AsRef<[u8]> + ?Sized, - B2: AsRef<[u8]> + ?Sized, - B3: AsRef<[u8]> + ?Sized, - >( - &self, - list_key: &B1, - item_key: &B2, - val: &B3, - ) -> Result { - self.owned_set_in_list( - list_key.as_ref().to_owned(), - item_key.as_ref().to_owned(), - val.as_ref().to_owned(), - true, - ) - } - - // Owned version of set_in_list, takes `promote` as a parameter instead - pub fn owned_set_in_list( - &self, - list_key: Vec, - item_key: Vec, - val: Vec, - promote: bool, - ) -> Result { - if promote { - self.owned_remove_from_list(list_key.clone(), item_key.clone())?; - } - match self._insert_to_list( - list_key, - item_key, - val, - InsertMode::Set, - InsertPosition::Tail, - )? { - InsertToListStatus::CreatedNew(_) => Ok(SetStatus::CreatedNew), - InsertToListStatus::ExistingValue(v) => Ok(SetStatus::PrevValue(v)), - InsertToListStatus::DoesNotExist => unreachable!(), - InsertToListStatus::WrongValue(_) => unreachable!(), - } - } - - /// Same as [Self::set_in_list], but will only replace an existing item (will not create one if the key - /// does not already exist). See also [Self::replace] - pub fn replace_in_list< - B1: AsRef<[u8]> + ?Sized, - B2: AsRef<[u8]> + ?Sized, - B3: AsRef<[u8]> + ?Sized, - >( - &self, - list_key: &B1, - item_key: &B2, - val: &B3, - expected_val: Option<&B3>, - ) -> Result { - self.owned_replace_in_list( - list_key.as_ref().to_owned(), - item_key.as_ref().to_owned(), - val.as_ref().to_owned(), - expected_val.map(|ev| ev.as_ref()), - ) - } - - /// Owned version of [Self::replace_in_list] - pub fn owned_replace_in_list( - &self, - list_key: Vec, - item_key: Vec, - val: Vec, - expected_val: Option<&[u8]>, - ) -> Result { - match self._insert_to_list( - list_key, - item_key, - val, - InsertMode::Replace(expected_val), - InsertPosition::Tail, - )? { - InsertToListStatus::DoesNotExist => Ok(ReplaceStatus::DoesNotExist), - InsertToListStatus::ExistingValue(v) => Ok(ReplaceStatus::PrevValue(v)), - InsertToListStatus::WrongValue(v) => Ok(ReplaceStatus::WrongValue(v)), - InsertToListStatus::CreatedNew(_) => unreachable!(), - } - } - - /// Returns the existing value of the element in the list, if it exists, or create it with the given - /// default value. - /// - /// See also [Self::get_or_create] - pub fn get_or_create_in_list< - B1: AsRef<[u8]> + ?Sized, - B2: AsRef<[u8]> + ?Sized, - B3: AsRef<[u8]> + ?Sized, - >( - &self, - list_key: &B1, - item_key: &B2, - val: &B3, - ) -> Result { - self.owned_get_or_create_in_list( - list_key.as_ref().to_owned(), - item_key.as_ref().to_owned(), - val.as_ref().to_owned(), - ) - } - - /// Owned version of [Self::get_or_create_in_list] - pub fn owned_get_or_create_in_list( - &self, - list_key: Vec, - item_key: Vec, - val: Vec, - ) -> Result { - match self._insert_to_list( - list_key, - item_key, - val, - InsertMode::GetOrCreate, - InsertPosition::Tail, - )? { - InsertToListStatus::CreatedNew(v) => Ok(GetOrCreateStatus::CreatedNew(v)), - InsertToListStatus::ExistingValue(v) => Ok(GetOrCreateStatus::ExistingValue(v)), - InsertToListStatus::DoesNotExist => unreachable!(), - InsertToListStatus::WrongValue(_) => unreachable!(), - } - } - - /// In case you only want to store values in a list (the keys are immaterial), this function - /// generates a random UUID and inserts the given element to the end (tail) of the list. - /// Can be used to implement queues, where elements are pushed at the back and popped from - /// the front. - /// - /// The function returns the generated UUID, and you can use it to access the item - /// using functions like [Self::remove_from_list], etc., but it's not the canonical use case - pub fn push_to_list_tail + ?Sized, B2: AsRef<[u8]> + ?Sized>( - &self, - list_key: &B1, - val: &B2, - ) -> Result { - self.owned_push_to_list_tail(list_key.as_ref().to_owned(), val.as_ref().to_owned()) - } - - fn _owned_push_to_list( - &self, - list_key: Vec, - val: Vec, - pos: InsertPosition, - ) -> Result { - // this rng does not produce "well formed UUID" like UUIDv4, but this is faster (because it doesn't - // use the OS rng) and produces 128 bits of entropy rather than 122 in UUIDv4 - let uuid = EncodableUuid::from(Uuid::from_bytes(rand::thread_rng().gen())); - let status = self._insert_to_list( - list_key, - uuid.to_bytes::(), - val, - InsertMode::GetOrCreate, - pos, - )?; - if !matches!(status, InsertToListStatus::CreatedNew(_)) { - corrupted_list!("list uuid collision {uuid} {status:?}"); - } - Ok(uuid) - } - - /// Owned version of [Self::push_to_list] - pub fn owned_push_to_list_tail( - &self, - list_key: Vec, - val: Vec, - ) -> Result { - self._owned_push_to_list(list_key, val, InsertPosition::Tail) - } - - /// In case you only want to store values in a list (the keys are immaterial), this function - /// generates a random UUID and inserts the given element to the head (head) of the list. - /// Can be used to implement queues, where elements are pushed at the back and popped from - /// the front. - /// - /// The function returns the generated UUID, and you can use it to access the item - /// using functions like [Self::remove_from_list], etc., but it's not the canonical use case - pub fn push_to_list_head + ?Sized, B2: AsRef<[u8]> + ?Sized>( - &self, - list_key: &B1, - val: &B2, - ) -> Result { - self.owned_push_to_list_head(list_key.as_ref().to_owned(), val.as_ref().to_owned()) - } - - /// Owned version of [Self::push_to_list_head] - pub fn owned_push_to_list_head( - &self, - list_key: Vec, - val: Vec, - ) -> Result { - self._owned_push_to_list(list_key, val, InsertPosition::Head) - } -} diff --git a/src/list_remove.rs b/src/list_remove.rs deleted file mode 100644 index fa44c18..0000000 --- a/src/list_remove.rs +++ /dev/null @@ -1,268 +0,0 @@ -use bytemuck::{bytes_of, from_bytes}; - -use crate::{ - hashing::PartedHash, - lists::{ - chain_of, corrupted_list, update_chain_next, update_chain_prev, Chain, FullPartedHash, - LinkedList, ITEM_SUFFIX_LEN, - }, - shard::KVPair, - CandyStore, Result, -}; - -impl CandyStore { - fn _remove_from_list_head( - &self, - mut list: LinkedList, - chain: Chain, - list_ph: PartedHash, - list_key: &[u8], - item_key: &[u8], - ) -> Result<()> { - let Some((next_k, mut next_v)) = self._list_get(list_ph, chain.full_next())? else { - corrupted_list!("list {list_ph} failed getting next of {item_key:?}"); - }; - - // update list.head from this to this.next. if we crash afterwards, the list will start - // at the expected place. - list.set_full_head(chain.full_next()); - if self.replace_raw(list_key, bytes_of(&list), None)?.failed() { - corrupted_list!( - "list {list_ph} failed pointing list head to point to {}", - chain.full_next() - ); - } - - // set the new head's prev link to INVALID. if we crash afterwards, everything is good. - update_chain_prev(&mut next_v, FullPartedHash::INVALID); - if self.replace_raw(&next_k, &next_v, None)?.failed() { - corrupted_list!( - "list {list_ph} failed updating prev=INVALID on the now-first {next_k:?} element" - ); - } - - // finally remove the item, sealing the deal - self.remove_raw(item_key)?; - Ok(()) - } - - fn _remove_from_list_tail( - &self, - mut list: LinkedList, - chain: Chain, - list_ph: PartedHash, - list_key: &[u8], - item_key: &[u8], - ) -> Result<()> { - let Some((prev_k, mut prev_v)) = self._list_get(list_ph, chain.full_prev())? else { - corrupted_list!("list {list_ph} missing prev element {item_key:?}"); - }; - - // point list.tail to the prev item. if we crash afterwards, the removed tail is still considered - // part of the list (find_true_tail will find it) - list.set_full_tail(chain.full_prev()); - if !self - .replace_raw(list_key, bytes_of(&list), None)? - .was_replaced() - { - corrupted_list!( - "failed updating list {list_ph} tail to point to prev {}", - chain.full_prev() - ); - } - - // XXX clear the item's chain so we can scrub it later? - - // update the new tail's next to INVALID. if we crash afterwards, the removed tail is no longer - // considered part of the list - update_chain_next(&mut prev_v, FullPartedHash::INVALID); - if self.replace_raw(&prev_k, &prev_v, None)?.failed() { - corrupted_list!( - "list {list_ph} failed updating next=INVALID on the now-last {prev_k:?} element" - ); - } - - // finally remove the item, sealing the deal - self.remove_raw(item_key)?; - Ok(()) - } - - fn _remove_from_list_middle( - &self, - chain: Chain, - list_ph: PartedHash, - item_fph: FullPartedHash, - item_key: &[u8], - ) -> Result<()> { - // this is a "middle" item, it has a prev one and a next one. set prev.next = this.next, - // set next.prev = prev, update list (for `len`) - // it might now have prev or next, in case we crashed after disconnecting one of them, so just - // continue from where we left off - - if chain.full_prev().is_valid() { - if let Some((prev_k, mut prev_v)) = self._list_get(list_ph, chain.full_prev())? { - if chain_of(&prev_v).full_next() == item_fph { - update_chain_next(&mut prev_v, chain.full_next()); - if self.replace_raw(&prev_k, &prev_v, None)?.failed() { - corrupted_list!("list {list_ph} failed updating prev.next on {prev_k:?}"); - } - } - } - } - - if chain.full_next().is_valid() { - if let Some((next_k, mut next_v)) = self._list_get(list_ph, chain.full_next())? { - if chain_of(&next_v).full_prev() == item_fph { - update_chain_prev(&mut next_v, chain.full_prev()); - if self.replace_raw(&next_k, &next_v, None)?.failed() { - corrupted_list!("list {list_ph} failed updating next.prev on {next_k:?}"); - } - } - } - } - - // now it's safe to remove the item - self.remove_raw(item_key)?; - Ok(()) - } - - pub fn remove_from_list + ?Sized, B2: AsRef<[u8]> + ?Sized>( - &self, - list_key: &B1, - item_key: &B2, - ) -> Result>> { - self.owned_remove_from_list(list_key.as_ref().to_owned(), item_key.as_ref().to_owned()) - } - - fn _remove_from_list( - &self, - list: LinkedList, - list_key: &[u8], - list_ph: PartedHash, - item_key: &[u8], - item_fph: FullPartedHash, - chain: Chain, - ) -> Result<()> { - // because of the crash model, it's possible list.head and list.tail are not up to date. - // it's also possible that we'll leak some entries if we crash mid-operation, i.e., an item - // might have been unlinked from its prev or next, but still exists on its own. - // XXX: maybe background compaction can check for leaks and remove them? - - let mut head_fph = list.full_head(); - if item_fph == head_fph && chain.full_prev().is_valid() { - let (true_head_fph, _, _) = self.find_true_head(list_ph, head_fph)?; - head_fph = true_head_fph; - } - - let mut tail_fph = list.full_tail(); - if item_fph == tail_fph && chain.full_next().is_valid() { - let (true_tail_fph, _, _) = self.find_true_tail(list_ph, tail_fph)?; - tail_fph = true_tail_fph; - } - - if item_fph == head_fph && item_fph == tail_fph { - // it's the only element in the list - self.remove_raw(list_key)?; - self.remove_raw(item_key)?; - } else if item_fph == head_fph { - // it's the head - self._remove_from_list_head(list, chain, list_ph, list_key, item_key)? - } else if item_fph == tail_fph { - // it's the tail - self._remove_from_list_tail(list, chain, list_ph, list_key, item_key)? - } else { - // it's a middle element - self._remove_from_list_middle(chain, list_ph, item_fph, item_key)? - } - - Ok(()) - } - - /// Owned version of [Self::remove_from_list] - pub fn owned_remove_from_list( - &self, - list_key: Vec, - item_key: Vec, - ) -> Result>> { - let (list_ph, list_key) = self.make_list_key(list_key); - let (item_ph, item_key) = self.make_item_key(list_ph, item_key); - - let _guard = self._list_lock(list_ph); - - // if the item does not exist -- all's good - let Some(mut v) = self.get_raw(&item_key)? else { - return Ok(None); - }; - - let chain = chain_of(&v); - v.truncate(v.len() - size_of::()); - let item_fph = FullPartedHash::new(item_ph, chain._this_collidx); - - // fetch the list - let Some(list_buf) = self.get_raw(&list_key)? else { - // if it does not exist, it means we've crashed right between removing the list and removing - // the only item it held - proceed to removing this item - self.remove_raw(&item_key)?; - return Ok(Some(v)); - }; - - let list = *from_bytes::(&list_buf); - self._remove_from_list(list, &list_key, list_ph, &item_key, item_fph, chain)?; - - Ok(Some(v)) - } - - /// Removes and returns the first (head) element from the list - pub fn pop_list_head + ?Sized>(&self, list_key: &B) -> Result> { - self.owned_pop_list_head(list_key.as_ref().to_owned()) - } - - /// Owned version of [Self::pop_list_head] - pub fn owned_pop_list_head(&self, list_key: Vec) -> Result> { - let (list_ph, list_key) = self.make_list_key(list_key); - - let _guard = self._list_lock(list_ph); - - let Some(list_buf) = self.get_raw(&list_key)? else { - return Ok(None); - }; - let list = *from_bytes::(&list_buf); - - let (item_fph, mut item_key, mut item_val) = - self.find_true_head(list_ph, list.full_head())?; - let chain = chain_of(&item_val); - item_val.truncate(item_val.len() - size_of::()); - - self._remove_from_list(list, &list_key, list_ph, &item_key, item_fph, chain)?; - - item_key.truncate(item_key.len() - ITEM_SUFFIX_LEN); - Ok(Some((item_key, item_val))) - } - - /// Removes and returns the last (tail) element from the list - pub fn pop_list_tail + ?Sized>(&self, list_key: &B) -> Result> { - self.owned_pop_list_tail(list_key.as_ref().to_owned()) - } - - /// Owned version of [Self::pop_list_tail] - pub fn owned_pop_list_tail(&self, list_key: Vec) -> Result> { - let (list_ph, list_key) = self.make_list_key(list_key); - - let _guard = self._list_lock(list_ph); - - let Some(list_buf) = self.get_raw(&list_key)? else { - return Ok(None); - }; - let list = *from_bytes::(&list_buf); - - let (item_fph, mut item_key, mut item_val) = - self.find_true_tail(list_ph, list.full_tail())?; - let chain = chain_of(&item_val); - item_val.truncate(item_val.len() - size_of::()); - - self._remove_from_list(list, &list_key, list_ph, &item_key, item_fph, chain)?; - - item_key.truncate(item_key.len() - ITEM_SUFFIX_LEN); - Ok(Some((item_key, item_val))) - } -} diff --git a/src/lists.rs b/src/lists.rs index 084bf45..6558830 100644 --- a/src/lists.rs +++ b/src/lists.rs @@ -1,396 +1,412 @@ use crate::{ + encodable::EncodableUuid, hashing::PartedHash, - shard::KVPair, - store::{ITEM_NAMESPACE, LIST_NAMESPACE}, - CandyStore, Result, + shard::{InsertMode, KVPair}, + store::{CHAIN_NAMESPACE, ITEM_NAMESPACE, LIST_NAMESPACE}, + CandyStore, GetOrCreateStatus, ReplaceStatus, Result, SetStatus, }; use bytemuck::{bytes_of, from_bytes, Pod, Zeroable}; use parking_lot::MutexGuard; +use uuid::Uuid; -#[derive(Debug, Clone, Copy, Pod, Zeroable, Default)] +#[derive(Clone, Copy, Pod, Zeroable)] #[repr(C)] -pub(crate) struct LinkedList { - pub _head: PartedHash, - pub _tail: PartedHash, - pub _head_collidx: u16, - pub _tail_collidx: u16, - pub _reserved: u32, +struct LinkedList { + pub head_idx: u64, // inclusive + pub tail_idx: u64, // exclusive + pub holes: u64, } -impl LinkedList { - pub fn new(head: FullPartedHash, tail: FullPartedHash) -> Self { - assert!(head.is_valid(), "creating a list with an invalid head"); - assert!(tail.is_valid(), "creating a list with an invalid tail"); - Self { - _head: head.ph, - _head_collidx: head.collidx, - _tail: tail.ph, - _tail_collidx: tail.collidx, - ..Default::default() - } - } - pub fn full_tail(&self) -> FullPartedHash { - FullPartedHash::new(self._tail, self._tail_collidx) - } - pub fn full_head(&self) -> FullPartedHash { - FullPartedHash::new(self._head, self._head_collidx) - } - pub fn set_full_tail(&mut self, fph: FullPartedHash) { - self._tail = fph.ph; - self._tail_collidx = fph.collidx; - } - pub fn set_full_head(&mut self, fph: FullPartedHash) { - self._head = fph.ph; - self._head_collidx = fph.collidx; +impl std::fmt::Debug for LinkedList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LinkedList(0x{:016x}..0x{:016x} len={} holes={})", + self.head_idx, + self.tail_idx, + self.tail_idx - self.head_idx, + self.holes + ) } } -#[derive(Debug, Clone, Copy, Pod, Zeroable, Default)] -#[repr(C)] -pub(crate) struct Chain { - pub _prev: PartedHash, - pub _next: PartedHash, - pub _prev_collidx: u16, - pub _next_collidx: u16, - pub _this_collidx: u16, - pub _reserved: u16, -} - -impl Chain { - pub fn new(this_collidx: u16, next_fph: FullPartedHash, prev_fph: FullPartedHash) -> Self { - Self { - _this_collidx: this_collidx, - _next: next_fph.ph, - _next_collidx: next_fph.collidx, - _prev: prev_fph.ph, - _prev_collidx: prev_fph.collidx, - ..Default::default() - } - } - pub fn full_next(&self) -> FullPartedHash { - FullPartedHash::new(self._next, self._next_collidx) - } - pub fn full_prev(&self) -> FullPartedHash { - FullPartedHash::new(self._prev, self._prev_collidx) - } - pub fn set_full_next(&mut self, fph: FullPartedHash) { - self._next = fph.ph; - self._next_collidx = fph.collidx; +impl LinkedList { + fn len(&self) -> u64 { + self.tail_idx - self.head_idx } - pub fn set_full_prev(&mut self, fph: FullPartedHash) { - self._prev = fph.ph; - self._prev_collidx = fph.collidx; + fn is_empty(&self) -> bool { + self.head_idx == self.tail_idx } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) struct FullPartedHash { - pub ph: PartedHash, - pub collidx: u16, +#[derive(Debug, Clone, Copy, Pod, Zeroable)] +#[repr(C, packed)] +struct ChainKey { + list_ph: PartedHash, + idx: u64, + namespace: u8, } -impl FullPartedHash { - pub const INVALID: Self = Self { - ph: PartedHash::INVALID, - collidx: 0, - }; - pub fn new(ph: PartedHash, collidx: u16) -> Self { - Self { ph, collidx } - } - pub fn is_valid(&self) -> bool { - self.ph.is_valid() - } +#[derive(Debug)] +pub struct ListCompactionParams { + pub min_length: u64, + pub min_holes_ratio: f64, } -impl std::fmt::Display for FullPartedHash { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}:{}", self.ph, self.collidx) + +impl Default for ListCompactionParams { + fn default() -> Self { + Self { + min_length: 100, + min_holes_ratio: 0.25, + } } } pub(crate) const ITEM_SUFFIX_LEN: usize = size_of::() + ITEM_NAMESPACE.len(); -pub(crate) fn chain_of(buf: &[u8]) -> Chain { - bytemuck::pod_read_unaligned(&buf[buf.len() - size_of::()..]) -} -pub(crate) fn update_chain_prev(val: &mut Vec, fph: FullPartedHash) { - let offset = val.len() - size_of::(); - let mut chain: Chain = bytemuck::pod_read_unaligned(&val[offset..]); - chain.set_full_prev(fph); - val[offset..].copy_from_slice(bytes_of(&chain)); -} - -pub(crate) fn update_chain_next(val: &mut Vec, fph: FullPartedHash) { - let offset = val.len() - size_of::(); - let mut chain: Chain = bytemuck::pod_read_unaligned(&val[offset..]); - chain.set_full_next(fph); - val[offset..].copy_from_slice(bytes_of(&chain)); -} - pub struct LinkedListIterator<'a> { store: &'a CandyStore, list_key: Vec, list_ph: PartedHash, - fph: Option, + list: Option, + idx: u64, + fwd: bool, } impl<'a> Iterator for LinkedListIterator<'a> { - type Item = Result>; + type Item = Result; + fn next(&mut self) -> Option { - if self.fph.is_none() { - let _guard = self.store._list_lock(self.list_ph); - let buf = match self.store.get_raw(&self.list_key) { - Ok(buf) => buf, + if self.list.is_none() { + let _guard = self.store.lock_list(self.list_ph); + let list_bytes = match self.store.get_raw(&self.list_key) { + Ok(Some(list_bytes)) => list_bytes, + Ok(None) => return None, Err(e) => return Some(Err(e)), }; - let Some(buf) = buf else { - return None; + let list = *from_bytes::(&list_bytes); + self.list = Some(list); + self.idx = if self.fwd { + list.head_idx + } else { + list.tail_idx - 1 }; - let list = *from_bytes::(&buf); - match self.store.find_true_head(self.list_ph, list.full_head()) { - Ok((fph, _, _)) => { - self.fph = Some(fph); - } - Err(e) => { - return Some(Err(e)); - } - } } - let Some(fph) = self.fph else { + let Some(list) = self.list else { return None; }; - if fph.ph.is_invalid() { - return None; - } - let kv = match self.store._list_get(self.list_ph, fph) { - Err(e) => return Some(Err(e)), - Ok(kv) => kv, - }; - let Some((mut k, mut v)) = kv else { - // this means the current element was removed by another thread, and that's okay - // because we don't hold any locks during iteration. this is an early stop, - // which means the reader might want to retry - return Some(Ok(None)); - }; - k.truncate(k.len() - ITEM_SUFFIX_LEN); - let chain = chain_of(&v); - self.fph = Some(chain.full_next()); - v.truncate(v.len() - size_of::()); - Some(Ok(Some((k, v)))) - } -} - -// it doesn't really make sense to implement DoubleEndedIterator here, because we'd have to maintain both -// pointers and the protocol says iteration ends when they meet in the middle -pub struct RevLinkedListIterator<'a> { - store: &'a CandyStore, - list_key: Vec, - list_ph: PartedHash, - fph: Option, -} + while if self.fwd { + self.idx < list.tail_idx + } else { + self.idx >= list.head_idx + } { + let idx = self.idx; + if self.fwd { + self.idx += 1; + } else { + self.idx -= 1; + } -impl<'a> Iterator for RevLinkedListIterator<'a> { - type Item = Result>; - fn next(&mut self) -> Option { - if self.fph.is_none() { - let _guard = self.store._list_lock(self.list_ph); - let buf = match self.store.get_raw(&self.list_key) { - Ok(buf) => buf, + match self.store.get_from_list_at_index(self.list_ph, idx, true) { Err(e) => return Some(Err(e)), - }; - let Some(buf) = buf else { - return None; - }; - let list = *from_bytes::(&buf); - match self.store.find_true_tail(self.list_ph, list.full_tail()) { - Ok((fph, _, _)) => { - self.fph = Some(fph); - } - Err(e) => { - return Some(Err(e)); + Ok(Some((_, k, v))) => return Some(Ok((k, v))), + Ok(None) => { + // try next index } } } - let Some(fph) = self.fph else { - return None; - }; - if fph.ph.is_invalid() { - return None; - } - let kv = match self.store._list_get(self.list_ph, fph) { - Err(e) => return Some(Err(e)), - Ok(kv) => kv, - }; - let Some((mut k, mut v)) = kv else { - // this means the current element was removed by another thread, and that's okay - // because we don't hold any locks during iteration. this is an early stop, - // which means the reader might want to retry - return Some(Ok(None)); - }; - k.truncate(k.len() - ITEM_SUFFIX_LEN); - let chain = chain_of(&v); - self.fph = Some(chain.full_prev()); - v.truncate(v.len() - size_of::()); - Some(Ok(Some((k, v)))) + None } } -macro_rules! corrupted_list { - ($($arg:tt)*) => { - anyhow::bail!(crate::CandyError::CorruptedLinkedList(format!($($arg)*))); - }; -} -pub(crate) use corrupted_list; - -pub(crate) enum InsertPosition { - Head, - Tail, +impl<'a> DoubleEndedIterator for LinkedListIterator<'a> { + fn next_back(&mut self) -> Option { + None + } } #[derive(Debug)] -pub(crate) enum InsertToListStatus { - ExistingValue(Vec), - WrongValue(Vec), - CreatedNew(Vec), +enum InsertToListStatus { + Created(Vec), DoesNotExist, + WrongValue(Vec), + ExistingValue(Vec), + Replaced(Vec), +} + +enum InsertToListPos { + Head, + Tail, } impl CandyStore { - pub(crate) fn make_list_key(&self, mut list_key: Vec) -> (PartedHash, Vec) { + const FIRST_IDX: u64 = 0x8000_0000_0000_0000; + + fn make_list_key(&self, mut list_key: Vec) -> (PartedHash, Vec) { list_key.extend_from_slice(LIST_NAMESPACE); (PartedHash::new(&self.config.hash_seed, &list_key), list_key) } - pub(crate) fn make_item_key( - &self, - list_ph: PartedHash, - mut item_key: Vec, - ) -> (PartedHash, Vec) { + fn make_item_key(&self, list_ph: PartedHash, mut item_key: Vec) -> (PartedHash, Vec) { item_key.extend_from_slice(bytes_of(&list_ph)); item_key.extend_from_slice(ITEM_NAMESPACE); (PartedHash::new(&self.config.hash_seed, &item_key), item_key) } - pub(crate) fn _list_lock(&self, list_ph: PartedHash) -> MutexGuard<()> { + fn lock_list(&self, list_ph: PartedHash) -> MutexGuard<()> { self.keyed_locks[(list_ph.signature() & self.keyed_locks_mask) as usize].lock() } - pub(crate) fn _list_get_free_collidx( + fn _insert_to_list( &self, - list_ph: PartedHash, - item_ph: PartedHash, - ) -> Result { - let mut suffix = [0u8; ITEM_SUFFIX_LEN]; - suffix[0..PartedHash::LEN].copy_from_slice(bytes_of(&list_ph)); - suffix[PartedHash::LEN..].copy_from_slice(ITEM_NAMESPACE); - let mut max_seen = None; - - for res in self.get_by_hash(item_ph)? { - let (k, v) = res?; - if k.ends_with(&suffix) { - let chain = chain_of(&v); - if max_seen.is_none() - || max_seen.is_some_and(|max_seen| chain._this_collidx > max_seen) - { - max_seen = Some(chain._this_collidx); + list_key: Vec, + item_key: Vec, + mut val: Vec, + mode: InsertMode, + pos: InsertToListPos, + ) -> Result { + let (list_ph, list_key) = self.make_list_key(list_key); + let (item_ph, item_key) = self.make_item_key(list_ph, item_key); + + let _guard = self.lock_list(list_ph); + + // if the item already exists, it's already part of the list. just update it and preserve the index + if let Some(mut existing_val) = self.get_raw(&item_key)? { + match mode { + InsertMode::GetOrCreate => { + existing_val.truncate(existing_val.len() - size_of::()); + return Ok(InsertToListStatus::ExistingValue(existing_val)); + } + InsertMode::Replace(expected_val) => { + if let Some(expected_val) = expected_val { + if expected_val != &existing_val[existing_val.len() - size_of::()..] { + existing_val.truncate(existing_val.len() - size_of::()); + return Ok(InsertToListStatus::WrongValue(existing_val)); + } + } + // fall through + } + InsertMode::Set => { + // fall through } } + + val.extend_from_slice(&existing_val[existing_val.len() - size_of::()..]); + self.replace_raw(&item_key, &val, None)?; + existing_val.truncate(existing_val.len() - size_of::()); + return Ok(InsertToListStatus::Replaced(existing_val)); } - Ok(max_seen.map(|max_seen| max_seen + 1).unwrap_or(0)) - } - pub(crate) fn _list_get( - &self, - list_ph: PartedHash, - item_fph: FullPartedHash, - ) -> Result> { - let mut suffix = [0u8; ITEM_SUFFIX_LEN]; - suffix[0..PartedHash::LEN].copy_from_slice(bytes_of(&list_ph)); - suffix[PartedHash::LEN..].copy_from_slice(ITEM_NAMESPACE); - - for res in self.get_by_hash(item_fph.ph)? { - let (k, v) = res?; - if k.ends_with(&suffix) { - let chain = chain_of(&v); - if chain._this_collidx == item_fph.collidx { - return Ok(Some((k, v))); - } + if matches!(mode, InsertMode::Replace(_)) { + // not allowed to create + return Ok(InsertToListStatus::DoesNotExist); + } + + // get of create the list + let res = self.get_or_create_raw( + &list_key, + bytes_of(&LinkedList { + head_idx: Self::FIRST_IDX, + tail_idx: Self::FIRST_IDX + 1, + holes: 0, + }) + .to_owned(), + )?; + + match res { + crate::GetOrCreateStatus::CreatedNew(_) => { + //println!("Created list"); + + // list was just created. create chain + self.set_raw( + bytes_of(&ChainKey { + list_ph, + idx: Self::FIRST_IDX, + namespace: CHAIN_NAMESPACE, + }), + bytes_of(&item_ph), + )?; + + // create item + val.extend_from_slice(&Self::FIRST_IDX.to_le_bytes()); + self.set_raw(&item_key, &val)?; + } + crate::GetOrCreateStatus::ExistingValue(list_bytes) => { + let mut list = *from_bytes::(&list_bytes); + + let item_idx = match pos { + InsertToListPos::Tail => { + let idx = list.tail_idx; + list.tail_idx += 1; + idx + } + InsertToListPos::Head => { + list.head_idx -= 1; + list.head_idx + } + }; + + // update list + self.set_raw(&list_key, bytes_of(&list))?; + + // create chain + self.set_raw( + bytes_of(&ChainKey { + list_ph, + idx: item_idx, + namespace: CHAIN_NAMESPACE, + }), + bytes_of(&item_ph), + )?; + + // create item + val.extend_from_slice(&item_idx.to_le_bytes()); + self.set_raw(&item_key, &val)?; } } - Ok(None) + + val.truncate(val.len() - size_of::()); + Ok(InsertToListStatus::Created(val)) } - pub(crate) fn find_true_tail( + pub fn set_in_list< + B1: AsRef<[u8]> + ?Sized, + B2: AsRef<[u8]> + ?Sized, + B3: AsRef<[u8]> + ?Sized, + >( &self, - list_ph: PartedHash, - fph: FullPartedHash, - ) -> Result<(FullPartedHash, Vec, Vec)> { - let mut curr = fph; - let mut last_valid = None; - - loop { - if let Some((k, v)) = self._list_get(list_ph, curr)? { - let chain = chain_of(&v); - - if chain._next.is_invalid() { - // curr is the true tail - return Ok((curr, k, v)); - } - last_valid = Some((curr, k, v)); - curr = chain.full_next(); + list_key: &B1, + item_key: &B2, + val: &B3, + ) -> Result { + self.owned_set_in_list( + list_key.as_ref().to_owned(), + item_key.as_ref().to_owned(), + val.as_ref().to_owned(), + false, + ) + } + + pub fn set_in_list_promoting< + B1: AsRef<[u8]> + ?Sized, + B2: AsRef<[u8]> + ?Sized, + B3: AsRef<[u8]> + ?Sized, + >( + &self, + list_key: &B1, + item_key: &B2, + val: &B3, + ) -> Result { + self.owned_set_in_list( + list_key.as_ref().to_owned(), + item_key.as_ref().to_owned(), + val.as_ref().to_owned(), + true, + ) + } - if curr == fph { - corrupted_list!("list {list_ph} loop detected {curr} (find_true_tail)"); - } - } else if let Some(last_valid) = last_valid { - // last_valid is the true tail - assert_ne!(curr, fph); - return Ok(last_valid); - } else { - // if prev=None, it means we weren't able to find list.tail. this should never happen - assert_eq!(curr, fph); - corrupted_list!("list {list_ph} tail {fph} does not exist"); - } + pub fn owned_set_in_list( + &self, + list_key: Vec, + item_key: Vec, + val: Vec, + promote: bool, + ) -> Result { + if promote { + self.owned_remove_from_list(list_key.clone(), item_key.clone())?; + } + match self._insert_to_list( + list_key, + item_key, + val, + InsertMode::Set, + InsertToListPos::Tail, + )? { + InsertToListStatus::Created(_v) => Ok(SetStatus::CreatedNew), + InsertToListStatus::Replaced(v) => Ok(SetStatus::PrevValue(v)), + _ => unreachable!(), } } - pub(crate) fn find_true_head( + pub fn replace_in_list< + B1: AsRef<[u8]> + ?Sized, + B2: AsRef<[u8]> + ?Sized, + B3: AsRef<[u8]> + ?Sized, + >( &self, - list_ph: PartedHash, - fph: FullPartedHash, - ) -> Result<(FullPartedHash, Vec, Vec)> { - let mut curr = fph; - let mut last_valid = None; - - loop { - if let Some((k, v)) = self._list_get(list_ph, curr)? { - let chain = chain_of(&v); - if chain._prev.is_invalid() { - // curr is the true head - return Ok((curr, k, v)); - } - last_valid = Some((curr, k, v)); - curr = chain.full_prev(); + list_key: &B1, + item_key: &B2, + val: &B3, + expected_val: Option<&B3>, + ) -> Result { + self.owned_replace_in_list( + list_key.as_ref().to_owned(), + item_key.as_ref().to_owned(), + val.as_ref().to_owned(), + expected_val.map(|ev| ev.as_ref()), + ) + } + + pub fn owned_replace_in_list( + &self, + list_key: Vec, + item_key: Vec, + val: Vec, + expected_val: Option<&[u8]>, + ) -> Result { + match self._insert_to_list( + list_key, + item_key, + val, + InsertMode::Replace(expected_val), + InsertToListPos::Tail, + )? { + InsertToListStatus::DoesNotExist => Ok(ReplaceStatus::DoesNotExist), + InsertToListStatus::Replaced(v) => Ok(ReplaceStatus::PrevValue(v)), + InsertToListStatus::WrongValue(v) => Ok(ReplaceStatus::WrongValue(v)), + _ => unreachable!(), + } + } - if curr == fph { - corrupted_list!("list {list_ph} loop detected {curr} (find_true_head)"); - } - } else if let Some(last_valid) = last_valid { - // last_valid is the true head - assert_ne!(curr, fph); - return Ok(last_valid); - } else { - // if prev=None, it means we weren't able to find list.head. this should never happen - assert_eq!(curr, fph); - corrupted_list!("list {list_ph} head {fph} does not exist"); - } + pub fn get_or_create_in_list< + B1: AsRef<[u8]> + ?Sized, + B2: AsRef<[u8]> + ?Sized, + B3: AsRef<[u8]> + ?Sized, + >( + &self, + list_key: &B1, + item_key: &B2, + default_val: &B3, + ) -> Result { + self.owned_get_or_create_in_list( + list_key.as_ref().to_owned(), + item_key.as_ref().to_owned(), + default_val.as_ref().to_owned(), + ) + } + + pub fn owned_get_or_create_in_list( + &self, + list_key: Vec, + item_key: Vec, + default_val: Vec, + ) -> Result { + match self._insert_to_list( + list_key, + item_key, + default_val, + InsertMode::GetOrCreate, + InsertToListPos::Tail, + )? { + InsertToListStatus::ExistingValue(v) => Ok(GetOrCreateStatus::ExistingValue(v)), + InsertToListStatus::Created(v) => Ok(GetOrCreateStatus::CreatedNew(v)), + _ => unreachable!(), } } - /// - /// See also [Self::get] pub fn get_from_list + ?Sized, B2: AsRef<[u8]> + ?Sized>( &self, list_key: &B1, @@ -399,7 +415,6 @@ impl CandyStore { self.owned_get_from_list(list_key.as_ref().to_owned(), item_key.as_ref().to_owned()) } - /// Owned version of [Self::get_from_list] pub fn owned_get_from_list( &self, list_key: Vec, @@ -407,108 +422,334 @@ impl CandyStore { ) -> Result>> { let (list_ph, _) = self.make_list_key(list_key); let (_, item_key) = self.make_item_key(list_ph, item_key); - let Some(mut v) = self.get_raw(&item_key)? else { + let Some(mut val) = self.get_raw(&item_key)? else { + return Ok(None); + }; + val.truncate(val.len() - size_of::()); + Ok(Some(val)) + } + + pub fn remove_from_list + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + list_key: &B1, + item_key: &B2, + ) -> Result>> { + self.owned_remove_from_list(list_key.as_ref().to_owned(), item_key.as_ref().to_owned()) + } + + pub fn owned_remove_from_list( + &self, + list_key: Vec, + item_key: Vec, + ) -> Result>> { + let (list_ph, list_key) = self.make_list_key(list_key); + let (_, item_key) = self.make_item_key(list_ph, item_key); + + let _guard = self.lock_list(list_ph); + + let Some(mut existing_val) = self.get_raw(&item_key)? else { return Ok(None); }; - v.truncate(v.len() - size_of::()); - Ok(Some(v)) + + let item_idx = u64::from_le_bytes( + (&existing_val[existing_val.len() - size_of::()..]) + .try_into() + .unwrap(), + ); + existing_val.truncate(existing_val.len() - size_of::()); + + // update list, if the item was the head/tail + let list_bytes = self.get_raw(&list_key)?.unwrap(); + let mut list = *from_bytes::(&list_bytes); + + if list.head_idx == item_idx || list.tail_idx == item_idx + 1 { + if list.head_idx == item_idx { + list.head_idx += 1; + } else if list.tail_idx == item_idx + 1 { + list.tail_idx -= 1; + } + if list.is_empty() { + self.remove_raw(&list_key)?; + } else { + self.set_raw(&list_key, bytes_of(&list))?; + } + } else { + list.holes += 1; + self.set_raw(&list_key, bytes_of(&list))?; + } + + // remove chain + self.remove_raw(bytes_of(&ChainKey { + list_ph, + idx: item_idx, + namespace: CHAIN_NAMESPACE, + }))?; + + // remove item + self.remove_raw(&item_key)?; + + Ok(Some(existing_val)) } - /// Iterates over the elements of the linked list (identified by `list_key`) from the beginning (head) - /// to the end (tail). Note that if items are removed at random locations in the list, the iterator - /// may not be able to progress and will return an early stop. - pub fn iter_list + ?Sized>(&self, list_key: &B) -> LinkedListIterator { + fn get_from_list_at_index( + &self, + list_ph: PartedHash, + idx: u64, + truncate: bool, + ) -> Result, Vec)>> { + let Some(item_ph_bytes) = self.get_raw(bytes_of(&ChainKey { + idx, + list_ph, + namespace: CHAIN_NAMESPACE, + }))? + else { + return Ok(None); + }; + let item_ph = *from_bytes::(&item_ph_bytes); + + // handle unlikely (but possible) collisions on item_ph + for kv in self.get_by_hash(item_ph)? { + let Ok((mut k, mut v)) = kv else { + continue; + }; + if v.ends_with(&idx.to_le_bytes()) { + if truncate { + v.truncate(v.len() - size_of::()); + k.truncate(k.len() - ITEM_SUFFIX_LEN); + } + return Ok(Some((item_ph, k, v))); + } + } + + Ok(None) + } + + /// Not crash-safe + pub fn compact_list_if_needed + ?Sized>( + &self, + list_key: &B, + params: ListCompactionParams, + ) -> Result { + let (list_ph, list_key) = self.make_list_key(list_key.as_ref().to_owned()); + let _guard = self.lock_list(list_ph); + + let Some(list_bytes) = self.get_raw(&list_key)? else { + return Ok(false); + }; + let list = *from_bytes::(&list_bytes); + if list.len() < params.min_length { + return Ok(false); + } + if (list.holes as f64) < (list.len() as f64) * params.min_holes_ratio { + return Ok(false); + } + + let mut new_idx = list.tail_idx; + for idx in list.head_idx..list.tail_idx { + let Some((item_ph, full_k, mut full_v)) = + self.get_from_list_at_index(list_ph, idx, false)? + else { + continue; + }; + + // create new chain + self.set_raw( + bytes_of(&ChainKey { + idx: new_idx, + list_ph, + namespace: CHAIN_NAMESPACE, + }), + bytes_of(&item_ph), + )?; + + // update item's index suffix + let offset = full_v.len() - size_of::(); + full_v[offset..].copy_from_slice(&new_idx.to_le_bytes()); + self.set_raw(&full_k, &full_v)?; + + // remove old chain + self.remove_raw(bytes_of(&ChainKey { + idx, + list_ph, + namespace: CHAIN_NAMESPACE, + }))?; + + new_idx += 1; + } + + // update list head and tail, set holes=0 + self.set_raw( + &list_key, + bytes_of(&LinkedList { + head_idx: list.tail_idx, + tail_idx: new_idx, + holes: 0, + }), + )?; + + Ok(true) + } + + pub fn iter_list<'a, B: AsRef<[u8]> + ?Sized>(&'a self, list_key: &B) -> LinkedListIterator { self.owned_iter_list(list_key.as_ref().to_owned()) } - /// Owned version of [Self::iter_list] - pub fn owned_iter_list(&self, list_key: Vec) -> LinkedListIterator { + pub fn owned_iter_list<'a>(&'a self, list_key: Vec) -> LinkedListIterator { let (list_ph, list_key) = self.make_list_key(list_key); LinkedListIterator { store: &self, list_key, list_ph, - fph: None, + list: None, + idx: 0, + fwd: true, } } - /// Same as [Self::iter_list], but goes from the end (tail) to the beginning (head) - pub fn iter_list_backwards + ?Sized>( - &self, + pub fn iter_list_backwards<'a, B: AsRef<[u8]> + ?Sized>( + &'a self, list_key: &B, - ) -> RevLinkedListIterator { + ) -> LinkedListIterator { self.owned_iter_list_backwards(list_key.as_ref().to_owned()) } - /// Owned version of [Self::iter_list_backwards] - pub fn owned_iter_list_backwards(&self, list_key: Vec) -> RevLinkedListIterator { + pub fn owned_iter_list_backwards<'a>(&'a self, list_key: Vec) -> LinkedListIterator { let (list_ph, list_key) = self.make_list_key(list_key); - RevLinkedListIterator { + LinkedListIterator { store: &self, list_key, list_ph, - fph: None, + list: None, + idx: 0, + fwd: false, } } - /// Discards the given list (removes all elements). This also works for corrupt lists, in case they - /// need to be dropped. pub fn discard_list + ?Sized>(&self, list_key: &B) -> Result<()> { self.owned_discard_list(list_key.as_ref().to_owned()) } - /// Owned version of [Self::discard_list] pub fn owned_discard_list(&self, list_key: Vec) -> Result<()> { let (list_ph, list_key) = self.make_list_key(list_key); + let _guard = self.lock_list(list_ph); - let _guard = self._list_lock(list_ph); - - let Some(list_buf) = self.remove_raw(&list_key)? else { + let Some(list_bytes) = self.get_raw(&list_key)? else { return Ok(()); }; - let list = *from_bytes::(&list_buf); - let mut fph = list.full_head(); - - while fph.is_valid() { - let Some((k, v)) = self._list_get(list_ph, fph)? else { - break; + let list = *from_bytes::(&list_bytes); + for idx in list.head_idx..list.tail_idx { + let Some((_, full_key, _)) = self.get_from_list_at_index(list_ph, idx, false)? else { + continue; }; - - let chain = chain_of(&v); - fph = chain.full_next(); - self.remove_raw(&k)?; + self.remove_raw(bytes_of(&ChainKey { + list_ph, + idx, + namespace: CHAIN_NAMESPACE, + }))?; + self.remove_raw(&full_key)?; } + self.remove_raw(&list_key)?; Ok(()) } - // optimization: add singly-linked lists that allow removing only from the head and inserting - // only at the tail, but support O(1) access (by index) and update of existing elements. - // this would require only 2 IOs instead of 3 when inserting new elements. - - /// Returns the first (head) element of the list. Note that it's prone to spurious false positives - /// (returning an element that no longer exists) or false negatives (returning `None` when an element - /// exists) in case different threads pop the head pub fn peek_list_head + ?Sized>(&self, list_key: &B) -> Result> { self.owned_peek_list_head(list_key.as_ref().to_owned()) } - /// Owned version of [Self::peek_list_head] pub fn owned_peek_list_head(&self, list_key: Vec) -> Result> { - self.owned_iter_list(list_key).next().unwrap_or(Ok(None)) + let Some(kv) = self.owned_iter_list(list_key).next() else { + return Ok(None); + }; + Ok(Some(kv?)) } - /// Returns the last (tail) element of the list. Note that it's prone to spurious false positives - /// (returning an element that no longer exists) or false negatives (returning `None` when an element - /// exists) in case different threads pop the tail pub fn peek_list_tail + ?Sized>(&self, list_key: &B) -> Result> { self.owned_peek_list_tail(list_key.as_ref().to_owned()) } - /// Owned version of [Self::peek_list_tail] pub fn owned_peek_list_tail(&self, list_key: Vec) -> Result> { - self.owned_iter_list_backwards(list_key) - .next() - .unwrap_or(Ok(None)) + for kv in self.owned_iter_list_backwards(list_key) { + return Ok(Some(kv?)); + } + Ok(None) + } + + pub fn pop_list_head + ?Sized>(&self, list_key: &B) -> Result> { + self.owned_pop_list_head(list_key.as_ref().to_owned()) + } + + pub fn owned_pop_list_head(&self, list_key: Vec) -> Result> { + for kv in self.owned_iter_list(list_key.clone()) { + let (k, v) = kv?; + if let Some(_) = self.owned_remove_from_list(list_key.clone(), k.clone())? { + return Ok(Some((k, v))); + } + } + Ok(None) + } + + pub fn pop_list_tail + ?Sized>(&self, list_key: &B) -> Result> { + self.owned_pop_list_tail(list_key.as_ref().to_owned()) + } + + pub fn owned_pop_list_tail(&self, list_key: Vec) -> Result> { + for kv in self.owned_iter_list_backwards(list_key.clone()) { + let (k, v) = kv?; + if let Some(_) = self.owned_remove_from_list(list_key.clone(), k.clone())? { + return Ok(Some((k, v))); + } + } + Ok(None) + } + + pub fn push_to_list_head + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + list_key: &B1, + val: &B2, + ) -> Result { + self.owned_push_to_list_head(list_key.as_ref().to_owned(), val.as_ref().to_owned()) + } + + fn owned_push_to_list( + &self, + list_key: Vec, + val: Vec, + pos: InsertToListPos, + ) -> Result { + let uuid = Uuid::from_bytes(rand::random()); + let res = self._insert_to_list( + list_key, + uuid.as_bytes().to_vec(), + val, + InsertMode::GetOrCreate, + pos, + )?; + debug_assert!(matches!(res, InsertToListStatus::Created(_))); + Ok(EncodableUuid::from(uuid)) + } + + pub fn owned_push_to_list_head( + &self, + list_key: Vec, + val: Vec, + ) -> Result { + self.owned_push_to_list(list_key, val, InsertToListPos::Head) + } + + pub fn push_to_list_tail + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + list_key: &B1, + val: &B2, + ) -> Result { + self.owned_push_to_list_tail(list_key.as_ref().to_owned(), val.as_ref().to_owned()) + } + + pub fn owned_push_to_list_tail( + &self, + list_key: Vec, + val: Vec, + ) -> Result { + self.owned_push_to_list(list_key, val, InsertToListPos::Tail) } } diff --git a/src/store.rs b/src/store.rs index 954db40..024e44f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,4 +1,5 @@ -use anyhow::{anyhow, Context}; +use anyhow::{anyhow, bail, Context}; +use fslock::LockFile; use parking_lot::{Mutex, RwLock}; use std::{ collections::BTreeMap, @@ -24,6 +25,7 @@ pub(crate) const USER_NAMESPACE: &[u8] = &[1]; pub(crate) const TYPED_NAMESPACE: &[u8] = &[2]; pub(crate) const LIST_NAMESPACE: &[u8] = &[3]; pub(crate) const ITEM_NAMESPACE: &[u8] = &[4]; +pub(crate) const CHAIN_NAMESPACE: u8 = 5; /// Stats from CandyStore #[derive(Debug, Default, PartialEq, Eq, Clone)] @@ -174,6 +176,7 @@ pub struct CandyStore { // locks for complicated operations pub(crate) keyed_locks_mask: u32, pub(crate) keyed_locks: Vec>, + _lockfile: LockFile, } /// An iterator over a CandyStore. Note that it's safe to modify (insert/delete) keys while iterating, @@ -279,6 +282,12 @@ impl CandyStore { let dir_path: PathBuf = dir_path.as_ref().into(); std::fs::create_dir_all(&dir_path)?; + let lockfilename = dir_path.join(".lock"); + let mut lockfile = fslock::LockFile::open(&lockfilename)?; + if !lockfile.try_lock_with_pid()? { + bail!("Failed to lock {lockfilename:?}"); + } + Self::load_existing_dir(&dir_path, &config, &mut shards)?; if shards.is_empty() { Self::create_first_shards(&dir_path, &config, &mut shards)?; @@ -303,6 +312,7 @@ impl CandyStore { num_splits: Default::default(), keyed_locks_mask: num_keyed_locks - 1, keyed_locks, + _lockfile: lockfile, }) } diff --git a/src/typed.rs b/src/typed.rs index c477072..f6aa79a 100644 --- a/src/typed.rs +++ b/src/typed.rs @@ -402,8 +402,7 @@ where let list_key = Self::make_list_key(list_key); self.store.owned_iter_list(list_key).map(|res| match res { Err(e) => Err(e), - Ok(None) => Ok(None), - Ok(Some((k, v))) => { + Ok((k, v)) => { let key = from_bytes::(&k)?; let val = from_bytes::(&v)?; Ok(Some((key, val))) @@ -424,8 +423,7 @@ where .owned_iter_list_backwards(list_key) .map(|res| match res { Err(e) => Err(e), - Ok(None) => Ok(None), - Ok(Some((k, v))) => { + Ok((k, v)) => { let key = from_bytes::(&k)?; let val = from_bytes::(&v)?; Ok(Some((key, val))) diff --git a/test-list-collisions/src/main.rs b/test-list-collisions/src/main.rs index b7fd3be..bbcd68d 100644 --- a/test-list-collisions/src/main.rs +++ b/test-list-collisions/src/main.rs @@ -5,32 +5,48 @@ fn main() -> Result<()> { db.clear()?; // force many elements to end up with the same PartedHash - unsafe { HASH_BITS_TO_KEEP = 0xf000_000f_0000_00ff }; + unsafe { HASH_BITS_TO_KEEP = 0xff00_000f_0000_00ff }; for i in 0u32..100_000 { - //println!("push {i}"); + if i % 10_000 == 0 { + println!("push {i}"); + } db.push_to_list_tail("xxx", &i.to_le_bytes())?; } for i in 0u32..100_000 { - //println!("pop {i}"); + if i % 10_000 == 0 { + println!("pop {i}"); + } assert_eq!(db.pop_list_head("xxx")?.unwrap().1, &i.to_le_bytes()); } assert!(db.pop_list_head("xxx")?.is_none()); + assert!(db.pop_list_tail("xxx")?.is_none()); + assert_eq!(db.iter_list("xxx").count(), 0); for i in 0u32..100_000 { - //println!("push {i}"); + if i % 10_000 == 0 { + println!("push {i}"); + } db.push_to_list_head("xxx", &i.to_le_bytes())?; } for i in 0u32..100_000 { - //println!("pop {i}"); - assert_eq!(db.pop_list_tail("xxx")?.unwrap().1, &i.to_le_bytes()); + if i % 10_000 == 0 { + println!("pop {i}"); + } + assert_eq!( + db.pop_list_tail("xxx")?.unwrap().1, + &i.to_le_bytes(), + "i={i}" + ); } assert!(db.pop_list_head("xxx")?.is_none()); + unsafe { HASH_BITS_TO_KEEP = 0x0000_000f_0000_00ff }; + for i in 0u32..1000 { db.set_in_list("xxx", &i.to_le_bytes(), &i.to_le_bytes())?; } @@ -57,7 +73,7 @@ fn main() -> Result<()> { let remaining = db .iter_list("xxx") - .map(|res| u32::from_le_bytes(res.unwrap().unwrap().1.try_into().unwrap())) + .map(|res| u32::from_le_bytes(res.unwrap().1.try_into().unwrap())) .collect::>(); let expectd = (100..400).chain(600..900).collect::>(); diff --git a/tests/test_lists.rs b/tests/test_lists.rs index 7947592..69f435b 100644 --- a/tests/test_lists.rs +++ b/tests/test_lists.rs @@ -1,12 +1,6 @@ mod common; -use std::{ - collections::HashSet, - sync::{ - atomic::{AtomicBool, AtomicUsize}, - Arc, - }, -}; +use std::sync::{atomic::AtomicUsize, Arc}; use candystore::{ CandyStore, CandyTypedDeque, CandyTypedList, Config, GetOrCreateStatus, ReplaceStatus, Result, @@ -44,7 +38,7 @@ fn test_lists() -> Result<()> { let items = db .iter_list("texas") - .map(|res| res.unwrap().unwrap()) + .map(|res| res.unwrap()) .collect::>(); assert_eq!(items[0].0, "dallas".as_bytes()); assert_eq!(items[2].0, "houston".as_bytes()); @@ -81,7 +75,7 @@ fn test_lists() -> Result<()> { assert!(db._num_splits() > 1); for (i, res) in db.iter_list("xxx").enumerate() { - let (k, _) = res?.unwrap(); + let (k, _) = res?; assert_eq!(k, format!("my key {i}").as_bytes()); db.remove_from_list("xxx", &k)?; } @@ -347,14 +341,14 @@ fn test_rev_iter() -> Result<()> { let items = db .iter_list("mylist") - .map(|res| res.unwrap().unwrap().0) + .map(|res| res.unwrap().0) .collect::>(); assert_eq!(items, vec![b"item1", b"item2", b"item3", b"item4"]); let items = db .iter_list_backwards("mylist") - .map(|res| res.unwrap().unwrap().0) + .map(|res| res.unwrap().0) .collect::>(); assert_eq!(items, vec![b"item4", b"item3", b"item2", b"item1"]); @@ -380,7 +374,7 @@ fn test_promote() -> Result<()> { let items = || { db.iter_list("mylist") - .map(|res| res.unwrap().unwrap().0) + .map(|res| res.unwrap().0) .collect::>() }; @@ -445,102 +439,3 @@ fn test_typed_promote() -> Result<()> { Ok(()) }) } - -#[test] -fn test_list_iter_early_stop() -> Result<()> { - run_in_tempdir(|dir| { - let db = Arc::new(CandyStore::open(dir, Config::default())?); - - for i in 0u32..1000 { - db.push_to_list_tail("xxx", &i.to_le_bytes())?; - } - - let mut got = vec![]; - for res in db.iter_list("xxx") { - let Some((_, v)) = res? else { - break; - }; - let i = u32::from_le_bytes(v.try_into().unwrap()); - if i == 10 { - db.discard_list("xxx")?; - // create a some new elements, but the original iterator is broken at this point - db.push_to_list_tail("xxx", &(i + 1).to_le_bytes())?; - db.push_to_list_tail("xxx", &(i + 2).to_le_bytes())?; - db.push_to_list_tail("xxx", &(i + 3).to_le_bytes())?; - } - got.push(i); - } - assert_eq!(got, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); - - assert_eq!( - db.iter_list("xxx") - .map(|res| u32::from_le_bytes(res.unwrap().unwrap().1.try_into().unwrap())) - .collect::>(), - vec![11, 12, 13] - ); - - db.discard_list("xxx")?; - - let done = Arc::new(AtomicBool::new(false)); - - let h1 = { - let db = db.clone(); - let done = done.clone(); - std::thread::spawn(move || { - for i in 0u32..1000 { - db.set_in_list("xxx", &i.to_le_bytes(), &i.to_le_bytes())?; - std::thread::yield_now(); - } - done.store(true, std::sync::atomic::Ordering::SeqCst); - - Result::<()>::Ok(()) - }) - }; - - let h2 = { - let db = db.clone(); - std::thread::spawn(move || { - for i in 0u32..1000 { - if db.remove_from_list("xxx", &i.to_le_bytes())?.is_none() { - std::thread::sleep(std::time::Duration::from_micros(100)); - } - } - - Result::<()>::Ok(()) - }) - }; - - let h3 = { - let db = db.clone(); - let done = done.clone(); - std::thread::spawn(move || { - let mut retry = true; - let mut got = HashSet::new(); - while retry || !done.load(std::sync::atomic::Ordering::SeqCst) { - retry = false; - for res in db.iter_list("xxx") { - let Some((k, v)) = res? else { - std::thread::sleep(std::time::Duration::from_micros(100)); - retry = true; - break; - }; - let k = u32::from_le_bytes(k.try_into().unwrap()); - let v = u32::from_le_bytes(v.try_into().unwrap()); - assert_eq!(k, v); - got.insert(k); - std::thread::yield_now(); - } - } - - Result::>::Ok(got) - }) - }; - - h1.join().unwrap()?; - h2.join().unwrap()?; - let got = h3.join().unwrap()?; - println!("{}", got.len()); - - Ok(()) - }) -}