From 91e7835175d4ea9fa453263de45184846727353a Mon Sep 17 00:00:00 2001
From: Tomer Filiba
Date: Tue, 20 Aug 2024 14:19:56 +0300
Subject: [PATCH] Shard: cleaner separation of pub(crate) and private APIs

---
 Cargo.toml              |   3 -
 src/insertion.rs        |  34 +--------
 src/lists.rs            |   5 +-
 src/shard.rs            | 162 +++++++++++++++++++++++++---------------
 src/store.rs            |  39 +++++-----
 tests/test_pre_split.rs |   2 +-
 6 files changed, 128 insertions(+), 117 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index bdf39c2..72927fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,8 +22,5 @@ fslock = "0.2.1"
 [features]
 whitebox_testing = []
 
-[dev-dependencies]
-rand = "0.8.5"
-
 [workspace]
 members = ["simulator", "candy-crasher", "candy-longliving", "candy-perf", "mini-candy", "test-list-collisions"]
diff --git a/src/insertion.rs b/src/insertion.rs
index 851a8b7..8f55b86 100644
--- a/src/insertion.rs
+++ b/src/insertion.rs
@@ -69,9 +69,7 @@ impl CandyStore {
         if guard
             .get(&shard_end)
             .with_context(|| format!("missing shard {shard_end}"))?
-            .header
-            .write_offset
-            .load(Ordering::Relaxed)
+            .get_write_offset()
             < write_offset
         {
             return Ok(false);
@@ -88,7 +86,7 @@
             "compact_{:04x}-{:04x}",
             removed_shard.span.start, removed_shard.span.end
         ));
-        let compacted_shard = Shard::open(
+        let mut compacted_shard = Shard::open(
             tmpfile.clone(),
             removed_shard.span.clone(),
             true,
@@ -97,17 +95,7 @@
 
         self.num_compactions.fetch_add(1, Ordering::SeqCst);
 
-        for res in removed_shard.unlocked_iter() {
-            let (k, v) = res?;
-            let ph = PartedHash::new(&self.config.hash_seed, &k);
-            let status = compacted_shard.insert(ph, &k, &v, InsertMode::Set)?;
-            if !matches!(status, InsertStatus::Added) {
-                return Err(anyhow!(CandyError::CompactionFailed(format!(
-                    "{ph:?} [{}..{}] shard {status:?} k={k:?} v={v:?}",
-                    removed_shard.span.start, removed_shard.span.end
-                ))));
-            }
-        }
+        removed_shard.compact_into(&mut compacted_shard)?;
 
         std::fs::rename(tmpfile, &orig_filename)?;
         guard.insert(shard_end, compacted_shard);
@@ -146,21 +134,7 @@
             self.config.clone(),
         )?;
 
-        for res in removed_shard.unlocked_iter() {
-            let (k, v) = res?;
-
-            let ph = PartedHash::new(&self.config.hash_seed, &k);
-            let status = if (ph.shard_selector() as u32) < midpoint {
-                bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
-            } else {
-                top_shard.insert(ph, &k, &v, InsertMode::Set)?
-            };
-            if !matches!(status, InsertStatus::Added) {
-                return Err(anyhow!(CandyError::SplitFailed(format!(
-                    "{ph:?} {status:?} [{shard_start} {midpoint} {shard_end}] k={k:?} v={v:?}",
-                ))));
-            }
-        }
+        removed_shard.split_into(&bottom_shard, &top_shard)?;
 
         self.num_splits.fetch_add(1, Ordering::SeqCst);
 
diff --git a/src/lists.rs b/src/lists.rs
index d8c2589..7b04986 100644
--- a/src/lists.rs
+++ b/src/lists.rs
@@ -534,10 +534,7 @@
         let item_ph = *from_bytes::<PartedHash>(&item_ph_bytes);
 
         // handle unlikely (but possible) collisions on item_ph
-        for kv in self.get_by_hash(item_ph)? {
-            let Ok((mut k, mut v)) = kv else {
-                continue;
-            };
+        for (mut k, mut v) in self.get_by_hash(item_ph)? {
             if v.ends_with(&idx.to_le_bytes()) {
                 if truncate {
                     v.truncate(v.len() - size_of::<u64>());
diff --git a/src/shard.rs b/src/shard.rs
index e50949c..213871f 100644
--- a/src/shard.rs
+++ b/src/shard.rs
@@ -1,3 +1,5 @@
+use anyhow::ensure;
+use bytemuck::{bytes_of, Pod, Zeroable};
 use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use simd_itertools::PositionSimd;
 use std::{
@@ -13,7 +15,10 @@
 
 use memmap::{MmapMut, MmapOptions};
 
-use crate::hashing::{PartedHash, INVALID_SIG};
+use crate::{
+    hashing::{PartedHash, INVALID_SIG},
+    SizeHistogram,
+};
 use crate::{Config, Result};
 
 //
@@ -28,9 +33,9 @@ pub(crate) const NUM_ROWS: usize = 64;
 pub(crate) const ROW_WIDTH: usize = 512;
 
 #[repr(C)]
-pub(crate) struct ShardRow {
-    pub signatures: [u32; ROW_WIDTH],
-    pub offsets_and_sizes: [u64; ROW_WIDTH], // | key_size: 16 | val_size: 16 | file_offset: 32 |
+struct ShardRow {
+    signatures: [u32; ROW_WIDTH],
+    offsets_and_sizes: [u64; ROW_WIDTH], // | key_size: 16 | val_size: 16 | file_offset: 32 |
 }
 
 impl ShardRow {
@@ -86,13 +91,13 @@ fn test_row_lookup() -> Result<()> {
 }
 
 #[repr(C, align(4096))]
-pub(crate) struct PageAligned<T>(pub T);
+struct PageAligned<T>(T);
 
 #[repr(C)]
-pub(crate) struct ShardSizeHistogram {
-    pub counts_64b: [AtomicU32; 16],
-    pub counts_1kb: [AtomicU32; 15],
-    pub counts_16kb: [AtomicU32; 4],
+struct ShardSizeHistogram {
+    counts_64b: [AtomicU32; 16],
+    counts_1kb: [AtomicU32; 15],
+    counts_16kb: [AtomicU32; 4],
 }
 
 impl ShardSizeHistogram {
@@ -141,13 +146,13 @@ fn test_shard_size_histogram() {
 }
 
 #[repr(C)]
-pub(crate) struct ShardHeader {
-    pub num_inserted: AtomicU64,
-    pub num_removed: AtomicU64,
-    pub wasted_bytes: AtomicU64,
-    pub write_offset: AtomicU32,
-    pub size_histogram: ShardSizeHistogram,
-    pub rows: PageAligned<[ShardRow; NUM_ROWS]>,
+struct ShardHeader {
+    num_inserted: AtomicU64,
+    num_removed: AtomicU64,
+    wasted_bytes: AtomicU64,
+    write_offset: AtomicU32,
+    size_histogram: ShardSizeHistogram,
+    rows: PageAligned<[ShardRow; NUM_ROWS]>,
 }
 
 pub(crate) const HEADER_SIZE: u64 = size_of::<ShardHeader>() as u64;
@@ -170,28 +175,10 @@ pub(crate) enum InsertMode<'a> {
     GetOrCreate,
 }
 
-pub(crate) struct ByHashIterator<'a> {
-    shard: &'a Shard,
-    _guard: RwLockReadGuard<'a, ()>,
-    row: &'a ShardRow,
-    signature: u32,
-    start_idx: usize,
-}
-
 pub(crate) type KVPair = (Vec<u8>, Vec<u8>);
 
-impl<'a> Iterator for ByHashIterator<'a> {
-    type Item = Result<KVPair>;
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(idx) = self.row.lookup(self.signature, &mut self.start_idx) {
-            Some(self.shard.read_kv(self.row.offsets_and_sizes[idx]))
-        } else {
-            None
-        }
-    }
-}
-
-#[derive(Default, Debug, Clone, Copy)]
+#[derive(Default, Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
 struct Backpointer(u32);
 
 impl Backpointer {
@@ -201,6 +188,8 @@
     // | idx | idx | size     |
     // | (6) | (9) | (17)     |
     // +-----+-----+----------+
+    const SZ: u64 = size_of::<Self>() as u64;
+
     fn new(row_idx: u16, sig_idx: u16, entry_size: usize) -> Self {
         debug_assert!((row_idx as usize % NUM_ROWS) < (1 << 6), "{row_idx}");
         debug_assert!(sig_idx < (1 << 9), "{sig_idx}");
@@ -238,8 +227,8 @@ pub(crate) struct Shard {
     config: Arc<Config>,
     #[allow(dead_code)]
     mmap: MmapMut, // needed to prevent it from dropping
-    pub(crate) header: &'static mut ShardHeader,
-    pub(crate) row_locks: Vec<RwLock<()>>,
+    header: &'static mut ShardHeader,
+    row_locks: Vec<RwLock<()>>,
 }
 
 enum TryReplaceStatus {
@@ -296,13 +285,8 @@
         Ok(())
     }
 
-    // #[inline]
-    // fn is_special_offset(offset_and_size: u64) -> bool {
-    //     (offset_and_size >> 62) != 0
-    // }
-
     #[inline]
-    pub(crate) fn extract_offset_and_size(offset_and_size: u64) -> (usize, usize, u64) {
+    fn extract_offset_and_size(offset_and_size: u64) -> (usize, usize, u64) {
         let klen = (offset_and_size >> 48) as usize;
         debug_assert_eq!(klen >> 14, 0, "attempting to read a special key");
         let vlen = ((offset_and_size >> 32) & 0xffff) as usize;
@@ -312,12 +296,10 @@
 
     // reading doesn't require holding any locks - we only ever extend the file, never overwrite data
     fn read_kv(&self, offset_and_size: u64) -> Result<KVPair> {
-        const BP: u64 = size_of::<Backpointer>() as u64;
-
         let (klen, vlen, offset) = Self::extract_offset_and_size(offset_and_size);
         let mut buf = vec![0u8; klen + vlen];
         self.file
-            .read_exact_at(&mut buf, HEADER_SIZE + BP + offset)?;
+            .read_exact_at(&mut buf, HEADER_SIZE + Backpointer::SZ + offset)?;
 
         let val = buf[klen..klen + vlen].to_owned();
         buf.truncate(klen);
@@ -327,12 +309,11 @@
 
     // writing doesn't require holding any locks since we write with an offset
     fn write_kv(&self, row_idx: u16, sig_idx: u16, key: &[u8], val: &[u8]) -> Result<u64> {
-        const BP: usize = size_of::<Backpointer>();
-
+        const BP: usize = Backpointer::SZ as usize;
         let entry_size = key.len() + val.len();
         let mut buf = vec![0u8; BP + entry_size];
         let bp = Backpointer::new(row_idx, sig_idx, entry_size);
-        buf[..BP].copy_from_slice(&bp.0.to_le_bytes());
+        buf[..BP].copy_from_slice(bytes_of(&bp));
         buf[BP..BP + key.len()].copy_from_slice(key);
         buf[BP + key.len()..].copy_from_slice(val);
 
@@ -360,7 +341,35 @@
         }
     }
 
-    pub(crate) fn unlocked_iter<'b>(&'b self) -> impl Iterator<Item = Result<KVPair>> + 'b {
+    pub(crate) fn compact_into(&self, new_shard: &mut Shard) -> Result<()> {
+        for res in self.unlocked_iter() {
+            let (k, v) = res?;
+            let ph = PartedHash::new(&self.config.hash_seed, &k);
+            let status = new_shard.insert(ph, &k, &v, InsertMode::Set)?;
+            ensure!(matches!(status, InsertStatus::Added));
+        }
+
+        Ok(())
+    }
+    pub(crate) fn split_into(&self, bottom_shard: &Shard, top_shard: &Shard) -> Result<()> {
+        for res in self.unlocked_iter() {
+            let (k, v) = res?;
+
+            let ph = PartedHash::new(&self.config.hash_seed, &k);
+            let status = if (ph.shard_selector() as u32) < bottom_shard.span.end {
+                bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
+            } else {
+                top_shard.insert(ph, &k, &v, InsertMode::Set)?
+            };
+            ensure!(
+                matches!(status, InsertStatus::Added),
+                "{ph} key={k:?} already exists in new_shard"
+            );
+        }
+        Ok(())
+    }
+
+    fn unlocked_iter<'b>(&'b self) -> impl Iterator<Item = Result<KVPair>> + 'b {
         self.header.rows.0.iter().flat_map(|row| {
             row.signatures.iter().enumerate().filter_map(|(idx, &sig)| {
                 if sig == INVALID_SIG {
@@ -372,22 +381,28 @@
         })
     }
 
-    pub(crate) fn iter_by_hash<'a>(&'a self, ph: PartedHash) -> ByHashIterator<'a> {
+    fn get_row(&self, ph: PartedHash) -> (RwLockReadGuard<()>, &ShardRow) {
         let row_idx = (ph.row_selector() as usize) % NUM_ROWS;
         let guard = self.row_locks[row_idx].read();
         let row = &self.header.rows.0[row_idx];
-        ByHashIterator {
-            shard: &self,
-            _guard: guard,
-            row,
-            signature: ph.signature(),
-            start_idx: 0,
+        (guard, row)
+    }
+
+    pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result<Vec<KVPair>> {
+        let (_guard, row) = self.get_row(ph);
+        let mut kvs = Vec::with_capacity(1);
+        let mut start = 0;
+        while let Some(idx) = row.lookup(ph.signature(), &mut start) {
+            kvs.push(self.read_kv(row.offsets_and_sizes[idx])?);
         }
+        Ok(kvs)
     }
 
     pub(crate) fn get(&self, ph: PartedHash, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        for res in self.iter_by_hash(ph) {
-            let (k, v) = res?;
+        let (_guard, row) = self.get_row(ph);
+        let mut start = 0;
+        while let Some(idx) = row.lookup(ph.signature(), &mut start) {
+            let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
             if key == k {
                 return Ok(Some(v));
             }
@@ -429,7 +444,7 @@
                 row.offsets_and_sizes[idx] =
                     self.write_kv(ph.row_selector(), idx as u16, key, val)?;
                 self.header.wasted_bytes.fetch_add(
-                    (size_of::<Backpointer>() + k.len() + existing_val.len()) as u64,
+                    Backpointer::SZ + (k.len() + existing_val.len()) as u64,
                     Ordering::SeqCst,
                 );
             }
@@ -523,4 +538,29 @@
 
         Ok(None)
     }
+
+    pub(crate) fn get_write_offset(&self) -> u32 {
+        self.header.write_offset.load(Ordering::Relaxed)
+    }
+    pub(crate) fn get_stats(&self) -> (usize, usize, usize, usize) {
+        (
+            self.header.num_inserted.load(Ordering::Relaxed) as usize,
+            self.header.num_removed.load(Ordering::Relaxed) as usize,
+            self.header.write_offset.load(Ordering::Relaxed) as usize,
+            self.header.wasted_bytes.load(Ordering::Relaxed) as usize,
+        )
+    }
+    pub(crate) fn get_size_histogram(&self) -> SizeHistogram {
+        let mut hist = SizeHistogram::default();
+        for (i, h) in self.header.size_histogram.counts_64b.iter().enumerate() {
+            hist.counts_64b[i] = h.load(Ordering::Relaxed) as usize;
+        }
+        for (i, h) in self.header.size_histogram.counts_1kb.iter().enumerate() {
+            hist.counts_1kb[i] = h.load(Ordering::Relaxed) as usize;
+        }
+        for (i, h) in self.header.size_histogram.counts_16kb.iter().enumerate() {
+            hist.counts_16kb[i] = h.load(Ordering::Relaxed) as usize;
+        }
+        hist
+    }
 }
diff --git a/src/store.rs b/src/store.rs
index 60eee54..50a678f 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -125,6 +125,18 @@ impl SizeHistogram {
         coarse
     }
 
+    fn accum(&mut self, hist: SizeHistogram) {
+        for (i, c) in hist.counts_64b.into_iter().enumerate() {
+            self.counts_64b[i] += c;
+        }
+        for (i, c) in hist.counts_1kb.into_iter().enumerate() {
+            self.counts_1kb[i] += c;
+        }
+        for (i, c) in hist.counts_16kb.into_iter().enumerate() {
+            self.counts_16kb[i] += c;
+        }
+    }
+
     /// iterate over all non-empty buckets, and return their spans and counts
     pub fn iter<'a>(&'a self) -> impl Iterator<Item = (Range<usize>, usize)> + 'a {
         self.counts_64b
@@ -485,17 +497,15 @@
         key
     }
 
-    pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result<Vec<Result<KVPair>>> {
+    pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result<Vec<KVPair>> {
         debug_assert!(ph.is_valid());
-        Ok(self
-            .shards
+        self.shards
             .read()
             .lower_bound(Bound::Excluded(&(ph.shard_selector() as u32)))
             .peek_next()
             .with_context(|| format!("missing shard for 0x{:04x}", ph.shard_selector()))?
             .1
-            .iter_by_hash(ph)
-            .collect::<Vec<_>>())
+            .get_by_hash(ph)
     }
 
     pub(crate) fn get_raw(&self, full_key: &[u8]) -> Result<Option<Vec<u8>>> {
@@ -579,10 +589,11 @@
             ..Default::default()
         };
         for (_, shard) in guard.iter() {
-            stats.num_inserted += shard.header.num_inserted.load(Ordering::Relaxed) as usize;
-            stats.num_removed += shard.header.num_removed.load(Ordering::Relaxed) as usize;
-            stats.used_bytes += shard.header.write_offset.load(Ordering::Relaxed) as usize;
-            stats.wasted_bytes += shard.header.wasted_bytes.load(Ordering::Relaxed) as usize;
+            let (num_inserted, num_removed, used_bytes, wasted_bytes) = shard.get_stats();
+            stats.num_inserted += num_inserted;
+            stats.num_removed += num_removed;
+            stats.used_bytes += used_bytes;
+            stats.wasted_bytes += wasted_bytes;
         }
         stats
     }
@@ -591,15 +602,7 @@
         let guard = self.shards.read();
         let mut hist = SizeHistogram::default();
         for (_, shard) in guard.iter() {
-            for (i, h) in shard.header.size_histogram.counts_64b.iter().enumerate() {
-                hist.counts_64b[i] += h.load(Ordering::Relaxed) as usize;
-            }
-            for (i, h) in shard.header.size_histogram.counts_1kb.iter().enumerate() {
-                hist.counts_1kb[i] += h.load(Ordering::Relaxed) as usize;
-            }
-            for (i, h) in shard.header.size_histogram.counts_16kb.iter().enumerate() {
-                hist.counts_16kb[i] += h.load(Ordering::Relaxed) as usize;
-            }
+            hist.accum(shard.get_size_histogram())
         }
         hist
     }
diff --git a/tests/test_pre_split.rs b/tests/test_pre_split.rs
index a97af3e..8b37808 100644
--- a/tests/test_pre_split.rs
+++ b/tests/test_pre_split.rs
@@ -77,7 +77,7 @@ fn test_compaction() -> Result<()> {
         },
     )?;
 
-    // fill the shard to the rim, creating waste
+    // fill the shard to the brim, creating waste
     for i in 0..10 {
         db.set("aaa", &format!("1111222233334444555566667777888899990000111122223333444455556666777788889999000011112222333{:x}", i))?;