diff --git a/src/collections.rs b/src/collections.rs
new file mode 100644
index 0000000..40c2c76
--- /dev/null
+++ b/src/collections.rs
@@ -0,0 +1,280 @@
+use simd_itertools::PositionSimd;
+
+use crate::{
+    hashing::{PartedHash, INVALID_SIG},
+    shard::{KVPair, Shard},
+    store::{COLL_NAMESPACE, ITEM_NAMESPACE},
+    GetOrCreateStatus, ReplaceStatus, Result, SetStatus, VickyStore,
+};
+
+enum SetCollStatus {
+    Added,
+    BlockFull,
+    BlockMissing,
+}
+
+const NUM_HASHES_IN_BLOCK: usize = 512;
+const COLLECTION_BLOCK: &[u8] = &[0u8; NUM_HASHES_IN_BLOCK * PartedHash::LEN];
+
+pub struct CollectionIterator<'a> {
+    store: &'a VickyStore,
+    suffix: [u8; PartedHash::LEN + ITEM_NAMESPACE.len()],
+    block_idx: u32,
+    coll_key: Vec<u8>,
+    curr_buf: Option<Vec<u8>>,
+    entry_idx: usize,
+}
+
+impl<'a> Iterator for CollectionIterator<'a> {
+    type Item = Result<KVPair>;
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            if self.curr_buf.is_none() {
+                self.curr_buf = match self.store.get_raw(&self.coll_key) {
+                    Err(e) => return Some(Err(e)),
+                    Ok(buf) => buf,
+                }
+            }
+            let Some(ref curr_buf) = self.curr_buf else {
+                return None;
+            };
+
+            let entries = unsafe {
+                std::slice::from_raw_parts(curr_buf.as_ptr() as *const u64, NUM_HASHES_IN_BLOCK)
+            };
+            while self.entry_idx < NUM_HASHES_IN_BLOCK {
+                let item_ph = PartedHash::from_u64(entries[self.entry_idx]);
+                self.entry_idx += 1;
+                if item_ph.signature() == INVALID_SIG {
+                    // a removed entry leaves a zeroed hole behind, so skip it
+                    // rather than treat it as the end of the block
+                    continue;
+                }
+
+                for res in self.store.get_by_hash(item_ph) {
+                    let (mut k, v) = match res {
+                        Err(e) => return Some(Err(e)),
+                        Ok(kv) => kv,
+                    };
+                    if k.ends_with(&self.suffix) {
+                        k.truncate(k.len() - self.suffix.len());
+                        return Some(Ok((k, v)));
+                    }
+                }
+            }
+
+            // move to the next block
+            self.entry_idx = 0;
+            self.curr_buf = None;
+            self.block_idx += 1;
+            let block_idx_offset = self.coll_key.len() - COLL_NAMESPACE.len() - size_of::<u32>();
+            self.coll_key[block_idx_offset..block_idx_offset + size_of::<u32>()]
+                .copy_from_slice(&self.block_idx.to_le_bytes());
+        }
+    }
+}
+
+// XXX:
+// * hold the number of added entries, so we could start at the right block
+// * hold the number of removed entries, and trigger compaction when it reaches 0.5 of the added entries
+// * maybe find a way to store these counters in an mmap?
+// * think of a way to create virtual shards (same algorithm, but over an underlying store instead of a file)
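+
+// Editor's note: the on-disk layout maintained below (bracketed parts are
+// descriptions, not identifiers):
+//
+//   block key: [user coll key][block_idx as u32 LE][COLL_NAMESPACE]
+//   block val: NUM_HASHES_IN_BLOCK x 8-byte item hashes (0 = free slot)
+//   item key:  [user item key][coll PartedHash, 8 bytes][ITEM_NAMESPACE]
+//   item val:  the user-provided value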
+
+impl VickyStore {
+    fn make_coll_key(&self, coll_key: &[u8]) -> (PartedHash, Vec<u8>) {
+        let mut full_key = coll_key.to_owned();
+        full_key.extend_from_slice(&0u32.to_le_bytes());
+        full_key.extend_from_slice(COLL_NAMESPACE);
+        (PartedHash::new(&self.config.hash_seed, &full_key), full_key)
+    }
+
+    fn make_item_key(&self, coll_ph: PartedHash, item_key: &[u8]) -> (PartedHash, Vec<u8>) {
+        let mut full_key = item_key.to_owned();
+        full_key.extend_from_slice(&coll_ph.to_bytes());
+        full_key.extend_from_slice(ITEM_NAMESPACE);
+        (PartedHash::new(&self.config.hash_seed, &full_key), full_key)
+    }
+
+    fn make_item_suffix(
+        &self,
+        coll_ph: PartedHash,
+    ) -> [u8; PartedHash::LEN + ITEM_NAMESPACE.len()] {
+        let mut suffix = [0u8; PartedHash::LEN + ITEM_NAMESPACE.len()];
+        suffix[..PartedHash::LEN].copy_from_slice(&coll_ph.to_bytes());
+        suffix[PartedHash::LEN..].copy_from_slice(ITEM_NAMESPACE);
+        suffix
+    }
+
+    fn _add_to_collection(&self, mut coll_key: Vec<u8>, item_ph: PartedHash) -> Result<()> {
+        let block_idx_offset = coll_key.len() - (size_of::<u32>() + COLL_NAMESPACE.len());
+        let mut block_idx = 0u32;
+        loop {
+            coll_key[block_idx_offset..block_idx_offset + size_of::<u32>()]
+                .copy_from_slice(&block_idx.to_le_bytes());
+
+            let status = self.operate_on_key_mut(&coll_key, |shard, row, _, idx_kv| {
+                if let Some((row_idx, _, v)) = idx_kv {
+                    assert_eq!(v.len(), COLLECTION_BLOCK.len());
+                    let entries = unsafe {
+                        std::slice::from_raw_parts(v.as_ptr() as *const u64, NUM_HASHES_IN_BLOCK)
+                    };
+                    if let Some(free_idx) = entries.iter().position_simd(0u64) {
+                        let (klen, vlen, offset) =
+                            Shard::extract_offset_and_size(row.offsets_and_sizes[row_idx]);
+                        assert!(free_idx * PartedHash::LEN < vlen, "free_idx={free_idx}");
+                        // patch the free 8-byte slot in place -- the block's value
+                        // starts right after its key, i.e. at offset + klen
+                        shard.write_raw(
+                            &item_ph.to_bytes(),
+                            offset + klen as u64 + (free_idx * PartedHash::LEN) as u64,
+                        )?;
+                        Ok(SetCollStatus::Added)
+                    } else {
+                        Ok(SetCollStatus::BlockFull)
+                    }
+                } else {
+                    Ok(SetCollStatus::BlockMissing)
+                }
+            })?;
+
+            match status {
+                SetCollStatus::Added => {
+                    break;
+                }
+                SetCollStatus::BlockFull => {
+                    block_idx += 1;
+                }
+                SetCollStatus::BlockMissing => {
+                    self.get_or_create_raw(&coll_key, COLLECTION_BLOCK)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
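+
+    // Editor's summary: the collection APIs below are two-step operations. They
+    // first store or remove the item under its namespaced key, and then register
+    // or unregister its 8-byte hash in the block chain. The two steps are not
+    // atomic: a crash in between can leave an item that get_from_collection()
+    // returns but iter_collection() misses.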
+
+    pub fn set_in_collection<
+        B1: AsRef<[u8]> + ?Sized,
+        B2: AsRef<[u8]> + ?Sized,
+        B3: AsRef<[u8]> + ?Sized,
+    >(
+        &self,
+        coll_key: &B1,
+        item_key: &B2,
+        val: &B3,
+    ) -> Result<SetStatus> {
+        let (coll_ph, coll_key) = self.make_coll_key(coll_key.as_ref());
+        let (item_ph, item_key) = self.make_item_key(coll_ph, item_key.as_ref());
+
+        let res = self.set_raw(&item_key, val.as_ref())?;
+        if res.was_created() {
+            self._add_to_collection(coll_key, item_ph)?;
+        }
+        Ok(res)
+    }
+
+    pub fn replace_in_collection<
+        B1: AsRef<[u8]> + ?Sized,
+        B2: AsRef<[u8]> + ?Sized,
+        B3: AsRef<[u8]> + ?Sized,
+    >(
+        &self,
+        coll_key: &B1,
+        item_key: &B2,
+        val: &B3,
+    ) -> Result<ReplaceStatus> {
+        let (coll_ph, _) = self.make_coll_key(coll_key.as_ref());
+        let (_, item_key) = self.make_item_key(coll_ph, item_key.as_ref());
+
+        // replace never creates the item, so the block chain needn't be touched
+        self.replace_raw(&item_key, val.as_ref())
+    }
+
+    pub fn get_or_create_in_collection<
+        B1: AsRef<[u8]> + ?Sized,
+        B2: AsRef<[u8]> + ?Sized,
+        B3: AsRef<[u8]> + ?Sized,
+    >(
+        &self,
+        coll_key: &B1,
+        item_key: &B2,
+        default_val: &B3,
+    ) -> Result<GetOrCreateStatus> {
+        let (coll_ph, coll_key) = self.make_coll_key(coll_key.as_ref());
+        let (item_ph, item_key) = self.make_item_key(coll_ph, item_key.as_ref());
+
+        let res = self.get_or_create_raw(&item_key, default_val.as_ref())?;
+        if res.was_created() {
+            self._add_to_collection(coll_key, item_ph)?;
+        }
+        Ok(res)
+    }
+
+    pub fn get_from_collection<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
+        &self,
+        coll_key: &B1,
+        item_key: &B2,
+    ) -> Result<Option<Vec<u8>>> {
+        let (coll_ph, _) = self.make_coll_key(coll_key.as_ref());
+        let (_, item_key) = self.make_item_key(coll_ph, item_key.as_ref());
+        self.get_raw(&item_key)
+    }
+
+    pub fn remove_from_collection<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
+        &self,
+        coll_key: &B1,
+        item_key: &B2,
+    ) -> Result<Option<Vec<u8>>> {
+        let (coll_ph, mut coll_key) = self.make_coll_key(coll_key.as_ref());
+        let (item_ph, item_key) = self.make_item_key(coll_ph, item_key.as_ref());
+
+        let Some(res) = self.remove_raw(&item_key)? else {
+            return Ok(None);
+        };
+
+        let block_idx_offset = coll_key.len() - COLL_NAMESPACE.len() - size_of::<u32>();
+        for block_idx in 0u32.. {
+            coll_key[block_idx_offset..block_idx_offset + size_of::<u32>()]
+                .copy_from_slice(&block_idx.to_le_bytes());
+
+            let found = self.operate_on_key_mut(&coll_key, |shard, row, _, idx_kv| {
+                let Some((row_idx, _, v)) = idx_kv else {
+                    // block does not exist - end of chain
+                    return Ok(true);
+                };
+                let entries = unsafe {
+                    std::slice::from_raw_parts(v.as_ptr() as *const u64, NUM_HASHES_IN_BLOCK)
+                };
+                if let Some(item_idx) = entries.iter().position_simd(item_ph.as_u64()) {
+                    let (klen, vlen, offset) =
+                        Shard::extract_offset_and_size(row.offsets_and_sizes[row_idx]);
+                    assert!(item_idx * PartedHash::LEN < vlen);
+                    // zero the slot, leaving a hole that _add_to_collection may refill
+                    shard.write_raw(
+                        &[0u8; PartedHash::LEN],
+                        offset + klen as u64 + (item_idx * PartedHash::LEN) as u64,
+                    )?;
+                    Ok(true)
+                } else {
+                    // try the next block
+                    Ok(false)
+                }
+            })?;
+            if found {
+                break;
+            }
+        }
+
+        Ok(Some(res))
+    }
+
+    pub fn iter_collection<'a, B: AsRef<[u8]> + ?Sized>(
+        &'a self,
+        coll_key: &B,
+    ) -> CollectionIterator<'a> {
+        let (coll_ph, coll_key) = self.make_coll_key(coll_key.as_ref());
+
+        CollectionIterator {
+            coll_key,
+            block_idx: 0,
+            suffix: self.make_item_suffix(coll_ph),
+            curr_buf: None,
+            store: &self,
+            entry_idx: 0,
+        }
+    }
+}
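+
+// Usage sketch (editor's addition; `db` is an open VickyStore and error
+// handling is elided):
+//
+//     db.set_in_collection("users", "alice", "101")?;
+//     for res in db.iter_collection("users") {
+//         let (key, val) = res?; // `key` is returned without the internal suffix
+//     }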
diff --git a/src/hashing.rs b/src/hashing.rs
index 0994c9a..e6f0820 100644
--- a/src/hashing.rs
+++ b/src/hashing.rs
@@ -23,76 +23,72 @@ impl HashSeed {
 }
 
 #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
-pub(crate) struct PartedHash {
-    pub shard_selector: u16,
-    pub row_selector: u16,
-    pub signature: u32,
-}
+pub(crate) struct PartedHash(u64);
 
 pub(crate) const INVALID_SIG: u32 = 0;
 
 impl PartedHash {
-    #[allow(dead_code)]
     pub const LEN: usize = size_of::<u64>();
 
     pub fn new(seed: &HashSeed, buf: &[u8]) -> Self {
         Self::from_hash(SipHasher24::new_with_key(&seed.0).hash(buf))
     }
+    #[inline]
+    pub fn shard_selector(&self) -> u16 {
+        (self.0 >> 48) as u16
+    }
+    #[inline]
+    pub fn row_selector(&self) -> u16 {
+        (self.0 >> 32) as u16
+    }
+    #[inline]
+    pub fn signature(&self) -> u32 {
+        self.0 as u32
+    }
+
     fn from_hash(h: Hash128) -> Self {
-        let mut signature = h.h1 as u32;
-        if signature == INVALID_SIG {
-            signature = h.h2 as u32;
-            if signature == INVALID_SIG {
-                signature = (h.h2 >> 32) as u32;
-                if signature == INVALID_SIG {
-                    signature = 0x6052_c9b7; // this is so unlikely that it doesn't really matter
+        let mut sig = h.h1 as u32;
+        if sig == INVALID_SIG {
+            sig = h.h2 as u32;
+            if sig == INVALID_SIG {
+                sig = (h.h2 >> 32) as u32;
+                if sig == INVALID_SIG {
+                    sig = 0x6052_c9b7; // this is so unlikely that it doesn't really matter
                 }
             }
         }
-        Self {
-            shard_selector: (h.h1 >> 48) as u16,
-            row_selector: (h.h1 >> 32) as u16,
-            signature,
-        }
+        let shard = h.h1 & 0xffff_0000_0000_0000;
+        let row = h.h1 & 0x0000_ffff_0000_0000;
+        Self(shard | row | sig as u64)
     }
 
-    #[allow(dead_code)]
-    pub fn to_u64(&self) -> u64 {
-        ((self.shard_selector as u64) << 48)
-            | ((self.row_selector as u64) << 32)
-            | (self.signature as u64)
-    }
-    #[allow(dead_code)]
     pub fn to_bytes(&self) -> [u8; Self::LEN] {
-        self.to_u64().to_le_bytes()
+        self.0.to_le_bytes()
     }
-    #[allow(dead_code)]
-    pub fn from_u64(val: u64) -> Self {
-        Self {
-            shard_selector: (val >> 48) as u16,
-            row_selector: (val >> 32) as u16,
-            signature: val as u32,
-        }
+    pub fn as_u64(&self) -> u64 {
+        self.0
     }
-    #[allow(dead_code)]
-    pub fn from_bytes(b: &[u8]) -> Self {
-        assert_eq!(b.len(), Self::LEN);
-        let buf: [u8; 8] = [b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]];
-        Self::from_u64(u64::from_le_bytes(buf))
+    pub fn from_u64(val: u64) -> Self {
+        Self(val)
     }
 }
 
 #[test]
 fn test_parted_hash() -> Result<()> {
-    HashSeed::new("1234").expect_err("shouldn't work");
     HashSeed::new("12341234123412341").expect_err("shouldn't work");
 
     let seed = HashSeed::new("aaaabbbbccccdddd")?;
 
+    let h1 = PartedHash::new(&seed, b"hello world");
+    assert_eq!(h1.as_u64(), 13445180190757400308);
+    let h2 = PartedHash::from_u64(13445180190757400308);
+    assert_eq!(PartedHash::new(&seed, b"hello world"), h2);
+
+    let h3 = PartedHash::from_u64(0x1020304050607080);
     assert_eq!(
-        PartedHash::new(&seed, b"hello world").to_u64(),
-        13445180190757400308,
+        h3.to_bytes(),
+        [0x80, 0x70, 0x60, 0x50, 0x40, 0x30, 0x20, 0x10]
     );
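+
+    // Editor's addition: the selectors are bit-fields of the packed u64
+    // (high 16 bits = shard, next 16 = row, low 32 = signature)
+    assert_eq!(h3.shard_selector(), 0x1020);
+    assert_eq!(h3.row_selector(), 0x3040);
+    assert_eq!(h3.signature(), 0x5060_7080);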
 
     Ok(())
diff --git a/src/insertion.rs b/src/insertion.rs
index 04371af..2d445a1 100644
--- a/src/insertion.rs
+++ b/src/insertion.rs
@@ -130,7 +130,7 @@ impl VickyStore {
             let (k, v) = res?;
             let ph = PartedHash::new(&self.config.hash_seed, &k);
 
-            let status = if (ph.shard_selector as u32) < midpoint {
+            let status = if (ph.shard_selector() as u32) < midpoint {
                 bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
             } else {
                 top_shard.insert(ph, &k, &v, InsertMode::Set)?
@@ -172,7 +172,7 @@ impl VickyStore {
         mode: InsertMode,
     ) -> Result<(InsertStatus, u32, u32)> {
         let guard = self.shards.read().unwrap();
-        let cursor = guard.lower_bound(Bound::Excluded(&(ph.shard_selector as u32)));
+        let cursor = guard.lower_bound(Bound::Excluded(&(ph.shard_selector() as u32)));
         let shard_start = cursor
             .peek_prev()
             .map(|(&shard_start, _)| shard_start)
diff --git a/src/lib.rs b/src/lib.rs
index 987819b..c634380 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,6 @@
 #![feature(btree_cursors)]
 
+mod collections;
 mod hashing;
 mod insertion;
 mod shard;
diff --git a/src/shard.rs b/src/shard.rs
index 30f7da4..9850846 100644
--- a/src/shard.rs
+++ b/src/shard.rs
@@ -226,14 +226,14 @@ impl Shard {
     }
 
     pub(crate) fn iter_by_hash<'a>(&'a self, ph: PartedHash) -> ByHashIterator<'a> {
-        let row_idx = (ph.row_selector as usize) % NUM_ROWS;
+        let row_idx = (ph.row_selector() as usize) % NUM_ROWS;
         let guard = self.row_locks[row_idx].read().unwrap();
         let row = &self.header.rows.0[row_idx];
         ByHashIterator {
             shard: &self,
             _guard: guard,
             row,
-            signature: ph.signature,
+            signature: ph.signature(),
             start_idx: 0,
         }
     }
@@ -257,7 +257,7 @@ impl Shard {
         mode: InsertMode,
     ) -> Result<InsertStatus> {
         let mut start = 0;
-        while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature) {
+        while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature()) {
             let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
             if key == k {
                 match mode {
@@ -283,7 +283,7 @@ impl Shard {
     }
 
     fn get_row_mut(&self, ph: PartedHash) -> (RwLockWriteGuard<()>, &mut ShardRow) {
-        let row_idx = (ph.row_selector as usize) % NUM_ROWS;
+        let row_idx = (ph.row_selector() as usize) % NUM_ROWS;
         let guard = self.row_locks[row_idx].write().unwrap();
         // this is safe because we hold a write lock on the row. the row sits in an mmap, so it can't be
         // owned by the lock itself
@@ -329,7 +329,7 @@ impl Shard {
             // we don't want a reorder to happen here - first write the offset, then the signature
             row.offsets_and_sizes[idx] = new_off;
             std::sync::atomic::fence(Ordering::SeqCst);
-            row.signatures[idx] = ph.signature;
+            row.signatures[idx] = ph.signature();
             self.header.num_inserted.fetch_add(1, Ordering::SeqCst);
             Ok(InsertStatus::Added)
         } else {
@@ -369,7 +369,7 @@ impl Shard {
         let (_guard, row) = self.get_row_mut(ph);
 
         let mut start = 0;
-        while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature) {
+        while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature()) {
             let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
             if key == k {
                 return func(&self, row, ph, Some((idx, k, v)));
diff --git a/src/store.rs b/src/store.rs
index 2e6e536..3a2470c 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -14,6 +14,8 @@ use crate::{Config, Result};
 
 pub(crate) const USER_NAMESPACE: &[u8] = &[1];
 pub(crate) const TYPED_NAMESPACE: &[u8] = &[2];
+pub(crate) const COLL_NAMESPACE: &[u8] = &[3];
+pub(crate) const ITEM_NAMESPACE: &[u8] = &[4];
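+// (editor's note: every key is stored with one of these single-byte suffixes,
+// so user keys, collection blocks and collection items can never collide)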
 
 /// Stats from VickyStore, mainly useful for debugging
 #[derive(Debug, PartialEq, Eq, Clone)]
@@ -306,12 +308,11 @@ impl VickyStore {
         full_key
     }
 
-    #[allow(dead_code)]
     pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Vec<Result<KVPair>> {
         self.shards
             .read()
             .unwrap()
-            .lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
+            .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32)))
             .peek_next()
             .unwrap()
             .1
@@ -333,7 +334,7 @@ impl VickyStore {
         self.shards
             .read()
             .unwrap()
-            .lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
+            .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32)))
             .peek_next()
             .unwrap()
             .1
@@ -345,7 +346,7 @@ impl VickyStore {
         self.shards
             .read()
             .unwrap()
-            .lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
+            .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32)))
             .peek_next()
             .unwrap()
             .1
@@ -370,7 +371,7 @@ impl VickyStore {
             .shards
             .read()
             .unwrap()
-            .lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
+            .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32)))
             .peek_next()
             .unwrap()
             .1
diff --git a/tests/test_atomics.rs b/tests/test_atomics.rs
index abda563..013000d 100644
--- a/tests/test_atomics.rs
+++ b/tests/test_atomics.rs
@@ -5,7 +5,7 @@ use vicky_store::{Config, GetOrCreateStatus, Result, SetStatus, VickyStore};
 use crate::common::run_in_tempdir;
 
 #[test]
-fn test_get_or_insert_default() -> Result<()> {
+fn test_atomics() -> Result<()> {
     run_in_tempdir(|dir| {
         let db = VickyStore::open(dir, Config::default())?;
 
diff --git a/tests/test_collections.rs b/tests/test_collections.rs
new file mode 100644
index 0000000..e21093d
--- /dev/null
+++ b/tests/test_collections.rs
@@ -0,0 +1,53 @@
+mod common;
+
+use std::collections::HashMap;
+
+use vicky_store::{Config, Result, VickyStore};
+
+use crate::common::run_in_tempdir;
+
+#[test]
+fn test_collections() -> Result<()> {
+    run_in_tempdir(|dir| {
+        let db = VickyStore::open(dir, Config::default())?;
+
+        db.set_in_collection("class1", "john", "100")?;
+        db.set_in_collection("class1", "bob", "90")?;
+        db.set_in_collection("class1", "greg", "80")?;
+        assert_eq!(
+            db.get_from_collection("class1", "john")?,
+            Some("100".into())
+        );
+        db.set_in_collection("class1", "john", "99")?;
+        assert_eq!(db.get_from_collection("class1", "john")?, Some("99".into()));
+
+        //assert!(db.remove_from_collection("class1", "bob")?.is_some());
+        //assert!(db.remove_from_collection("class1", "bob")?.is_none());
+
+        let items = db
+            .iter_collection("class1")
+            .map(|res| res.unwrap())
+            .collect::<HashMap<_, _>>();
+        assert_eq!(items.len(), 3);
+        assert_eq!(items.get("john".as_bytes()), Some(&"99".into()));
+        assert_eq!(items.get("greg".as_bytes()), Some(&"80".into()));
+
+        for i in 0..5000 {
+            db.set_in_collection("mycoll", &format!("key{i}"), &format!("xxx{i}"))?;
+        }
+        // overwriting existing items must not register duplicate block entries
+        for i in 100..500 {
+            db.set_in_collection("mycoll", &format!("key{i}"), &format!("xxx{i}"))?;
+        }
+
+        let items = db
+            .iter_collection("mycoll")
+            .map(|res| res.unwrap())
+            .collect::<HashMap<_, _>>();
+        assert_eq!(items.len(), 5000);
+
+        Ok(())
+    })
+}
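+
+// Editor's sketch (not part of the original change): exercises the remaining
+// collection APIs; assumes replace_raw's "replace only if the key exists"
+// semantics carry over to replace_in_collection.
+#[test]
+fn test_collections_get_or_create() -> Result<()> {
+    run_in_tempdir(|dir| {
+        let db = VickyStore::open(dir, Config::default())?;
+
+        assert!(db
+            .get_or_create_in_collection("class2", "dana", "17")?
+            .was_created());
+        // the item already exists, so the default value is not applied
+        assert!(!db
+            .get_or_create_in_collection("class2", "dana", "20")?
+            .was_created());
+        assert_eq!(db.get_from_collection("class2", "dana")?, Some("17".into()));
+
+        // replace never creates an item: "yuri" was never set
+        db.replace_in_collection("class2", "dana", "18")?;
+        db.replace_in_collection("class2", "yuri", "99")?;
+        assert_eq!(db.get_from_collection("class2", "dana")?, Some("18".into()));
+        assert_eq!(db.get_from_collection("class2", "yuri")?, None);
+
+        Ok(())
+    })
+}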