From e401c08a7c1816988efa467cc0d1a2283acfb894 Mon Sep 17 00:00:00 2001
From: Tomer Filiba
Date: Mon, 2 Sep 2024 20:45:12 +0300
Subject: [PATCH] Background compaction in a thread pool

---
 Cargo.lock                   |  16 +
 Cargo.toml                   |   1 +
 candy-crasher/src/main.rs    |  43 +-
 candy-longliving/src/main.rs |   2 +-
 src/hashing.rs               |   1 +
 src/lib.rs                   |   7 +-
 src/lists.rs                 |   1 -
 src/router.rs                | 216 +++------
 src/shard.rs                 | 857 +++++++++++++++++++++++++----------
 src/store.rs                 |  23 +-
 tests/test_flush_agg.rs      |   2 +-
 tests/test_loading.rs        |   2 -
 tests/test_pre_split.rs      |   2 +-
 13 files changed, 744 insertions(+), 429 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 85ec16f..b3ab8e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -75,6 +75,7 @@ version = "0.3.4"
 dependencies = [
  "anyhow",
  "bytemuck",
+ "crossbeam-channel",
  "databuf",
  "fslock",
  "libc",
@@ -92,6 +93,21 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
+
 [[package]]
 name = "databuf"
 version = "0.5.0"
diff --git a/Cargo.toml b/Cargo.toml
index 6982239..aa7047c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@ uuid = { version = "1.10.0" }
 rand = "0.8.5"
 fslock = "0.2.1"
 libc = "0.2.158"
+crossbeam-channel = "0.5.13"
 
 [features]
 whitebox_testing = []
diff --git a/candy-crasher/src/main.rs b/candy-crasher/src/main.rs
index 1fa5ef9..10787cf 100644
--- a/candy-crasher/src/main.rs
+++ b/candy-crasher/src/main.rs
@@ -15,6 +15,7 @@ const CONFIG: Config = Config {
     truncate_up: true,
     clear_on_unsupported_version: true,
     mlock_headers: false,
+    num_compaction_threads: 4,
 };
 
 fn child_inserts() -> Result<()> {
     let store = CandyStore::open("dbdir", CONFIG)?;
     let highest_bytes = store.get("highest")?.unwrap_or(vec![0, 0, 0, 0]);
-    let highest = u32::from_le_bytes([
-        highest_bytes[0],
-        highest_bytes[1],
-        highest_bytes[2],
-        highest_bytes[3],
-    ]);
+    let highest = u32::from_le_bytes(highest_bytes.try_into().unwrap());
 
     if highest == TARGET - 1 {
         println!("child finished (already at {highest})");
@@ -50,12 +46,7 @@ fn child_removals() -> Result<()> {
     let store = CandyStore::open("dbdir", CONFIG)?;
     let lowest_bytes = store.get("lowest")?.unwrap_or(vec![0, 0, 0, 0]);
-    let lowest = u32::from_le_bytes([
-        lowest_bytes[0],
-        lowest_bytes[1],
-        lowest_bytes[2],
-        lowest_bytes[3],
-    ]);
+    let lowest = u32::from_le_bytes(lowest_bytes.try_into().unwrap());
 
     if lowest == TARGET - 1 {
         println!("child finished (already at {lowest})");
@@ -79,12 +70,7 @@ fn child_list_inserts() -> Result<()> {
     let store = CandyStore::open("dbdir", CONFIG)?;
     let highest_bytes = store.get("list_highest")?.unwrap_or(vec![0, 0, 0, 0]);
-    let highest = u32::from_le_bytes([
-        highest_bytes[0],
-        highest_bytes[1],
-        highest_bytes[2],
-        highest_bytes[3],
-    ]);
+    let highest = u32::from_le_bytes(highest_bytes.try_into().unwrap());
 
     if highest == TARGET - 1 {
         println!("child finished (already at {highest})");
@@ -108,12 +94,7 @@ fn child_list_removals() -> Result<()> {
     let store =
CandyStore::open("dbdir", CONFIG)?; let lowest_bytes = store.get("list_lowest")?.unwrap_or(vec![0, 0, 0, 0]); - let lowest = u32::from_le_bytes([ - lowest_bytes[0], - lowest_bytes[1], - lowest_bytes[2], - lowest_bytes[3], - ]); + let lowest = u32::from_le_bytes(lowest_bytes.try_into().unwrap()); if lowest == TARGET - 1 { println!("child finished (already at {lowest})"); @@ -258,6 +239,7 @@ fn main() -> Result<()> { // "dbdir", // Config { // expected_number_of_keys: 1_000_000, + // clear_on_unsupported_version: true, // ..Default::default() // }, // )?; @@ -277,7 +259,7 @@ fn main() -> Result<()> { for res in store.iter() { let (k, v) = res?; assert_eq!(v, b"i am a key"); - let k = u32::from_le_bytes([k[0], k[1], k[2], k[3]]); + let k = u32::from_le_bytes(k.try_into().unwrap()); assert!(k < TARGET); count += 1; } @@ -296,7 +278,12 @@ fn main() -> Result<()> { store.remove("lowest")?, Some((TARGET - 1).to_le_bytes().to_vec()) ); - assert_eq!(store.iter().count(), 0); + assert_eq!( + store.iter().count(), + 0, + "{:?}", + store.iter().collect::>() + ); println!("DB validated successfully"); } @@ -314,14 +301,14 @@ fn main() -> Result<()> { for (i, res) in store.iter_list("xxx").enumerate() { let (k, v) = res?; - assert_eq!(k, (i as u32).to_le_bytes()); + assert_eq!(u32::from_le_bytes(k.try_into().unwrap()), i as u32); assert_eq!(v, b"yyy"); } println!("DB validated successfully"); } - parent_run(shared_stuff, child_list_removals, 10..30)?; + parent_run(shared_stuff, child_list_removals, 10..80)?; { println!("Parent starts validating the DB..."); diff --git a/candy-longliving/src/main.rs b/candy-longliving/src/main.rs index f7bc1e2..164d8ea 100644 --- a/candy-longliving/src/main.rs +++ b/candy-longliving/src/main.rs @@ -35,7 +35,7 @@ fn main() -> Result<()> { if i % 10000 == 0 { let t1 = Instant::now(); println!( - "thread {thd} at {i} {:?} rate={}us", + "thread {thd} at {i} {} rate={}us", db.stats(), t1.duration_since(t0).as_micros() / 10_000, ); diff --git a/src/hashing.rs b/src/hashing.rs index b075647..0ed1505 100644 --- a/src/hashing.rs +++ b/src/hashing.rs @@ -52,6 +52,7 @@ impl PartedHash { self.0 as u32 } + #[allow(dead_code)] pub fn as_u64(&self) -> u64 { self.0 } diff --git a/src/lib.rs b/src/lib.rs index 4e1b4fe..d0a71ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,6 @@ pub enum CandyError { KeyTooLong(usize), ValueTooLong(usize), EntryCannotFitInShard(usize, usize), - KeyAlreadyExists(Vec, u64), } impl Display for CandyError { @@ -78,9 +77,6 @@ impl Display for CandyError { match self { Self::KeyTooLong(sz) => write!(f, "key too long {sz}"), Self::ValueTooLong(sz) => write!(f, "value too long {sz}"), - Self::KeyAlreadyExists(key, ph) => { - write!(f, "key {key:?} already exists (0x{ph:016x})") - } Self::EntryCannotFitInShard(sz, max) => { write!(f, "entry too big ({sz}) for a single shard file ({max})") } @@ -111,6 +107,8 @@ pub struct Config { pub clear_on_unsupported_version: bool, /// whether or not to mlock the shard headers to RAM (POSIX only) pub mlock_headers: bool, + /// number of background compaction threads + pub num_compaction_threads: usize, /// optionally delay modifying operations before for the given duration before flushing data to disk, /// to ensure reboot consistency #[cfg(feature = "flush_aggregation")] @@ -128,6 +126,7 @@ impl Default for Config { truncate_up: true, clear_on_unsupported_version: false, mlock_headers: false, + num_compaction_threads: 4, #[cfg(feature = "flush_aggregation")] flush_aggregation_delay: None, } diff --git a/src/lists.rs 
b/src/lists.rs index 8ba53a0..293a8c4 100644 --- a/src/lists.rs +++ b/src/lists.rs @@ -174,7 +174,6 @@ impl CandyStore { // if the item already exists, it's already part of the list. just update it and preserve the index if let Some(mut existing_val) = self.get_raw(&item_key)? { match mode { - InsertMode::MustCreate => unreachable!(), InsertMode::GetOrCreate => { existing_val.truncate(existing_val.len() - size_of::()); return Ok(InsertToListStatus::ExistingValue(existing_val)); diff --git a/src/router.rs b/src/router.rs index 179b322..6f74b05 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,9 +1,8 @@ use anyhow::ensure; use parking_lot::RwLock; -use std::time::Instant; use std::{ops::Range, sync::Arc}; -use crate::shard::{InsertMode, InsertStatus, Shard}; +use crate::shard::{CompactionThreadPool, InsertMode, InsertStatus, Shard}; use crate::stats::InternalStats; use crate::Result; use crate::{hashing::PartedHash, store::InternalConfig}; @@ -73,26 +72,36 @@ pub(crate) struct ShardRouter { config: Arc, node: RwLock, stats: Arc, + threadpool: Arc, } impl ShardRouter { pub(crate) const END_OF_SHARDS: u32 = 1u32 << 16; - pub(crate) fn new(config: Arc, stats: Arc) -> Result { - let mut shards = Self::load(&config, &stats)?; + pub(crate) fn new( + config: Arc, + stats: Arc, + threadpool: Arc, + ) -> Result { + let mut shards = Self::load(&config, &stats, &threadpool)?; if shards.is_empty() { - shards = Self::create_initial_shards(&config, &stats)?; + shards = Self::create_initial_shards(&config, &stats, &threadpool)?; } - let root = Self::treeify(shards, stats.clone()); + let root = Self::treeify(shards, stats.clone(), threadpool.clone()); Ok(Self { span: root.span(), config, node: RwLock::new(root), stats, + threadpool, }) } - fn load(config: &Arc, stats: &Arc) -> Result> { + fn load( + config: &Arc, + stats: &Arc, + threadpool: &Arc, + ) -> Result> { let mut found_shards = vec![]; for res in std::fs::read_dir(&config.dir_path)? 
{ let entry = res?; @@ -106,10 +115,7 @@ impl ShardRouter { if !filetype.is_file() { continue; } - if filename.starts_with("compact_") - || filename.starts_with("bottom_") - || filename.starts_with("top_") - { + if filename.starts_with("bottom_") || filename.starts_with("top_") { std::fs::remove_file(entry.path())?; continue; } else if !filename.starts_with("shard_") { @@ -144,13 +150,11 @@ impl ShardRouter { let mut shards = vec![]; for span in shards_to_keep { shards.push(Shard::open( - config - .dir_path - .join(format!("shard_{:04x}-{:04x}", span.start, span.end)), span, false, config.clone(), stats.clone(), + threadpool.clone(), )?); } @@ -160,6 +164,7 @@ impl ShardRouter { fn create_initial_shards( config: &Arc, stats: &Arc, + threadpool: &Arc, ) -> Result> { let step = (Self::END_OF_SHARDS as f64) / (config.expected_number_of_keys as f64 / Shard::EXPECTED_CAPACITY as f64).max(1.0); @@ -170,13 +175,11 @@ impl ShardRouter { while start < Self::END_OF_SHARDS { let end = start + step; shards.push(Shard::open( - config - .dir_path - .join(format!("shard_{:04x}-{:04x}", start, end)), start..end, true, config.clone(), stats.clone(), + threadpool.clone(), )?); start = end; } @@ -184,7 +187,11 @@ impl ShardRouter { Ok(shards) } - fn from_shardnode(n: ShardNode, stats: Arc) -> Self { + fn from_shardnode( + n: ShardNode, + stats: Arc, + threadpool: Arc, + ) -> Self { let config = match n { ShardNode::Leaf(ref sh) => sh.config.clone(), ShardNode::Vertex(ref bottom, _) => bottom.config.clone(), @@ -194,10 +201,15 @@ impl ShardRouter { span: n.span(), node: RwLock::new(n), stats, + threadpool, } } - fn treeify(shards: Vec, stats: Arc) -> ShardNode { + fn treeify( + shards: Vec, + stats: Arc, + threadpool: Arc, + ) -> ShardNode { // algorithm: first find the smallest span, and let that be our base unit, say it's 1K. then go over // 0..64K in 1K increments and pair up every consecutive pairs whose size is 1K. we count on the spans to be // sorted, so we'll merge 0..1K with 1K..2K, and not 1K..3K with 2K..3K. 
@@ -239,8 +251,8 @@ impl ShardRouter { nodes.insert( i, ShardNode::Vertex( - Arc::new(Self::from_shardnode(n0, stats.clone())), - Arc::new(Self::from_shardnode(n1, stats.clone())), + Arc::new(Self::from_shardnode(n0, stats.clone(), threadpool.clone())), + Arc::new(Self::from_shardnode(n1, stats.clone(), threadpool.clone())), ), ); } else { @@ -296,8 +308,8 @@ impl ShardRouter { } } - let shards = Self::create_initial_shards(&self.config, &self.stats)?; - *guard = Self::treeify(shards, self.stats.clone()); + let shards = Self::create_initial_shards(&self.config, &self.stats, &self.threadpool)?; + *guard = Self::treeify(shards, self.stats.clone(), self.threadpool.clone()); Ok(()) } @@ -316,132 +328,6 @@ impl ShardRouter { } } - fn split_shard(&self) -> Result<()> { - let mut guard = self.node.write(); - let ShardNode::Leaf(sh) = &*guard else { - // already split - return Ok(()); - }; - let mid = (sh.span.start + sh.span.end) / 2; - - let t0 = Instant::now(); - - let bottomfile = self - .config - .dir_path - .join(format!("bottom_{:04x}-{:04x}", sh.span.start, mid)); - let topfile = self - .config - .dir_path - .join(format!("top_{:04x}-{:04x}", mid, sh.span.end)); - - let bottom_shard = Shard::open( - bottomfile.clone(), - sh.span.start..mid, - true, - self.config.clone(), - self.stats.clone(), - )?; - let top_shard = Shard::open( - topfile.clone(), - mid..sh.span.end, - true, - self.config.clone(), - self.stats.clone(), - )?; - - sh.split_into(&bottom_shard, &top_shard)?; - - std::fs::rename( - bottomfile, - self.config.dir_path.join(format!( - "shard_{:04x}-{:04x}", - bottom_shard.span.start, bottom_shard.span.end - )), - )?; - std::fs::rename( - topfile, - self.config.dir_path.join(format!( - "shard_{:04x}-{:04x}", - top_shard.span.start, top_shard.span.end - )), - )?; - std::fs::remove_file( - self.config - .dir_path - .join(format!("shard_{:04x}-{:04x}", sh.span.start, sh.span.end)), - )?; - - self.stats.report_split( - t0, - bottom_shard.get_write_offset(), - top_shard.get_write_offset(), - ); - - *guard = ShardNode::Vertex( - Arc::new(ShardRouter { - span: bottom_shard.span.clone(), - config: self.config.clone(), - node: RwLock::new(ShardNode::Leaf(bottom_shard)), - stats: self.stats.clone(), - }), - Arc::new(ShardRouter { - span: top_shard.span.clone(), - config: self.config.clone(), - node: RwLock::new(ShardNode::Leaf(top_shard)), - stats: self.stats.clone(), - }), - ); - - Ok(()) - } - - fn compact_shard(&self, write_offset: u64) -> Result<()> { - let mut guard = self.node.write(); - let ShardNode::Leaf(sh) = &*guard else { - // was split - return Ok(()); - }; - if sh.get_write_offset() < write_offset { - // already compacted - return Ok(()); - }; - - let t0 = Instant::now(); - let orig_filename = self - .config - .dir_path - .join(format!("shard_{:04x}-{:04x}", sh.span.start, sh.span.end)); - let tmpfile = self - .config - .dir_path - .join(format!("compact_{:04x}-{:04x}", sh.span.start, sh.span.end)); - - let mut compacted_shard = Shard::open( - tmpfile.clone(), - sh.span.clone(), - true, - self.config.clone(), - self.stats.clone(), - )?; - - // XXX: this can be done in a background thread, holding a read lock until we're done, and then wrap it - // all up under a write lock - sh.compact_into(&mut compacted_shard)?; - - std::fs::rename(tmpfile, orig_filename)?; - - self.stats.report_compaction( - t0, - sh.get_write_offset(), - compacted_shard.get_write_offset(), - ); - - *guard = ShardNode::Leaf(compacted_shard); - - Ok(()) - } - pub(crate) fn insert( &self, ph: PartedHash, @@ 
-463,11 +349,33 @@ impl ShardRouter { match res { InsertStatus::SplitNeeded => { - self.split_shard()?; - // retry - } - InsertStatus::CompactionNeeded(write_offset) => { - self.compact_shard(write_offset)?; + let mut guard = self.node.upgradable_read(); + let ShardNode::Leaf(sh) = &*guard else { + // already split + continue; + }; + + let (bottom, top) = sh.split()?; + + guard.with_upgraded(|g| { + *g = ShardNode::Vertex( + Arc::new(ShardRouter { + span: bottom.span.clone(), + config: self.config.clone(), + node: RwLock::new(ShardNode::Leaf(bottom)), + stats: self.stats.clone(), + threadpool: self.threadpool.clone(), + }), + Arc::new(ShardRouter { + span: top.span.clone(), + config: self.config.clone(), + node: RwLock::new(ShardNode::Leaf(top)), + stats: self.stats.clone(), + threadpool: self.threadpool.clone(), + }), + ); + }); + // retry } _ => { diff --git a/src/shard.rs b/src/shard.rs index 69465c6..d9152fd 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -1,27 +1,29 @@ use anyhow::bail; use bytemuck::{bytes_of_mut, Pod, Zeroable}; -use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use simd_itertools::PositionSimd; use std::{ fs::{File, OpenOptions}, io::Read, ops::Range, os::{fd::AsRawFd, unix::fs::FileExt}, - path::PathBuf, + path::{Path, PathBuf}, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, + thread::JoinHandle, + time::Instant, }; use memmap::{MmapMut, MmapOptions}; +use crate::Result; use crate::{ hashing::{PartedHash, INVALID_SIG}, stats::InternalStats, store::InternalConfig, }; -use crate::{CandyError, Result}; // // these numbers were chosen according to the simulation, as they allow for 90% utilization of the shard with @@ -112,6 +114,7 @@ struct ShardHeader { write_offset: AtomicU64, num_inserts: AtomicU64, num_removals: AtomicU64, + compacted_up_to: AtomicUsize, rows: PageAligned<[ShardRow; NUM_ROWS]>, } @@ -123,7 +126,6 @@ pub(crate) enum InsertStatus { Added, Replaced(Vec), KeyDoesNotExist, - CompactionNeeded(u64), SplitNeeded, AlreadyExists(Vec), } @@ -133,7 +135,6 @@ pub(crate) enum InsertMode<'a> { Set, Replace(Option<&'a [u8]>), GetOrCreate, - MustCreate, } enum TryReplaceStatus<'a> { @@ -144,20 +145,189 @@ enum TryReplaceStatus<'a> { pub(crate) type KVPair = (Vec, Vec); -// Note: it's possible to reduce the number row_locks, it we make them per-store rather than per-shard. -// the trivial way that would be to use NUM_ROWS (without risking deadlocks), which means you can have 64 -// concurrent operations. if you'd want more concurrency, it's possible to take the number of shards, -// rounded down to the nearest power of two, and add that many MSBs from the shard selector to create a -// shard+row combination that would be safe from deadlocks. however, it seems that holding 64 locks for -// 64MB isn't that much, and you'd still need a RW lock per shard anyway. 
+struct MmapFile {
+    file: File,
+    mmap: MmapMut,
+}
+
+impl MmapFile {
+    fn new(file: File, mlock_headers: bool) -> Result<Self> {
+        let mmap = unsafe { MmapOptions::new().len(HEADER_SIZE as usize).map_mut(&file) }?;
+
+        #[cfg(target_family = "unix")]
+        if mlock_headers {
+            unsafe { libc::mlock(mmap.as_ptr() as *const _, mmap.len()) };
+        }
+
+        // optimization, we don't care about the return code
+        #[cfg(target_family = "unix")]
+        unsafe {
+            libc::posix_fallocate(file.as_raw_fd(), 0, HEADER_SIZE as i64)
+        };
+
+        let header = unsafe { &mut *(mmap.as_ptr() as *mut ShardHeader) };
+        header.metadata.magic = SHARD_FILE_MAGIC;
+        header.metadata.version = SHARD_FILE_VERSION;
+
+        Ok(Self { file, mmap })
+    }
+
+    fn create(filename: impl AsRef<Path>, config: &InternalConfig) -> Result<Self> {
+        let file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(filename)?;
+        file.set_len(
+            HEADER_SIZE
+                + if config.truncate_up {
+                    config.max_shard_size as u64
+                } else {
+                    0
+                },
+        )?;
+        Self::new(file, config.mlock_headers)
+    }
+
+    #[inline(always)]
+    fn header(&self) -> &ShardHeader {
+        unsafe { &*(self.mmap.as_ptr() as *const ShardHeader) }
+    }
+    #[inline(always)]
+    fn header_mut(&self) -> &mut ShardHeader {
+        unsafe { &mut *(self.mmap.as_ptr() as *mut ShardHeader) }
+    }
+    #[inline(always)]
+    fn row(&self, row_idx: usize) -> &ShardRow {
+        &self.header().rows.0[row_idx]
+    }
+    #[inline(always)]
+    fn row_mut(&self, row_idx: usize) -> &mut ShardRow {
+        &mut self.header_mut().rows.0[row_idx]
+    }
+
+    // reading doesn't require holding any locks - we only ever extend the file, never overwrite data
+    fn read_kv(&self, stats: &InternalStats, offset_and_size: u64) -> Result<KVPair> {
+        let klen = (offset_and_size >> 48) as usize;
+        debug_assert_eq!(klen >> 14, 0, "attempting to read a special key");
+        let vlen = ((offset_and_size >> 32) & 0xffff) as usize;
+        let offset = (offset_and_size as u32) as u64;
+        let mut buf = vec![0u8; klen + vlen];
+        self.file.read_exact_at(&mut buf, HEADER_SIZE + offset)?;
+
+        stats.num_read_bytes.fetch_add(buf.len(), Ordering::Relaxed);
+        stats.num_read_ops.fetch_add(1, Ordering::Relaxed);
+
+        let val = buf[klen..klen + vlen].to_owned();
+        buf.truncate(klen);
+
+        Ok((buf, val))
+    }
+
+    // writing doesn't require holding any locks since we write with an offset
+    fn write_kv(&self, stats: &InternalStats, key: &[u8], val: &[u8]) -> Result<u64> {
+        let entry_size = key.len() + val.len();
+        let mut buf = vec![0u8; entry_size];
+        buf[..key.len()].copy_from_slice(key);
+        buf[key.len()..].copy_from_slice(val);
+
+        // atomically allocate some area. it may leak if the IO below fails or if we crash before updating the
+        // offsets_and_size array, but we're okay with leaks
+        let write_offset = self
+            .header()
+            .write_offset
+            .fetch_add(buf.len() as u64, Ordering::SeqCst) as u64;
+
+        // now writing can be non-atomic (pwrite)
+        self.file.write_all_at(&buf, HEADER_SIZE + write_offset)?;
+        stats.add_entry(entry_size);
+
+        Ok(((key.len() as u64) << 48) | ((val.len() as u64) << 32) | write_offset)
+    }
+}
+
+struct TPHandle {
+    rx: crossbeam_channel::Receiver<Result<()>>,
+}
+impl TPHandle {
+    fn wait(&self) -> Result<()> {
+        self.rx.recv()?
+    }
+    fn finished(&self) -> bool {
+        !self.rx.is_empty()
+    }
+}
+
+struct CompactionInfo {
+    config: Arc<InternalConfig>,
+    stats: Arc<InternalStats>,
+    files: Arc<RwLock<(MmapFile, Option<MmapFile>)>>,
+    row_locks: Arc<[RwLock<()>; NUM_ROWS]>,
+    t0: Instant,
+    src_filename: PathBuf,
+    target_filename: PathBuf,
+}
+
+pub(crate) struct CompactionThreadPool {
+    tx: crossbeam_channel::Sender<Option<(CompactionInfo, crossbeam_channel::Sender<Result<()>>)>>,
+    threads: Vec<JoinHandle<Result<()>>>,
+}
+
+impl CompactionThreadPool {
+    pub fn new(num_threads: usize) -> Self {
+        let (tx, rx) = crossbeam_channel::unbounded::<
+            Option<(CompactionInfo, crossbeam_channel::Sender<Result<()>>)>,
+        >();
+        let mut threads = Vec::with_capacity(num_threads);
+        for _ in 0..num_threads {
+            let rx = rx.clone();
+            let handle = std::thread::spawn(move || {
+                for elem in rx.iter() {
+                    let Some((info, handle_tx)) = elem else {
+                        break;
+                    };
+                    let res = Shard::background_compact(info);
+                    handle_tx.send(res)?;
+                }
+                Ok(())
+            });
+            threads.push(handle);
+        }
+
+        Self { tx, threads }
+    }
+
+    fn submit(&self, info: CompactionInfo) -> Result<TPHandle> {
+        let (tx, rx) = crossbeam_channel::bounded(1);
+        self.tx.send(Some((info, tx)))?;
+        Ok(TPHandle { rx })
+    }
+
+    #[allow(dead_code)]
+    pub fn terminate(self) -> Result<()> {
+        for _ in self.threads.iter() {
+            self.tx.send(None)?;
+        }
+
+        for th in self.threads {
+            match th.join() {
+                Err(e) => std::panic::resume_unwind(e),
+                Ok(res) => res?,
+            }
+        }
+        Ok(())
+    }
+}
 pub(crate) struct Shard {
     pub(crate) span: Range<u32>,
     pub(crate) config: Arc<InternalConfig>,
     stats: Arc<InternalStats>,
-    file: File,
-    mmap: MmapMut,
-    row_locks: [RwLock<()>; NUM_ROWS],
+    files: Arc<RwLock<(MmapFile, Option<MmapFile>)>>,
+    row_locks: Arc<[RwLock<()>; NUM_ROWS]>,
+    threadpool: Arc<CompactionThreadPool>,
+    compaction_handle: Arc<Mutex<Option<TPHandle>>>,
     #[cfg(feature = "flush_aggregation")]
     sync_agg_mutex: parking_lot::Mutex<()>,
     #[cfg(feature = "flush_aggregation")]
@@ -168,12 +338,15 @@ impl Shard {
     pub(crate) const EXPECTED_CAPACITY: usize = (NUM_ROWS * ROW_WIDTH * 9) / 10; // ~ 29,500
 
     pub(crate) fn open(
-        filename: PathBuf,
         span: Range<u32>,
         truncate: bool,
         config: Arc<InternalConfig>,
         stats: Arc<InternalStats>,
+        threadpool: Arc<CompactionThreadPool>,
     ) -> Result<Self> {
+        let filename = config
+            .dir_path
+            .join(format!("shard_{:04x}-{:04x}", span.start, span.end));
         let mut file = OpenOptions::new()
             .create(true)
             .read(true)
@@ -220,36 +393,71 @@ impl Shard {
             } else {
                 file.set_len(HEADER_SIZE)?;
             }
-            // optimization, we don't care about the return code
-            #[cfg(target_family = "unix")]
-            unsafe {
-                libc::posix_fallocate(file.as_raw_fd(), 0, HEADER_SIZE as i64)
-            };
         }
-        let mut mmap = unsafe { MmapOptions::new().len(HEADER_SIZE as usize).map_mut(&file) }?;
+        let mut row_locks = Vec::with_capacity(NUM_ROWS);
+        for _ in 0..NUM_ROWS {
+            row_locks.push(RwLock::new(()));
+        }
+        let row_locks: [RwLock<()>; NUM_ROWS] = row_locks.try_into().unwrap();
+
+        let mut mmap_file = MmapFile::new(file, config.mlock_headers)?;
-        #[cfg(target_family = "unix")]
-        if config.mlock_headers {
-            unsafe { libc::mlock(mmap.as_ptr() as *const _, mmap.len()) };
+        let compacted_filename = config
+            .dir_path
+            .join(format!("compact_{:04x}-{:04x}", span.start, span.end));
+        if truncate {
+            _ = std::fs::remove_file(compacted_filename);
+        } else {
+            if let Ok(compacted_file) = OpenOptions::new()
+                .read(true)
+                .write(true)
+                .open(&compacted_filename)
+            {
+                let target = MmapFile::new(compacted_file, config.mlock_headers)?;
+                Self::do_compaction(&row_locks, &mmap_file, &target, &stats, &config)?;
+
+                std::fs::rename(compacted_filename, filename)?;
+                mmap_file = target;
+            }
         }
-        let header = unsafe { &mut *(mmap.as_mut_ptr() as *mut ShardHeader) };
-        header.metadata.magic = SHARD_FILE_MAGIC;
-        header.metadata.version = SHARD_FILE_VERSION;
+        Ok(Self {
+            span,
config, + stats, + files: Arc::new(RwLock::new((mmap_file, None))), + row_locks: Arc::new(row_locks), + threadpool, + compaction_handle: Arc::new(Mutex::new(None)), + #[cfg(feature = "flush_aggregation")] + sync_agg_mutex: parking_lot::Mutex::new(()), + #[cfg(feature = "flush_aggregation")] + in_sync_agg_delay: std::sync::atomic::AtomicBool::new(false), + }) + } + fn new( + span: Range, + mmap_file: MmapFile, + config: Arc, + stats: Arc, + threadpool: Arc, + ) -> Result { let mut row_locks = Vec::with_capacity(NUM_ROWS); for _ in 0..NUM_ROWS { row_locks.push(RwLock::new(())); } + let row_locks: [RwLock<()>; NUM_ROWS] = row_locks.try_into().unwrap(); Ok(Self { span, config, stats, - file, - mmap, - row_locks: row_locks.try_into().unwrap(), + files: Arc::new(RwLock::new((mmap_file, None))), + row_locks: Arc::new(row_locks), + threadpool, + compaction_handle: Arc::new(Mutex::new(None)), #[cfg(feature = "flush_aggregation")] sync_agg_mutex: parking_lot::Mutex::new(()), #[cfg(feature = "flush_aggregation")] @@ -257,149 +465,252 @@ impl Shard { }) } - #[inline(always)] - fn header(&self) -> &ShardHeader { - unsafe { &*(self.mmap.as_ptr() as *const ShardHeader) } - } - #[inline(always)] - fn header_mut(&self) -> &mut ShardHeader { - unsafe { &mut *(self.mmap.as_ptr() as *mut ShardHeader) } - } + fn do_compaction( + row_locks: &[RwLock<()>; NUM_ROWS], + src: &MmapFile, + target: &MmapFile, + stats: &InternalStats, + config: &InternalConfig, + ) -> Result<()> { + let mut first_row = true; + loop { + let row_idx = target.header().compacted_up_to.load(Ordering::Acquire); + if row_idx >= NUM_ROWS { + break; + } - pub(crate) fn flush(&self) -> Result<()> { - //self.mmap.flush()? -- fdatasync should take care of that as well - self.file.sync_data()?; - Ok(()) - } + let _row_guard = row_locks[row_idx].write(); + let src_row = src.row(row_idx); + let target_row = target.row_mut(row_idx); + let mut target_col = 0; - // reading doesn't require holding any locks - we only ever extend the file, never overwrite data - fn read_kv(&self, offset_and_size: u64) -> Result { - let klen = (offset_and_size >> 48) as usize; - debug_assert_eq!(klen >> 14, 0, "attempting to read a special key"); - let vlen = ((offset_and_size >> 32) & 0xffff) as usize; - let offset = (offset_and_size as u32) as u64; - let mut buf = vec![0u8; klen + vlen]; - self.file.read_exact_at(&mut buf, HEADER_SIZE + offset)?; + for (src_col, &sig) in src_row.signatures.iter().enumerate() { + if sig == INVALID_SIG { + continue; + } + let (k, v) = src.read_kv(&stats, src_row.offsets_and_sizes[src_col])?; + + assert!( + first_row || target_row.signatures[target_col] == INVALID_SIG, + "row={row_idx} col={target_col} sig={}", + target_row.signatures[target_col] + ); + let ph = PartedHash::new(&config.hash_seed, &k); + assert_eq!(ph.row_selector(), row_idx); + target_row.offsets_and_sizes[target_col] = target.write_kv(&stats, &k, &v)?; + std::sync::atomic::fence(Ordering::SeqCst); + target_row.signatures[target_col] = ph.signature(); + target.header().num_inserts.fetch_add(1, Ordering::Relaxed); + target_col += 1; + } - self.stats - .num_read_bytes - .fetch_add(buf.len(), Ordering::Relaxed); - self.stats.num_read_ops.fetch_add(1, Ordering::Relaxed); + target + .header() + .compacted_up_to + .fetch_add(1, Ordering::Release); + first_row = false; + } - let val = buf[klen..klen + vlen].to_owned(); - buf.truncate(klen); + Ok(()) + } - Ok((buf, val)) + pub(crate) fn flush(&self) -> Result<()> { + //self.mmap.flush()? 
-- fdatasync should take care of that as well + self.files.read().0.file.sync_data()?; + Ok(()) } - // writing doesn't require holding any locks since we write with an offset - fn write_kv(&self, key: &[u8], val: &[u8]) -> Result { - let entry_size = key.len() + val.len(); - let mut buf = vec![0u8; entry_size]; - buf[..key.len()].copy_from_slice(key); - buf[key.len()..].copy_from_slice(val); + pub(crate) fn split(&self) -> Result<(Shard, Shard)> { + let files_guard = self.files.write(); + self.wait_for_compaction()?; - // atomically allocate some area. it may leak if the IO below fails or if we crash before updating the - // offsets_and_size array, but we're okay with leaks - let write_offset = self - .header() - .write_offset - .fetch_add(buf.len() as u64, Ordering::SeqCst) as u64; + let mid = (self.span.start + self.span.end) / 2; - // now writing can be non-atomic (pwrite) - self.file.write_all_at(&buf, HEADER_SIZE + write_offset)?; - self.stats.add_entry(entry_size); + let t0 = Instant::now(); - Ok(((key.len() as u64) << 48) | ((val.len() as u64) << 32) | write_offset) - } + let bottom_filename = self + .config + .dir_path + .join(format!("bottom_{:04x}-{:04x}", self.span.start, mid)); + let top_filename = self + .config + .dir_path + .join(format!("top_{:04x}-{:04x}", mid, self.span.end)); - pub(crate) fn read_at(&self, row_idx: usize, entry_idx: usize) -> Result> { - let _guard = self.row_locks[row_idx].read(); - let row = &self.header().rows.0[row_idx]; - if row.signatures[entry_idx] != INVALID_SIG { - Ok(Some(self.read_kv(row.offsets_and_sizes[entry_idx])?)) - } else { - Ok(None) - } - } + let bottom_file = MmapFile::create(&bottom_filename, &self.config)?; + let top_file = MmapFile::create(&top_filename, &self.config)?; - pub(crate) fn compact_into(&self, new_shard: &mut Shard) -> Result<()> { - for row in self.header().rows.0.iter() { - for (col, &sig) in row.signatures.iter().enumerate() { + for (row_idx, src_row) in files_guard.0.header().rows.0.iter().enumerate() { + let mut bottom_col = 0; + let mut top_col = 0; + for (col, &sig) in src_row.signatures.iter().enumerate() { if sig == INVALID_SIG { continue; } - let (k, v) = self.read_kv(row.offsets_and_sizes[col])?; + let (k, v) = files_guard + .0 + .read_kv(&self.stats, src_row.offsets_and_sizes[col])?; let ph = PartedHash::new(&self.config.hash_seed, &k); - new_shard.insert(ph, &k, &v, InsertMode::MustCreate)?; + assert_eq!(row_idx, ph.row_selector()); + + let (file, col) = if ph.shard_selector() < mid { + (&bottom_file, &mut bottom_col) + } else { + (&top_file, &mut top_col) + }; + + let target_row = file.row_mut(ph.row_selector()); + assert_eq!( + target_row.signatures[*col], INVALID_SIG, + "row={} col={} sig={}", + row_idx, *col, target_row.signatures[*col] + ); + target_row.offsets_and_sizes[*col] = file.write_kv(&self.stats, &k, &v)?; + std::sync::atomic::fence(Ordering::SeqCst); + target_row.signatures[*col] = ph.signature(); + file.header().num_inserts.fetch_add(1, Ordering::Relaxed); + *col += 1; } } - Ok(()) + std::fs::rename( + bottom_filename, + self.config + .dir_path + .join(format!("shard_{:04x}-{:04x}", self.span.start, mid,)), + )?; + std::fs::rename( + top_filename, + self.config + .dir_path + .join(format!("shard_{:04x}-{:04x}", mid, self.span.end)), + )?; + std::fs::remove_file(self.config.dir_path.join(format!( + "shard_{:04x}-{:04x}", + self.span.start, self.span.end + )))?; + + self.stats.report_split( + t0, + bottom_file.header().write_offset.load(Ordering::Relaxed), + 
top_file.header().write_offset.load(Ordering::Relaxed), + ); + + let bottom = Self::new( + self.span.start..mid, + bottom_file, + self.config.clone(), + self.stats.clone(), + self.threadpool.clone(), + )?; + let top = Self::new( + mid..self.span.end, + top_file, + self.config.clone(), + self.stats.clone(), + self.threadpool.clone(), + )?; + + Ok((bottom, top)) } - pub(crate) fn split_into(&self, bottom_shard: &Shard, top_shard: &Shard) -> Result<()> { - for row in self.header().rows.0.iter() { - for (col, &sig) in row.signatures.iter().enumerate() { - if sig == INVALID_SIG { - continue; - } - let (k, v) = self.read_kv(row.offsets_and_sizes[col])?; - let ph = PartedHash::new(&self.config.hash_seed, &k); - if ph.shard_selector() < bottom_shard.span.end { - bottom_shard.insert(ph, &k, &v, InsertMode::MustCreate)?; - } else { - top_shard.insert(ph, &k, &v, InsertMode::MustCreate)?; - } + fn operate_on_row( + &self, + row_idx: usize, + func: impl FnOnce(&MmapFile, &ShardRow) -> Result, + ) -> Result { + let files_guard = self.files.read(); + let _row_guard = self.row_locks[row_idx].read(); + let file = if let Some(ref target) = files_guard.1 { + if row_idx < target.header().compacted_up_to.load(Ordering::Acquire) { + target + } else { + &files_guard.0 } - } - Ok(()) + } else { + &files_guard.0 + }; + + func(file, file.row(row_idx)) + } + + fn operate_on_row_mut( + &self, + row_idx: usize, + func: impl FnOnce(&MmapFile, bool, RwLockWriteGuard<()>, &mut ShardRow) -> Result, + ) -> Result { + let files_guard = self.files.read(); + let row_guard = self.row_locks[row_idx].write(); + let file = if let Some(ref target) = files_guard.1 { + if row_idx < target.header().compacted_up_to.load(Ordering::Acquire) { + target + } else { + &files_guard.0 + } + } else { + &files_guard.0 + }; + + func( + &file, + files_guard.1.is_some(), + row_guard, + file.row_mut(row_idx), + ) } - fn get_row(&self, ph: PartedHash) -> (RwLockReadGuard<()>, &ShardRow) { - let row_idx = ph.row_selector(); - let guard = self.row_locks[row_idx].read(); - (guard, &self.header().rows.0[row_idx]) + pub(crate) fn read_at(&self, row_idx: usize, entry_idx: usize) -> Result> { + self.operate_on_row(row_idx, |file, row| { + if row.signatures[entry_idx] != INVALID_SIG { + Ok(Some( + file.read_kv(&self.stats, row.offsets_and_sizes[entry_idx])?, + )) + } else { + Ok(None) + } + }) } pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result> { - let mut first_time = true; - let (_guard, row) = self.get_row(ph); - let mut kvs = Vec::with_capacity(1); - let mut start = 0; - while let Some(idx) = row.lookup(ph.signature(), &mut start) { - kvs.push(self.read_kv(row.offsets_and_sizes[idx])?); - if first_time { + self.operate_on_row(ph.row_selector(), |file, row| { + let mut first_time = true; + let mut kvs = Vec::with_capacity(1); + let mut start = 0; + while let Some(idx) = row.lookup(ph.signature(), &mut start) { + kvs.push(file.read_kv(&self.stats, row.offsets_and_sizes[idx])?); + if first_time { + self.stats + .num_positive_lookups + .fetch_add(1, Ordering::Relaxed); + first_time = false; + } + } + if kvs.is_empty() { self.stats - .num_positive_lookups + .num_negative_lookups .fetch_add(1, Ordering::Relaxed); - first_time = false; } - } - if kvs.is_empty() { - self.stats - .num_negative_lookups - .fetch_add(1, Ordering::Relaxed); - } - Ok(kvs) + Ok(kvs) + }) } pub(crate) fn get(&self, ph: PartedHash, key: &[u8]) -> Result>> { - let (_guard, row) = self.get_row(ph); - let mut start = 0; - while let Some(idx) = row.lookup(ph.signature(), &mut 
start) { - let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?; - if key == k { - self.stats - .num_positive_lookups - .fetch_add(1, Ordering::Relaxed); - return Ok(Some(v)); + self.operate_on_row(ph.row_selector(), |file, row| { + let mut start = 0; + while let Some(idx) = row.lookup(ph.signature(), &mut start) { + let (k, v) = file.read_kv(&self.stats, row.offsets_and_sizes[idx])?; + if key == k { + self.stats + .num_positive_lookups + .fetch_add(1, Ordering::Relaxed); + return Ok(Some(v)); + } } - } - self.stats - .num_negative_lookups - .fetch_add(1, Ordering::Relaxed); - Ok(None) + self.stats + .num_negative_lookups + .fetch_add(1, Ordering::Relaxed); + Ok(None) + }) } #[cfg(feature = "flush_aggregation")] @@ -412,7 +723,7 @@ impl Shard { self.in_sync_agg_delay.store(true, Ordering::SeqCst); std::thread::sleep(delay); self.in_sync_agg_delay.store(false, Ordering::SeqCst); - self.file.sync_data() + self.flush() }; if let Some(_guard) = self.sync_agg_mutex.try_lock() { @@ -433,7 +744,8 @@ impl Shard { fn try_replace<'a>( &'a self, - guard: RwLockWriteGuard<'a, ()>, + file: &MmapFile, + row_guard: RwLockWriteGuard<'a, ()>, row: &mut ShardRow, ph: PartedHash, key: &[u8], @@ -443,15 +755,12 @@ impl Shard { let mut start = 0; let mut had_collision = false; while let Some(idx) = row.lookup(ph.signature(), &mut start) { - let (k, existing_val) = self.read_kv(row.offsets_and_sizes[idx])?; + let (k, existing_val) = file.read_kv(&self.stats, row.offsets_and_sizes[idx])?; if key != k { had_collision = true; continue; } match mode { - InsertMode::MustCreate => { - bail!(CandyError::KeyAlreadyExists(key.into(), ph.as_u64())) - } InsertMode::GetOrCreate => { // no-op, key already exists self.stats @@ -471,27 +780,104 @@ impl Shard { // optimization if val != existing_val { - row.offsets_and_sizes[idx] = self.write_kv(key, val)?; - self.header() + row.offsets_and_sizes[idx] = file.write_kv(&self.stats, key, val)?; + file.header() .wasted_bytes .fetch_add((k.len() + existing_val.len()) as u64, Ordering::Relaxed); self.stats.num_updates.fetch_add(1, Ordering::Relaxed); #[cfg(feature = "flush_aggregation")] { - drop(guard); + drop(row_guard); self.flush_aggregation()?; } } return Ok(TryReplaceStatus::KeyExistsReplaced(existing_val)); } - Ok(TryReplaceStatus::KeyDoesNotExist(guard, had_collision)) + Ok(TryReplaceStatus::KeyDoesNotExist(row_guard, had_collision)) } - fn get_row_mut(&self, ph: PartedHash) -> (RwLockWriteGuard<()>, &mut ShardRow) { - let row_idx = ph.row_selector(); - let guard = self.row_locks[row_idx].write(); - (guard, &mut (self.header_mut().rows.0[row_idx])) + fn wait_for_compaction(&self) -> Result<()> { + let mut handle_guard = self.compaction_handle.lock(); + if let Some(handle) = handle_guard.take() { + handle.wait()?; + } + Ok(()) + } + + fn begin_compaction(&self, min_write_offset: u64) -> Result<()> { + let mut files_guard = self.files.write(); + let mut handle_guard = self.compaction_handle.lock(); + + if files_guard.0.header().write_offset.load(Ordering::Relaxed) < min_write_offset { + // already compacted by someone else + return Ok(()); + } + + if files_guard.1.is_some() { + // if the compaction target exists and the thread is still running -- all good + if let Some(ref handle) = *handle_guard { + if !handle.finished() { + return Ok(()); + } + } else { + return Ok(()); + } + } + + // the thread could've crashed in the middle of a compaction, and here's the place to extract the error + if let Some(handle) = handle_guard.take() { + handle.wait()?; + } + + 
assert!(files_guard.1.is_none()); + + let t0 = Instant::now(); + let src_filename = self.config.dir_path.join(format!( + "shard_{:04x}-{:04x}", + self.span.start, self.span.end + )); + let target_filename = self.config.dir_path.join(format!( + "compact_{:04x}-{:04x}", + self.span.start, self.span.end + )); + let target = MmapFile::create(&target_filename, &self.config)?; + target.header().compacted_up_to.store(0, Ordering::Release); + files_guard.1 = Some(target); + + let handle = self.threadpool.submit(CompactionInfo { + files: self.files.clone(), + stats: self.stats.clone(), + row_locks: self.row_locks.clone(), + config: self.config.clone(), + t0, + src_filename, + target_filename, + })?; + *handle_guard = Some(handle); + + Ok(()) + } + + fn background_compact(info: CompactionInfo) -> Result<()> { + let mut files_guard = info.files.upgradable_read(); + let src = &files_guard.0; + let target = files_guard.1.as_ref().unwrap(); + + Self::do_compaction(&info.row_locks, src, target, &info.stats, &info.config)?; + + std::fs::rename(&info.target_filename, &info.src_filename)?; + + info.stats.report_compaction( + info.t0, + src.header().write_offset.load(Ordering::Relaxed), + target.header().write_offset.load(Ordering::Relaxed), + ); + + files_guard.with_upgraded(|files| { + files.0 = files.1.take().unwrap(); + }); + Ok(()) } pub(crate) fn insert( @@ -501,95 +887,110 @@ impl Shard { val: &[u8], mode: InsertMode, ) -> Result { - let (guard, row) = self.get_row_mut(ph); - - if self.header().write_offset.load(Ordering::Relaxed) as u64 - + (full_key.len() + val.len()) as u64 - > self.config.max_shard_size as u64 - { - if self.header().wasted_bytes.load(Ordering::Relaxed) - > self.config.min_compaction_threashold as u64 - { - return Ok(InsertStatus::CompactionNeeded( - self.header().write_offset.load(Ordering::Relaxed), - )); - } else { - return Ok(InsertStatus::SplitNeeded); - } - } + let mut should_compact = None; - let status = self.try_replace(guard, row, ph, &full_key, val, mode)?; - match status { - TryReplaceStatus::KeyDoesNotExist(_guard, had_collision) => { - if matches!(mode, InsertMode::Replace(_)) { - return Ok(InsertStatus::KeyDoesNotExist); + let status = + self.operate_on_row_mut(ph.row_selector(), |file, is_compacting, row_guard, row| { + if !is_compacting { + if file.header().wasted_bytes.load(Ordering::Relaxed) + >= self.config.min_compaction_threashold as u64 + { + should_compact = Some(file.header().write_offset.load(Ordering::Relaxed)); + } else if file.header().write_offset.load(Ordering::Relaxed) + + (full_key.len() + val.len()) as u64 + > self.config.max_shard_size as u64 + { + return Ok(InsertStatus::SplitNeeded); + } } - // find an empty slot - let mut start = 0; - if let Some(idx) = row.lookup(INVALID_SIG, &mut start) { - let new_off = self.write_kv(&full_key, val)?; - - // we don't want a reorder to happen here - first write the offset, then the signature - row.offsets_and_sizes[idx] = new_off; - std::sync::atomic::fence(Ordering::SeqCst); - row.signatures[idx] = ph.signature(); - if had_collision { - self.stats.num_collisions.fetch_add(1, Ordering::Relaxed); + let status = self.try_replace(file, row_guard, row, ph, &full_key, val, mode)?; + match status { + TryReplaceStatus::KeyDoesNotExist(_guard, had_collision) => { + if matches!(mode, InsertMode::Replace(_)) { + return Ok(InsertStatus::KeyDoesNotExist); + } + + // find an empty slot + let mut start = 0; + if let Some(idx) = row.lookup(INVALID_SIG, &mut start) { + let new_off = file.write_kv(&self.stats, &full_key, 
val)?; + + // we don't want a reorder to happen here - first write the offset, then the signature + row.offsets_and_sizes[idx] = new_off; + std::sync::atomic::fence(Ordering::SeqCst); + row.signatures[idx] = ph.signature(); + if had_collision { + self.stats.num_collisions.fetch_add(1, Ordering::Relaxed); + } + file.header().num_inserts.fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "flush_aggregation")] + { + drop(_guard); + self.flush_aggregation()?; + } + Ok(InsertStatus::Added) + } else { + // no room in this row, must split + Ok(InsertStatus::SplitNeeded) + } } - self.header().num_inserts.fetch_add(1, Ordering::Relaxed); - #[cfg(feature = "flush_aggregation")] - { - drop(_guard); - self.flush_aggregation()?; + TryReplaceStatus::KeyExistsNotReplaced(existing) => { + Ok(InsertStatus::AlreadyExists(existing)) + } + TryReplaceStatus::KeyExistsReplaced(existing) => { + Ok(InsertStatus::Replaced(existing)) } - Ok(InsertStatus::Added) - } else { - // no room in this row, must split - Ok(InsertStatus::SplitNeeded) } - } - TryReplaceStatus::KeyExistsNotReplaced(existing) => { - Ok(InsertStatus::AlreadyExists(existing)) - } - TryReplaceStatus::KeyExistsReplaced(existing) => Ok(InsertStatus::Replaced(existing)), + })?; + + if let Some(min_write_offset) = should_compact { + self.begin_compaction(min_write_offset)?; } + Ok(status) } pub(crate) fn remove(&self, ph: PartedHash, key: &[u8]) -> Result>> { - let (_guard, row) = self.get_row_mut(ph); - - let mut start = 0; - while let Some(idx) = row.lookup(ph.signature(), &mut start) { - let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?; - if key == k { - row.signatures[idx] = INVALID_SIG; - // we managed to remove this key - self.header().num_removals.fetch_add(1, Ordering::Relaxed); - self.header() - .wasted_bytes - .fetch_add((k.len() + v.len()) as u64, Ordering::Relaxed); - #[cfg(feature = "flush_aggregation")] - { - drop(_guard); - self.flush_aggregation()?; + self.operate_on_row_mut(ph.row_selector(), |file, _, _guard, row| { + let mut start = 0; + + while let Some(idx) = row.lookup(ph.signature(), &mut start) { + let (k, v) = file.read_kv(&self.stats, row.offsets_and_sizes[idx])?; + if key == k { + row.signatures[idx] = INVALID_SIG; + // we managed to remove this key + file.header().num_removals.fetch_add(1, Ordering::Relaxed); + file.header() + .wasted_bytes + .fetch_add((k.len() + v.len()) as u64, Ordering::Relaxed); + #[cfg(feature = "flush_aggregation")] + { + drop(_guard); + self.flush_aggregation()?; + } + return Ok(Some(v)); } - return Ok(Some(v)); } - } - Ok(None) + Ok(None) + }) } - pub(crate) fn get_write_offset(&self) -> u64 { - self.header().write_offset.load(Ordering::Relaxed) + pub(crate) fn get_stats(&self) -> Result<(u64, u64, u64, u64)> { + self.wait_for_compaction()?; + let files_guard = self.files.read(); + let hdr = files_guard.0.header(); + Ok(( + hdr.write_offset.load(Ordering::Relaxed), + hdr.wasted_bytes.load(Ordering::Relaxed), + hdr.num_inserts.load(Ordering::Relaxed), + hdr.num_removals.load(Ordering::Relaxed), + )) } - pub(crate) fn get_stats(&self) -> (u64, u64, u64, u64) { - ( - self.header().write_offset.load(Ordering::Relaxed), - self.header().wasted_bytes.load(Ordering::Relaxed), - self.header().num_inserts.load(Ordering::Relaxed), - self.header().num_removals.load(Ordering::Relaxed), - ) +} + +impl Drop for Shard { + fn drop(&mut self) { + _ = self.wait_for_compaction(); } } diff --git a/src/store.rs b/src/store.rs index 2edec87..e6e8eaa 100644 --- a/src/store.rs +++ b/src/store.rs @@ -9,7 +9,7 @@ use 
std::{ use crate::{ hashing::{HashSeed, PartedHash}, router::ShardRouter, - shard::{InsertMode, InsertStatus, KVPair}, + shard::{CompactionThreadPool, InsertMode, InsertStatus, KVPair}, Stats, }; use crate::{ @@ -36,6 +36,7 @@ pub(crate) struct InternalConfig { pub truncate_up: bool, pub clear_on_unsupported_version: bool, pub mlock_headers: bool, + pub num_compaction_threads: usize, #[cfg(feature = "flush_aggregation")] pub flush_aggregation_delay: Option, } @@ -103,6 +104,7 @@ pub struct CandyStore { pub(crate) keyed_locks: Vec>, _lockfile: LockFile, stats: Arc, + //threadpool: Arc, } /// An iterator over a CandyStore. Note that it's safe to modify (insert/delete) keys while iterating, @@ -210,6 +212,7 @@ impl CandyStore { truncate_up: config.truncate_up, clear_on_unsupported_version: config.clear_on_unsupported_version, mlock_headers: config.mlock_headers, + num_compaction_threads: config.num_compaction_threads, #[cfg(feature = "flush_aggregation")] flush_aggregation_delay: config.flush_aggregation_delay, }); @@ -232,7 +235,8 @@ impl CandyStore { } let stats = Arc::new(InternalStats::default()); - let root = ShardRouter::new(config.clone(), stats.clone())?; + let threadpool = Arc::new(CompactionThreadPool::new(config.num_compaction_threads)); + let root = ShardRouter::new(config.clone(), stats.clone(), threadpool.clone())?; Ok(Self { config, @@ -241,6 +245,7 @@ impl CandyStore { keyed_locks, _lockfile: lockfile, stats, + //threadpool, }) } @@ -344,7 +349,6 @@ impl CandyStore { InsertStatus::Replaced(v) => Ok(SetStatus::PrevValue(v)), InsertStatus::AlreadyExists(v) => Ok(SetStatus::PrevValue(v)), InsertStatus::KeyDoesNotExist => unreachable!(), - InsertStatus::CompactionNeeded(_) => unreachable!(), InsertStatus::SplitNeeded => unreachable!(), } } @@ -381,7 +385,6 @@ impl CandyStore { InsertStatus::Replaced(v) => Ok(ReplaceStatus::PrevValue(v)), InsertStatus::AlreadyExists(v) => Ok(ReplaceStatus::WrongValue(v)), InsertStatus::KeyDoesNotExist => Ok(ReplaceStatus::DoesNotExist), - InsertStatus::CompactionNeeded(_) => unreachable!(), InsertStatus::SplitNeeded => unreachable!(), } } @@ -424,7 +427,6 @@ impl CandyStore { InsertStatus::AlreadyExists(v) => Ok(GetOrCreateStatus::ExistingValue(v)), InsertStatus::Replaced(_) => unreachable!(), InsertStatus::KeyDoesNotExist => unreachable!(), - InsertStatus::CompactionNeeded(_) => unreachable!(), InsertStatus::SplitNeeded => unreachable!(), } } @@ -468,10 +470,7 @@ impl CandyStore { /// Returns useful stats about the store pub fn stats(&self) -> Stats { - let shard_stats = self - .root - .call_on_all_shards(|sh| Ok(sh.get_stats())) - .unwrap(); + let shard_stats = self.root.call_on_all_shards(|sh| sh.get_stats()).unwrap(); let mut stats = Stats::default(); self.stats.fill_stats(&mut stats); @@ -486,3 +485,9 @@ impl CandyStore { stats } } + +// impl Drop for CandyStore { +// fn drop(&mut self) { +// _ = self.threadpool.terminate(); +// } +// } diff --git a/tests/test_flush_agg.rs b/tests/test_flush_agg.rs index 717e54b..a1d4488 100644 --- a/tests/test_flush_agg.rs +++ b/tests/test_flush_agg.rs @@ -12,7 +12,7 @@ use candystore::{CandyStore, Config, Result}; use crate::common::run_in_tempdir; #[test] -fn test_lists() -> Result<()> { +fn test_flush_aggregation() -> Result<()> { run_in_tempdir(|dir| { let db = Arc::new(CandyStore::open( dir, diff --git a/tests/test_loading.rs b/tests/test_loading.rs index f686809..5470601 100644 --- a/tests/test_loading.rs +++ b/tests/test_loading.rs @@ -44,7 +44,6 @@ fn test_loading() -> Result<()> { 
std::fs::write(format!("{dir}/top_1234-5678"), "xxxx")?; std::fs::write(format!("{dir}/bottom_1234-5678"), "xxxx")?; - std::fs::write(format!("{dir}/compact_1234-5678"), "xxxx")?; let (_, span) = existing[0].split_once("_").unwrap(); let (start, end) = span.split_once("-").unwrap(); @@ -58,7 +57,6 @@ fn test_loading() -> Result<()> { assert!(!std::fs::exists(format!("{dir}/top_1234-5678"))?); assert!(!std::fs::exists(format!("{dir}/bottom_1234-5678"))?); - assert!(!std::fs::exists(format!("{dir}/compact_1234-5678"))?); assert!(!std::fs::exists(format!( "{dir}/shard_{start:04x}-{mid:04x}" ))?); diff --git a/tests/test_pre_split.rs b/tests/test_pre_split.rs index 25cd3c1..e291d14 100644 --- a/tests/test_pre_split.rs +++ b/tests/test_pre_split.rs @@ -72,7 +72,7 @@ fn test_compaction() -> Result<()> { dir, Config { max_shard_size: 1000, - min_compaction_threashold: 0, + min_compaction_threashold: 900, ..Default::default() }, )?;
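
Usage sketch (illustrative note, not part of the diff): the only user-visible addition is Config::num_compaction_threads; compaction itself is requested from insert() once a shard's wasted bytes cross min_compaction_threashold, runs on the shared CompactionThreadPool, and stats() waits for any in-flight compaction before reporting. The snippet below shows how a caller might exercise the new path; the tiny max_shard_size / min_compaction_threashold values mirror tests/test_pre_split.rs, and the set()/stats() calls assume the crate's existing public API:

    use candystore::{CandyStore, Config, Result};

    fn main() -> Result<()> {
        std::fs::create_dir_all("dbdir")?;
        let store = CandyStore::open(
            "dbdir",
            Config {
                max_shard_size: 1000,           // tiny shards so waste accumulates quickly
                min_compaction_threashold: 900, // request compaction after 900 wasted bytes
                num_compaction_threads: 4,      // size of the shared background pool
                ..Default::default()
            },
        )?;

        // overwriting the same key marks each previous entry as wasted bytes; once the
        // threshold is crossed, insert() hands the shard to the background pool
        for i in 0..1000u32 {
            store.set("counter", &i.to_le_bytes())?;
        }

        // reads and writes keep working while compaction runs; stats() waits for it
        println!("{}", store.stats());
        Ok(())
    }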