diff --git a/Cargo.lock b/Cargo.lock index dc8369e..567e4bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,7 +71,7 @@ dependencies = [ [[package]] name = "candystore" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "bytemuck", diff --git a/Cargo.toml b/Cargo.toml index 72927fd..8b02e53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "candystore" -version = "0.3.0" +version = "0.3.1" edition = "2021" license = "Apache-2.0" keywords = ["key-value", "database", "persistent", "store", "rocksdb"] diff --git a/README.md b/README.md index 2d8dd6e..0062f8a 100644 --- a/README.md +++ b/README.md @@ -140,7 +140,7 @@ for res in db.iter_list("mylist") { ## Notes * The file format is not yet stable -* Requires nightly (for `simd_itertools` and BTree cursors), uses very little `unsafe` (required due to `mmap`) +* Requires nightly (for `simd_itertools`), uses very little `unsafe` (required due to `mmap`) ## Roadmap * Distributed protocol based on file locks (meant to run on a shared network folder) diff --git a/src/hashing.rs b/src/hashing.rs index b7d2573..fc9d5ff 100644 --- a/src/hashing.rs +++ b/src/hashing.rs @@ -1,7 +1,6 @@ -use anyhow::anyhow; use siphasher::sip128::{Hash128, SipHasher24}; -use crate::{CandyError, Result}; +use crate::Result; #[derive(Debug, Clone, Copy)] pub struct HashSeed([u8; 16]); @@ -11,15 +10,13 @@ pub struct HashSeed([u8; 16]); impl HashSeed { pub const LEN: usize = size_of::(); + pub fn new(bytes: [u8; Self::LEN]) -> Self { + Self(bytes) + } + /// Construct a HashSeed from the given byte buffer (must be 16 bytes in length) - pub fn new + ?Sized>(key: &B) -> Result { - let key = key.as_ref(); - if key.len() != Self::LEN { - return Err(anyhow!(CandyError::WrongHashSeedLength)); - } - let mut bytes = [0u8; Self::LEN]; - bytes.copy_from_slice(&key); - Ok(Self(bytes)) + pub fn from_buf + ?Sized>(key: &B) -> Result { + Ok(Self(key.as_ref().try_into()?)) } } @@ -97,9 +94,9 @@ impl PartedHash { fn 
test_parted_hash() -> Result<()> { use bytemuck::{bytes_of, from_bytes}; - HashSeed::new("12341234123412341").expect_err("shouldn't work"); + HashSeed::from_buf("12341234123412341").expect_err("shouldn't work"); - let seed = HashSeed::new("aaaabbbbccccdddd")?; + let seed = HashSeed::from_buf("aaaabbbbccccdddd")?; let h1 = PartedHash::new(&seed, b"hello world"); assert_eq!(h1.0, 13445180190757400308,); diff --git a/src/insertion.rs b/src/insertion.rs deleted file mode 100644 index 8f55b86..0000000 --- a/src/insertion.rs +++ /dev/null @@ -1,352 +0,0 @@ -use anyhow::{anyhow, Context}; -use std::ops::Bound; -use std::sync::atomic::Ordering; - -use crate::hashing::PartedHash; -use crate::shard::{InsertMode, InsertStatus, Shard}; -use crate::store::CandyStore; -use crate::{CandyError, Result, MAX_TOTAL_KEY_SIZE, MAX_VALUE_SIZE}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ReplaceStatus { - PrevValue(Vec), - WrongValue(Vec), - DoesNotExist, -} -impl ReplaceStatus { - pub fn was_replaced(&self) -> bool { - matches!(*self, Self::PrevValue(_)) - } - pub fn failed(&self) -> bool { - !matches!(*self, Self::PrevValue(_)) - } - pub fn is_key_missing(&self) -> bool { - matches!(*self, Self::DoesNotExist) - } - pub fn is_wrong_value(&self) -> bool { - matches!(*self, Self::WrongValue(_)) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum SetStatus { - PrevValue(Vec), - CreatedNew, -} -impl SetStatus { - pub fn was_created(&self) -> bool { - matches!(*self, Self::CreatedNew) - } - pub fn was_replaced(&self) -> bool { - matches!(*self, Self::PrevValue(_)) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum GetOrCreateStatus { - ExistingValue(Vec), - CreatedNew(Vec), -} -impl GetOrCreateStatus { - pub fn was_created(&self) -> bool { - matches!(*self, Self::CreatedNew(_)) - } - pub fn already_exists(&self) -> bool { - matches!(*self, Self::ExistingValue(_)) - } - pub fn value(self) -> Vec { - match self { - Self::CreatedNew(val) => val, - 
Self::ExistingValue(val) => val, - } - } -} - -impl CandyStore { - fn compact(&self, shard_end: u32, write_offset: u32) -> Result { - let mut guard = self.shards.write(); - // it's possible that another thread already compacted this shard - if guard - .get(&shard_end) - .with_context(|| format!("missing shard {shard_end}"))? - .get_write_offset() - < write_offset - { - return Ok(false); - } - - let removed_shard = guard - .remove(&shard_end) - .with_context(|| format!("missing shard {shard_end}"))?; - let orig_filename = self.dir_path.join(format!( - "shard_{:04x}-{:04x}", - removed_shard.span.start, removed_shard.span.end - )); - let tmpfile = self.dir_path.join(format!( - "compact_{:04x}-{:04x}", - removed_shard.span.start, removed_shard.span.end - )); - let mut compacted_shard = Shard::open( - tmpfile.clone(), - removed_shard.span.clone(), - true, - self.config.clone(), - )?; - - self.num_compactions.fetch_add(1, Ordering::SeqCst); - - removed_shard.compact_into(&mut compacted_shard)?; - - std::fs::rename(tmpfile, &orig_filename)?; - guard.insert(shard_end, compacted_shard); - Ok(true) - } - - fn split(&self, shard_start: u32, shard_end: u32) -> Result { - let mut guard = self.shards.write(); - // it's possible that another thread already split this range - check if midpoint exists, and if so, bail out - let midpoint = shard_start / 2 + shard_end / 2; - if guard.contains_key(&midpoint) { - return Ok(false); - } - - let removed_shard = guard - .remove(&shard_end) - .with_context(|| format!("missing shard {shard_end}"))?; - - let bottomfile = self - .dir_path - .join(format!("bottom_{:04x}-{:04x}", shard_start, midpoint)); - let topfile = self - .dir_path - .join(format!("top_{:04x}-{:04x}", midpoint, shard_end)); - - let bottom_shard = Shard::open( - bottomfile.clone(), - shard_start..midpoint, - true, - self.config.clone(), - )?; - let top_shard = Shard::open( - topfile.clone(), - midpoint..shard_end, - true, - self.config.clone(), - )?; - - 
removed_shard.split_into(&bottom_shard, &top_shard)?; - - self.num_splits.fetch_add(1, Ordering::SeqCst); - - // this is not atomic, so when loading, we need to take the larger span if it exists and - // delete the partial ones - std::fs::rename( - bottomfile, - self.dir_path - .join(format!("shard_{:04x}-{:04x}", shard_start, midpoint)), - )?; - std::fs::rename( - topfile, - self.dir_path - .join(format!("shard_{:04x}-{:04x}", midpoint, shard_end)), - )?; - std::fs::remove_file( - self.dir_path - .join(format!("shard_{:04x}-{:04x}", shard_start, shard_end)), - )?; - - guard.insert(midpoint, bottom_shard); - guard.insert(shard_end, top_shard); - - Ok(true) - } - - fn try_insert( - &self, - ph: PartedHash, - key: &[u8], - val: &[u8], - mode: InsertMode, - ) -> Result<(InsertStatus, u32, u32)> { - let guard = self.shards.read(); - let cursor = guard.lower_bound(Bound::Excluded(&(ph.shard_selector() as u32))); - let shard_start = cursor - .peek_prev() - .map(|(&shard_start, _)| shard_start) - .unwrap_or(0); - let (shard_end, shard) = cursor.peek_next().with_context(|| { - format!( - "missing shard for selector 0x{:04x} start=0x{:04x}", - ph.shard_selector(), - shard_start - ) - })?; - let status = shard.insert(ph, key, val, mode)?; - - Ok((status, shard_start, *shard_end)) - } - - pub(crate) fn insert_internal( - &self, - full_key: &[u8], - val: &[u8], - mode: InsertMode, - ) -> Result { - let ph = PartedHash::new(&self.config.hash_seed, full_key); - - if full_key.len() > MAX_TOTAL_KEY_SIZE as usize { - return Err(anyhow!(CandyError::KeyTooLong(full_key.len()))); - } - if val.len() > MAX_VALUE_SIZE as usize { - return Err(anyhow!(CandyError::ValueTooLong(val.len()))); - } - if full_key.len() + val.len() > self.config.max_shard_size as usize { - return Err(anyhow!(CandyError::EntryCannotFitInShard( - full_key.len() + val.len(), - self.config.max_shard_size as usize - ))); - } - - loop { - let (status, shard_start, shard_end) = self.try_insert(ph, &full_key, val, 
mode)?; - - match status { - InsertStatus::Added => { - self.num_entries.fetch_add(1, Ordering::SeqCst); - return Ok(status); - } - InsertStatus::KeyDoesNotExist => { - return Ok(status); - } - InsertStatus::Replaced(_) => { - return Ok(status); - } - InsertStatus::AlreadyExists(_) => { - return Ok(status); - } - InsertStatus::CompactionNeeded(write_offset) => { - self.compact(shard_end, write_offset)?; - // retry - } - InsertStatus::SplitNeeded => { - self.split(shard_start, shard_end)?; - // retry - } - } - } - } - - pub(crate) fn set_raw(&self, full_key: &[u8], val: &[u8]) -> Result { - match self.insert_internal(full_key, val, InsertMode::Set)? { - InsertStatus::Added => Ok(SetStatus::CreatedNew), - InsertStatus::Replaced(v) => Ok(SetStatus::PrevValue(v)), - InsertStatus::AlreadyExists(v) => Ok(SetStatus::PrevValue(v)), - InsertStatus::KeyDoesNotExist => unreachable!(), - InsertStatus::CompactionNeeded(_) => unreachable!(), - InsertStatus::SplitNeeded => unreachable!(), - } - } - - /// Inserts a key-value pair, creating it or replacing an existing pair. Note that if the program crashed - /// while or "right after" this operation, or if the operating system is unable to flush the page cache, - /// you may lose some data. However, you will still be in a consistent state, where you will get a previous - /// version of the state. - /// - /// While this method is O(1) amortized, every so often it will trigger either a shard compaction or a - /// shard split, which requires rewriting the whole shard. 
However, unlike LSM trees, this operation is - /// constant in size - pub fn set + ?Sized, B2: AsRef<[u8]> + ?Sized>( - &self, - key: &B1, - val: &B2, - ) -> Result { - self.owned_set(key.as_ref().to_owned(), val.as_ref()) - } - - /// Same as [Self::set], but the key passed owned to this function - pub fn owned_set(&self, key: Vec, val: &[u8]) -> Result { - self.set_raw(&self.make_user_key(key), val) - } - - pub(crate) fn replace_raw( - &self, - full_key: &[u8], - val: &[u8], - expected_val: Option<&[u8]>, - ) -> Result { - match self.insert_internal(full_key, val, InsertMode::Replace(expected_val))? { - InsertStatus::Added => unreachable!(), - InsertStatus::Replaced(v) => Ok(ReplaceStatus::PrevValue(v)), - InsertStatus::AlreadyExists(v) => Ok(ReplaceStatus::WrongValue(v)), - InsertStatus::KeyDoesNotExist => Ok(ReplaceStatus::DoesNotExist), - InsertStatus::CompactionNeeded(_) => unreachable!(), - InsertStatus::SplitNeeded => unreachable!(), - } - } - - /// Replaces the value of an existing key with a new value. If the key existed, returns - /// `PrevValue(value)` with its old value, and if it did not, returns `DoesNotExist` but - /// does not create the key. - /// - /// See [Self::set] for more details - pub fn replace + ?Sized, B2: AsRef<[u8]> + ?Sized>( - &self, - key: &B1, - val: &B2, - expected_val: Option<&B2>, - ) -> Result { - self.owned_replace( - key.as_ref().to_owned(), - val.as_ref(), - expected_val.map(|ev| ev.as_ref()), - ) - } - - /// Same as [Self::replace], but the key passed owned to this function - pub fn owned_replace( - &self, - key: Vec, - val: &[u8], - expected_val: Option<&[u8]>, - ) -> Result { - self.replace_raw(&self.make_user_key(key), val.as_ref(), expected_val) - } - - pub(crate) fn get_or_create_raw( - &self, - full_key: &[u8], - default_val: Vec, - ) -> Result { - match self.insert_internal(full_key, &default_val, InsertMode::GetOrCreate)? 
{ - InsertStatus::Added => Ok(GetOrCreateStatus::CreatedNew(default_val)), - InsertStatus::AlreadyExists(v) => Ok(GetOrCreateStatus::ExistingValue(v)), - InsertStatus::Replaced(_) => unreachable!(), - InsertStatus::KeyDoesNotExist => unreachable!(), - InsertStatus::CompactionNeeded(_) => unreachable!(), - InsertStatus::SplitNeeded => unreachable!(), - } - } - - /// Gets the value of the given key or creates it with the given default value. If the key did not exist, - /// returns `CreatedNew(default_val)`, and if it did, returns `ExistingValue(value)`. - /// This is done atomically, so it can be used to create a key only if it did not exist before, - /// like `open` with `O_EXCL`. - /// - /// See [Self::set] for more details - pub fn get_or_create + ?Sized, B2: AsRef<[u8]> + ?Sized>( - &self, - key: &B1, - default_val: &B2, - ) -> Result { - self.owned_get_or_create(key.as_ref().to_owned(), default_val.as_ref().to_owned()) - } - - /// Same as [Self::get_or_create], but the `key` and `default_val` are passed owned to this function - pub fn owned_get_or_create( - &self, - key: Vec, - default_val: Vec, - ) -> Result { - self.get_or_create_raw(&self.make_user_key(key), default_val) - } -} diff --git a/src/lib.rs b/src/lib.rs index a49f5fe..d7b2e5d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,21 +45,20 @@ //! } //! 
/// Drops every shard span that is fully covered by another span.
///
/// An interrupted split can leave any combination of the original shard and its bottom/top
/// halves on disk. The widest span is the authoritative one, so e.g. `[0..16, 16..32, 0..32]`
/// consolidates to `[0..32]`, with `0..16` and `16..32` reported as removable.
///
/// Returns `(kept, removed)`. `kept` is ordered by ascending `start`; `removed` lists the
/// covered spans in the order they were encountered in the sorted input.
fn consolidate_ranges(mut ranges: Vec<Range<u32>>) -> (Vec<Range<u32>>, Vec<Range<u32>>) {
    // Order by `start` ascending and, for equal starts, by `end` descending. That way every
    // over-arching span is visited before the narrower spans it covers:
    // [0..16, 16..32, 0..32] is visited as [0..32, 0..16, 16..32].
    ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start).then_with(|| b.end.cmp(&a.end)));

    let mut kept: Vec<Range<u32>> = Vec::with_capacity(ranges.len());
    let mut removed = Vec::new();
    for range in ranges {
        // Kept spans have non-decreasing starts and strictly increasing ends, so it is enough
        // to compare against the most recently kept span.
        let covered = kept
            .last()
            .is_some_and(|prev| prev.start <= range.start && range.end <= prev.end);
        if covered {
            removed.push(range);
        } else {
            kept.push(range);
        }
    }
    (kept, removed)
}
Self::create_initial_shards(&config)?; + } + let root = Self::treeify(shards, num_compactions.clone(), num_splits.clone()); + Ok(Self { + span: root.span(), + config, + node: RwLock::new(root), + num_compactions, + num_splits, + }) + } + + fn load(config: &Arc) -> Result> { + let mut found_shards = vec![]; + for res in std::fs::read_dir(&config.dir_path)? { + let entry = res?; + let filename = entry.file_name(); + let Some(filename) = filename.to_str() else { + continue; + }; + let Ok(filetype) = entry.file_type() else { + continue; + }; + if !filetype.is_file() { + continue; + } + if filename.starts_with("compact_") + || filename.starts_with("bottom_") + || filename.starts_with("top_") + { + std::fs::remove_file(entry.path())?; + continue; + } else if !filename.starts_with("shard_") { + continue; + } + let Some((_, span)) = filename.split_once("_") else { + continue; + }; + let Some((start, end)) = span.split_once("-") else { + continue; + }; + let start = u32::from_str_radix(start, 16).expect(filename); + let end = u32::from_str_radix(end, 16).expect(filename); + + ensure!( + start < end && end <= Self::END_OF_SHARDS, + "Bad span for {filename}" + ); + + found_shards.push(start..end); + } + + let (shards_to_keep, shards_to_remove) = consolidate_ranges(found_shards); + for span in shards_to_remove { + std::fs::remove_file( + config + .dir_path + .join(format!("shard_{:04x}-{:04x}", span.start, span.end)), + )?; + } + + let mut shards = vec![]; + for span in shards_to_keep { + shards.push(Shard::open( + config + .dir_path + .join(format!("shard_{:04x}-{:04x}", span.start, span.end)), + span, + false, + config.clone(), + )?); + } + + Ok(shards) + } + + fn create_initial_shards(config: &Arc) -> Result> { + let step = (Self::END_OF_SHARDS as f64) + / (config.expected_number_of_keys as f64 / Shard::EXPECTED_CAPACITY as f64).max(1.0); + let step = 1 << (step as u32).ilog2(); + + let mut shards = vec![]; + let mut start = 0; + while start < Self::END_OF_SHARDS { + let 
end = start + step; + shards.push(Shard::open( + config + .dir_path + .join(format!("shard_{:04x}-{:04x}", start, end)), + start..end, + true, + config.clone(), + )?); + start = end; + } + + Ok(shards) + } + + fn from_shardnode( + n: ShardNode, + num_compactions: Arc, + num_splits: Arc, + ) -> Self { + let config = match n { + ShardNode::Leaf(ref sh) => sh.config.clone(), + ShardNode::Vertex(ref bottom, _) => bottom.config.clone(), + }; + Self { + config, + span: n.span(), + node: RwLock::new(n), + num_compactions, + num_splits, + } + } + + fn treeify( + shards: Vec, + num_compactions: Arc, + num_splits: Arc, + ) -> ShardNode { + let mut nodes = vec![]; + let mut spans_debug: Vec> = vec![]; + for sh in shards { + assert!( + spans_debug.is_empty() || spans_debug.last().unwrap().start != sh.span.start, + "two elements with the same start {spans_debug:?} {:?}", + sh.span + ); + spans_debug.push(sh.span.clone()); + nodes.push(ShardNode::Leaf(sh)); + } + assert!( + spans_debug.is_sorted_by(|a, b| a.start < b.start), + "not sorted {spans_debug:?}" + ); + + let mut unchanged_loops = 0; + let mut prev_len = nodes.len(); + while nodes.len() > 1 { + let mut i = 0; + while i < nodes.len() - 1 { + if nodes[i].span().end == nodes[i + 1].span().start + && nodes[i].len() == nodes[i + 1].len() + { + let n0 = nodes.remove(i); + let n1 = nodes.remove(i); + + nodes.insert( + i, + ShardNode::Vertex( + Arc::new(Self::from_shardnode( + n0, + num_compactions.clone(), + num_splits.clone(), + )), + Arc::new(Self::from_shardnode( + n1, + num_compactions.clone(), + num_splits.clone(), + )), + ), + ); + break; + } else { + i += 1; + } + } + if nodes.len() == prev_len { + unchanged_loops += 1; + } else { + unchanged_loops = 0; + } + if unchanged_loops > 2 { + panic!("store load: loop detected (len={prev_len}) {spans_debug:?} {nodes:?}"); + } + prev_len = nodes.len(); + } + + nodes.remove(0) + } + + pub(crate) fn shared_op( + &self, + shard_selector: u32, + func: impl FnOnce(&Shard) -> Result, 
+ ) -> Result { + match &*self.node.read() { + ShardNode::Leaf(sh) => func(sh), + ShardNode::Vertex(bottom, top) => { + if shard_selector < bottom.span.end { + bottom.shared_op(shard_selector, func) + } else { + top.shared_op(shard_selector, func) + } + } + } + } + + pub(crate) fn clear(&self) -> Result<()> { + let mut guard = self.node.write(); + + for res in std::fs::read_dir(&self.config.dir_path)? { + let entry = res?; + let filename = entry.file_name(); + let Some(filename) = filename.to_str() else { + continue; + }; + let Ok(filetype) = entry.file_type() else { + continue; + }; + if !filetype.is_file() { + continue; + } + if filename.starts_with("shard_") + || filename.starts_with("compact_") + || filename.starts_with("bottom_") + || filename.starts_with("top_") + { + std::fs::remove_file(entry.path())?; + } + } + + let shards = Self::create_initial_shards(&self.config)?; + *guard = Self::treeify( + shards, + self.num_compactions.clone(), + self.num_splits.clone(), + ); + + Ok(()) + } + + pub(crate) fn call_on_all_shards( + &self, + mut func: impl FnMut(&Shard) -> Result + Copy, + ) -> Result> { + match &*self.node.read() { + ShardNode::Leaf(sh) => Ok(vec![func(sh)?]), + ShardNode::Vertex(bottom, top) => { + let mut v = bottom.call_on_all_shards(func)?; + v.extend(top.call_on_all_shards(func)?); + Ok(v) + } + } + } + + fn split_shard(&self) -> Result<()> { + let mut guard = self.node.write(); + let ShardNode::Leaf(sh) = &*guard else { + // already split + return Ok(()); + }; + let mid = (sh.span.start + sh.span.end) / 2; + + let bottomfile = self + .config + .dir_path + .join(format!("bottom_{:04x}-{:04x}", sh.span.start, mid)); + let topfile = self + .config + .dir_path + .join(format!("top_{:04x}-{:04x}", mid, sh.span.end)); + + let bottom_shard = Shard::open( + bottomfile.clone(), + sh.span.start..mid, + true, + self.config.clone(), + )?; + let top_shard = Shard::open(topfile.clone(), mid..sh.span.end, true, self.config.clone())?; + + 
sh.split_into(&bottom_shard, &top_shard)?; + + std::fs::rename( + bottomfile, + self.config.dir_path.join(format!( + "shard_{:04x}-{:04x}", + bottom_shard.span.start, bottom_shard.span.end + )), + )?; + std::fs::rename( + topfile, + self.config.dir_path.join(format!( + "shard_{:04x}-{:04x}", + top_shard.span.start, top_shard.span.end + )), + )?; + std::fs::remove_file( + self.config + .dir_path + .join(format!("shard_{:04x}-{:04x}", sh.span.start, sh.span.end)), + )?; + + self.num_splits.fetch_add(1, Ordering::SeqCst); + + *guard = ShardNode::Vertex( + Arc::new(ShardRouter { + span: bottom_shard.span.clone(), + config: self.config.clone(), + node: RwLock::new(ShardNode::Leaf(bottom_shard)), + num_compactions: self.num_compactions.clone(), + num_splits: self.num_splits.clone(), + }), + Arc::new(ShardRouter { + span: top_shard.span.clone(), + config: self.config.clone(), + node: RwLock::new(ShardNode::Leaf(top_shard)), + num_compactions: self.num_compactions.clone(), + num_splits: self.num_splits.clone(), + }), + ); + + Ok(()) + } + + fn compact_shard(&self, write_offset: u32) -> Result<()> { + let mut guard = self.node.write(); + let ShardNode::Leaf(sh) = &*guard else { + // was split + return Ok(()); + }; + if sh.get_write_offset() < write_offset { + // already compacted + return Ok(()); + }; + let orig_filename = self + .config + .dir_path + .join(format!("shard_{:04x}-{:04x}", sh.span.start, sh.span.end)); + let tmpfile = self + .config + .dir_path + .join(format!("compact_{:04x}-{:04x}", sh.span.start, sh.span.end)); + + let mut compacted_shard = + Shard::open(tmpfile.clone(), sh.span.clone(), true, self.config.clone())?; + + // XXX: this can be done in a background thread, holding a read lock until we're done, and then wrap it + // all up under a write lock + sh.compact_into(&mut compacted_shard)?; + + self.num_compactions.fetch_add(1, Ordering::SeqCst); + + std::fs::rename(tmpfile, orig_filename)?; + *guard = ShardNode::Leaf(compacted_shard); + + Ok(()) + } + 
+ pub(crate) fn insert( + &self, + ph: PartedHash, + full_key: &[u8], + val: &[u8], + mode: InsertMode, + ) -> Result { + loop { + let res = match &*self.node.read() { + ShardNode::Leaf(sh) => sh.insert(ph, full_key, val, mode)?, + ShardNode::Vertex(bottom, top) => { + if (ph.shard_selector() as u32) < bottom.span.end { + bottom.insert(ph, full_key, val, mode)? + } else { + top.insert(ph, full_key, val, mode)? + } + } + }; + + match res { + InsertStatus::SplitNeeded => { + self.split_shard()?; + // retry + } + InsertStatus::CompactionNeeded(write_offset) => { + self.compact_shard(write_offset)?; + // retry + } + _ => { + return Ok(res); + } + } + } + } +} diff --git a/src/shard.rs b/src/shard.rs index 213871f..3e03ea0 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -15,11 +15,12 @@ use std::{ use memmap::{MmapMut, MmapOptions}; +use crate::Result; use crate::{ hashing::{PartedHash, INVALID_SIG}, + store::InternalConfig, SizeHistogram, }; -use crate::{Config, Result}; // // these numbers were chosen according to the simulation, as they allow for 90% utilization of the shard with @@ -224,7 +225,7 @@ impl Backpointer { pub(crate) struct Shard { pub(crate) span: Range, file: File, - config: Arc, + pub(crate) config: Arc, #[allow(dead_code)] mmap: MmapMut, // needed to prevent it from dropping header: &'static mut ShardHeader, @@ -244,7 +245,7 @@ impl Shard { filename: PathBuf, span: Range, truncate: bool, - config: Arc, + config: Arc, ) -> Result { let file = OpenOptions::new() .create(true) @@ -331,13 +332,13 @@ impl Shard { Ok(((key.len() as u64) << 48) | ((val.len() as u64) << 32) | write_offset) } - pub(crate) fn read_at(&self, row_idx: usize, entry_idx: usize) -> Option> { + pub(crate) fn read_at(&self, row_idx: usize, entry_idx: usize) -> Result> { let _guard = self.row_locks[row_idx].read(); let row = &self.header.rows.0[row_idx]; if row.signatures[entry_idx] != INVALID_SIG { - Some(self.read_kv(row.offsets_and_sizes[entry_idx])) + 
/// Outcome of a `replace` operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplaceStatus {
    /// The key existed and its value was replaced; carries the previous value.
    PrevValue(Vec<u8>),
    /// The key existed but its value was not the expected one; carries the value found.
    WrongValue(Vec<u8>),
    /// The key does not exist (and was not created).
    DoesNotExist,
}

impl ReplaceStatus {
    /// `true` only for [`ReplaceStatus::PrevValue`].
    pub fn was_replaced(&self) -> bool {
        match self {
            Self::PrevValue(_) => true,
            Self::WrongValue(_) | Self::DoesNotExist => false,
        }
    }

    /// `true` for every outcome other than [`ReplaceStatus::PrevValue`].
    pub fn failed(&self) -> bool {
        !self.was_replaced()
    }

    /// `true` only for [`ReplaceStatus::DoesNotExist`].
    pub fn is_key_missing(&self) -> bool {
        match self {
            Self::DoesNotExist => true,
            Self::PrevValue(_) | Self::WrongValue(_) => false,
        }
    }

    /// `true` only for [`ReplaceStatus::WrongValue`].
    pub fn is_wrong_value(&self) -> bool {
        match self {
            Self::WrongValue(_) => true,
            Self::PrevValue(_) | Self::DoesNotExist => false,
        }
    }
}
/// Yields the next key-value pair, scanning shards in ascending selector order and, within
/// a shard, row by row and entry by entry.
///
/// In non-raw mode only keys ending with `USER_NAMESPACE` are yielded, with that suffix
/// stripped; in raw mode every entry is yielded unchanged. A read error is yielded as
/// `Some(Err(_))`; the cursor has already moved past the failing entry by then, so a
/// following call continues with the next one.
fn next(&mut self) -> Option<Self::Item> {
    while self.shard_selector < ShardRouter::END_OF_SHARDS {
        // The closure runs on the leaf shard that `shared_op` routes `shard_selector` to.
        // It returns the selector to continue from, plus the entry found (if any).
        let res = self.store.root.shared_op(self.shard_selector, |sh| {
            while self.row_idx < NUM_ROWS {
                // Remember the position to read, then advance the cursor *before* reading,
                // so that every exit path below (yield, error, skip) leaves the iterator
                // pointing at the following entry.
                let row_idx = self.row_idx;
                let entry_idx = self.entry_idx;

                self.entry_idx += 1;
                if self.entry_idx >= ROW_WIDTH {
                    self.entry_idx = 0;
                    self.row_idx += 1;
                }

                // `None` means the slot is empty; keep scanning.
                let Some((mut k, v)) = sh.read_at(row_idx, entry_idx)? else {
                    continue;
                };
                if self.raw {
                    // Stay on this shard: report its start as the selector to resume from.
                    return Ok((sh.span.start, Some((k, v))));
                } else if k.ends_with(USER_NAMESPACE) {
                    k.truncate(k.len() - USER_NAMESPACE.len());
                    return Ok((sh.span.start, Some((k, v))));
                }
                // Entry of another namespace in non-raw mode: skip it.
            }

            // Shard exhausted: reset the in-shard cursor and move on to the selector right
            // after this shard's span.
            self.entry_idx = 0;
            self.row_idx = 0;
            Ok((sh.span.end, None))
        });

        match res {
            Ok((shard_selector, kv)) => {
                self.shard_selector = shard_selector;
                if let Some(kv) = kv {
                    return Some(Ok(kv));
                }
                // continue
            }
            Err(e) => return Some(Err(e)),
        }
    }

    // Every shard up to END_OF_SHARDS has been scanned.
    None
}
/// * dir_path - the directory where shards will be kept /// * config - the configuration options for the store pub fn open(dir_path: impl AsRef, config: Config) -> Result { - let mut shards: BTreeMap = BTreeMap::new(); - let config = Arc::new(config); - let dir_path: PathBuf = dir_path.as_ref().into(); - - std::fs::create_dir_all(&dir_path)?; - let lockfilename = dir_path.join(".lock"); + let config = Arc::new(InternalConfig { + dir_path: dir_path.as_ref().to_path_buf(), + expected_number_of_keys: config.expected_number_of_keys, + hash_seed: config.hash_seed, + max_concurrent_list_ops: config.max_concurrent_list_ops, + max_shard_size: config.max_shard_size, + min_compaction_threashold: config.min_compaction_threashold, + truncate_up: config.truncate_up, + }); + + std::fs::create_dir_all(dir_path)?; + let lockfilename = config.dir_path.join(".lock"); let mut lockfile = fslock::LockFile::open(&lockfilename)?; if !lockfile.try_lock_with_pid()? { - bail!("Failed to lock {lockfilename:?}"); - } - - Self::load_existing_dir(&dir_path, &config, &mut shards)?; - if shards.is_empty() { - Self::create_first_shards(&dir_path, &config, &mut shards)?; + bail!("Lock file {lockfilename:?} is used by another process"); } let mut num_keyed_locks = config.max_concurrent_list_ops.max(4); @@ -315,180 +365,38 @@ impl CandyStore { keyed_locks.push(Mutex::new(())); } + let num_compactions = Arc::new(AtomicUsize::new(0)); + let num_splits = Arc::new(AtomicUsize::new(0)); + + let root = ShardRouter::new(config.clone(), num_compactions.clone(), num_splits.clone())?; + Ok(Self { config, - dir_path, - shards: RwLock::new(shards), + root, num_entries: Default::default(), - num_compactions: Default::default(), - num_splits: Default::default(), + num_compactions, + num_splits, keyed_locks_mask: num_keyed_locks - 1, keyed_locks, _lockfile: lockfile, }) } - fn load_existing_dir( - dir_path: &PathBuf, - config: &Arc, - shards: &mut BTreeMap, - ) -> Result<()> { - for res in 
std::fs::read_dir(&dir_path)? { - let entry = res?; - let filename = entry.file_name(); - let Some(filename) = filename.to_str() else { - continue; - }; - let Ok(filetype) = entry.file_type() else { - continue; - }; - if !filetype.is_file() { - continue; - } - if filename.starts_with("compact_") - || filename.starts_with("bottom_") - || filename.starts_with("top_") - { - std::fs::remove_file(entry.path())?; - continue; - } else if !filename.starts_with("shard_") { - continue; - } - let Some((_, span)) = filename.split_once("_") else { - continue; - }; - let Some((start, end)) = span.split_once("-") else { - continue; - }; - let start = u32::from_str_radix(start, 16).expect(filename); - let end = u32::from_str_radix(end, 16).expect(filename); - - if start > end || end > Self::END_OF_SHARDS { - return Err(anyhow!(CandyError::LoadingFailed(format!( - "Bad span for {filename}" - )))); - } - - if let Some(existing) = shards.get(&end) { - // this means we hit an uncompleted split - we need to take the wider of the two shards - // and delete the narrower one - if existing.span.start < start { - // keep existing, remove this one - std::fs::remove_file(entry.path())?; - continue; - } else { - // remove existing one - std::fs::remove_file(dir_path.join(format!( - "shard_{:04x}-{:04x}", - existing.span.start, existing.span.end - )))?; - } - } - shards.insert( - end, - Shard::open(entry.path(), start..end, false, config.clone())?, - ); - } - - // remove any split midpoints. we may come across one of [start..mid), [mid..end), [start..end) - // the case of [mid..end) and [start..end) (same end) is already handled above. so we need to - // detect two shards where first.start == second.start and remove the shorter one - let mut spans = vec![]; - for (end, shard) in shards.range(0..) 
{ - spans.push((shard.span.start, *end)); - } - let mut to_remove = vec![]; - for (i, (start, end)) in spans.iter().enumerate() { - if i < spans.len() - 1 { - let (next_start, next_end) = spans[i + 1]; - if *start == next_start { - if next_end <= *end { - return Err(anyhow!(CandyError::LoadingFailed(format!( - "Removing in-progress split with start={} end={} next_start={} next_end={}", - *start, *end, next_start, next_end - )))); - } - - to_remove.push((*start, *end)) - } - } - } - for (start, end) in to_remove { - let bottomfile = dir_path.join(format!("shard_{start:04x}-{end:04x}")); - std::fs::remove_file(bottomfile)?; - shards.remove(&end); - } - - Ok(()) - } - - fn create_first_shards( - dir_path: &PathBuf, - config: &Arc, - shards: &mut BTreeMap, - ) -> Result<()> { - let step = (Self::END_OF_SHARDS as f64) - / (config.expected_number_of_keys as f64 / Shard::EXPECTED_CAPACITY as f64).max(1.0); - let step = 1 << (step as u32).ilog2(); - - let mut start = 0; - while start < Self::END_OF_SHARDS { - let end = start + step; - let shard = Shard::open( - dir_path.join(format!("shard_{:04x}-{:04x}", start, end)), - 0..Self::END_OF_SHARDS, - false, - config.clone(), - )?; - shards.insert(end, shard); - start = end; - } - - Ok(()) - } - /// Syncs all in-memory changes of all shards to disk. Concurrent changes are allowed while /// flushing, and may result in partially-sync'ed store. Use sparingly, as this is a costly operaton. pub fn flush(&self) -> Result<()> { - let guard = self.shards.read(); - for (_, shard) in guard.iter() { - shard.flush()?; - } + self.root.call_on_all_shards(|sh| sh.flush())?; Ok(()) } /// Clears the store (erasing all keys), and removing all shard files pub fn clear(&self) -> Result<()> { - let mut guard = self.shards.write(); - - for res in std::fs::read_dir(&self.dir_path)? 
{ - let entry = res?; - let filename = entry.file_name(); - let Some(filename) = filename.to_str() else { - continue; - }; - let Ok(filetype) = entry.file_type() else { - continue; - }; - if !filetype.is_file() { - continue; - } - if filename.starts_with("shard_") - || filename.starts_with("compact_") - || filename.starts_with("bottom_") - || filename.starts_with("top_") - { - std::fs::remove_file(entry.path())?; - } - } + self.root.clear()?; self.num_entries.store(0, Ordering::Relaxed); self.num_compactions.store(0, Ordering::Relaxed); self.num_splits.store(0, Ordering::Relaxed); - guard.clear(); - Self::create_first_shards(&self.dir_path, &self.config, &mut guard)?; - Ok(()) } @@ -499,24 +407,14 @@ impl CandyStore { pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result> { debug_assert!(ph.is_valid()); - self.shards - .read() - .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32))) - .peek_next() - .with_context(|| format!("missing shard for 0x{:04x}", ph.shard_selector()))? - .1 - .get_by_hash(ph) + self.root + .shared_op(ph.shard_selector() as u32, |sh| sh.get_by_hash(ph)) } pub(crate) fn get_raw(&self, full_key: &[u8]) -> Result>> { let ph = PartedHash::new(&self.config.hash_seed, full_key); - self.shards - .read() - .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32))) - .peek_next() - .with_context(|| format!("missing shard for 0x{:04x}", ph.shard_selector()))? - .1 - .get(ph, &full_key) + self.root + .shared_op(ph.shard_selector() as u32, |sh| sh.get(ph, &full_key)) } /// Gets the value of a key from the store. If the key does not exist, `None` will be returned. @@ -544,13 +442,8 @@ impl CandyStore { let ph = PartedHash::new(&self.config.hash_seed, full_key); let val = self - .shards - .read() - .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32))) - .peek_next() - .with_context(|| format!("missing shard for 0x{:04x}", ph.shard_selector()))? 
- .1 - .remove(ph, &full_key)?; + .root + .shared_op(ph.shard_selector() as u32, |sh| sh.remove(ph, &full_key))?; if val.is_some() { self.num_entries.fetch_sub(1, Ordering::SeqCst); } @@ -568,6 +461,161 @@ impl CandyStore { self.remove_raw(&self.make_user_key(key)) } + pub(crate) fn insert_internal( + &self, + full_key: &[u8], + val: &[u8], + mode: InsertMode, + ) -> Result { + let ph = PartedHash::new(&self.config.hash_seed, full_key); + + if full_key.len() > MAX_TOTAL_KEY_SIZE as usize { + return Err(anyhow!(CandyError::KeyTooLong(full_key.len()))); + } + if val.len() > MAX_VALUE_SIZE as usize { + return Err(anyhow!(CandyError::ValueTooLong(val.len()))); + } + if full_key.len() + val.len() > self.config.max_shard_size as usize { + return Err(anyhow!(CandyError::EntryCannotFitInShard( + full_key.len() + val.len(), + self.config.max_shard_size as usize + ))); + } + + loop { + let status = self.root.insert(ph, full_key, val, mode)?; + + match status { + InsertStatus::Added => { + self.num_entries.fetch_add(1, Ordering::SeqCst); + return Ok(status); + } + InsertStatus::KeyDoesNotExist + | InsertStatus::Replaced(_) + | InsertStatus::AlreadyExists(_) => { + return Ok(status); + } + InsertStatus::CompactionNeeded(_) | InsertStatus::SplitNeeded => { + unreachable!(); + } + } + } + } + + pub(crate) fn set_raw(&self, full_key: &[u8], val: &[u8]) -> Result { + match self.insert_internal(full_key, val, InsertMode::Set)? { + InsertStatus::Added => Ok(SetStatus::CreatedNew), + InsertStatus::Replaced(v) => Ok(SetStatus::PrevValue(v)), + InsertStatus::AlreadyExists(v) => Ok(SetStatus::PrevValue(v)), + InsertStatus::KeyDoesNotExist => unreachable!(), + InsertStatus::CompactionNeeded(_) => unreachable!(), + InsertStatus::SplitNeeded => unreachable!(), + } + } + + /// Inserts a key-value pair, creating it or replacing an existing pair. 
Note that if the program crashed + /// while or "right after" this operation, or if the operating system is unable to flush the page cache, + /// you may lose some data. However, you will still be in a consistent state, where you will get a previous + /// version of the state. + /// + /// While this method is O(1) amortized, every so often it will trigger either a shard compaction or a + /// shard split, which requires rewriting the whole shard. However, unlike LSM trees, this operation is + /// constant in size + pub fn set + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + key: &B1, + val: &B2, + ) -> Result { + self.owned_set(key.as_ref().to_owned(), val.as_ref()) + } + + /// Same as [Self::set], but the key passed owned to this function + pub fn owned_set(&self, key: Vec, val: &[u8]) -> Result { + self.set_raw(&self.make_user_key(key), val) + } + + pub(crate) fn replace_raw( + &self, + full_key: &[u8], + val: &[u8], + expected_val: Option<&[u8]>, + ) -> Result { + match self.insert_internal(full_key, val, InsertMode::Replace(expected_val))? { + InsertStatus::Added => unreachable!(), + InsertStatus::Replaced(v) => Ok(ReplaceStatus::PrevValue(v)), + InsertStatus::AlreadyExists(v) => Ok(ReplaceStatus::WrongValue(v)), + InsertStatus::KeyDoesNotExist => Ok(ReplaceStatus::DoesNotExist), + InsertStatus::CompactionNeeded(_) => unreachable!(), + InsertStatus::SplitNeeded => unreachable!(), + } + } + + /// Replaces the value of an existing key with a new value. If the key existed, returns + /// `PrevValue(value)` with its old value, and if it did not, returns `DoesNotExist` but + /// does not create the key. 
+ /// + /// See [Self::set] for more details + pub fn replace + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + key: &B1, + val: &B2, + expected_val: Option<&B2>, + ) -> Result { + self.owned_replace( + key.as_ref().to_owned(), + val.as_ref(), + expected_val.map(|ev| ev.as_ref()), + ) + } + + /// Same as [Self::replace], but the key passed owned to this function + pub fn owned_replace( + &self, + key: Vec, + val: &[u8], + expected_val: Option<&[u8]>, + ) -> Result { + self.replace_raw(&self.make_user_key(key), val.as_ref(), expected_val) + } + + pub(crate) fn get_or_create_raw( + &self, + full_key: &[u8], + default_val: Vec, + ) -> Result { + match self.insert_internal(full_key, &default_val, InsertMode::GetOrCreate)? { + InsertStatus::Added => Ok(GetOrCreateStatus::CreatedNew(default_val)), + InsertStatus::AlreadyExists(v) => Ok(GetOrCreateStatus::ExistingValue(v)), + InsertStatus::Replaced(_) => unreachable!(), + InsertStatus::KeyDoesNotExist => unreachable!(), + InsertStatus::CompactionNeeded(_) => unreachable!(), + InsertStatus::SplitNeeded => unreachable!(), + } + } + + /// Gets the value of the given key or creates it with the given default value. If the key did not exist, + /// returns `CreatedNew(default_val)`, and if it did, returns `ExistingValue(value)`. + /// This is done atomically, so it can be used to create a key only if it did not exist before, + /// like `open` with `O_EXCL`. 
+ /// + /// See [Self::set] for more details + pub fn get_or_create + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + key: &B1, + default_val: &B2, + ) -> Result { + self.owned_get_or_create(key.as_ref().to_owned(), default_val.as_ref().to_owned()) + } + + /// Same as [Self::get_or_create], but the `key` and `default_val` are passed owned to this function + pub fn owned_get_or_create( + &self, + key: Vec, + default_val: Vec, + ) -> Result { + self.get_or_create_raw(&self.make_user_key(key), default_val) + } + /// Ephemeral stats: number of inserts pub fn _num_entries(&self) -> usize { self.num_entries.load(Ordering::Acquire) @@ -583,13 +631,13 @@ impl CandyStore { /// Returns useful stats about the store pub fn stats(&self) -> Stats { - let guard = self.shards.read(); - let mut stats = Stats { - num_shards: guard.len(), - ..Default::default() - }; - for (_, shard) in guard.iter() { - let (num_inserted, num_removed, used_bytes, wasted_bytes) = shard.get_stats(); + let stats_vec = self + .root + .call_on_all_shards(|sh| Ok(sh.get_stats())) + .unwrap(); + let mut stats = Stats::default(); + for (num_inserted, num_removed, used_bytes, wasted_bytes) in stats_vec { + stats.num_shards += 1; stats.num_inserted += num_inserted; stats.num_removed += num_removed; stats.used_bytes += used_bytes; @@ -599,10 +647,22 @@ impl CandyStore { } pub fn size_histogram(&self) -> SizeHistogram { - let guard = self.shards.read(); let mut hist = SizeHistogram::default(); - for (_, shard) in guard.iter() { - hist.accum(shard.get_size_histogram()) + let hist_vec = self + .root + .call_on_all_shards(|sh| Ok(sh.get_size_histogram())) + .unwrap(); + + for v in hist_vec { + for (i, c) in v.counts_64b.into_iter().enumerate() { + hist.counts_64b[i] += c; + } + for (i, c) in v.counts_1kb.into_iter().enumerate() { + hist.counts_1kb[i] += c; + } + for (i, c) in v.counts_16kb.into_iter().enumerate() { + hist.counts_16kb[i] += c; + } } hist } diff --git a/src/typed.rs b/src/typed.rs index cf5eff3..a3eb24a 
100644 --- a/src/typed.rs +++ b/src/typed.rs @@ -2,8 +2,7 @@ use anyhow::anyhow; use std::{borrow::Borrow, marker::PhantomData, sync::Arc}; use crate::{ - insertion::{ReplaceStatus, SetStatus}, - store::TYPED_NAMESPACE, + store::{ReplaceStatus, SetStatus, TYPED_NAMESPACE}, CandyStore, ListCompactionParams, };