diff --git a/candy-crasher/src/main.rs b/candy-crasher/src/main.rs
index f56a99a..560a4ec 100644
--- a/candy-crasher/src/main.rs
+++ b/candy-crasher/src/main.rs
@@ -10,7 +10,13 @@ const TARGET: u32 = 1_000_000;
 
 fn child_inserts() -> Result<()> {
     // our job is to create 1M entries while being killed by our evil parent
-    let store = CandyStore::open("dbdir", Config::default())?;
+    let store = CandyStore::open(
+        "dbdir",
+        Config {
+            clear_on_unsupported_version: true,
+            ..Default::default()
+        },
+    )?;
     let highest_bytes = store.get("highest")?.unwrap_or(vec![0, 0, 0, 0]);
     let highest = u32::from_le_bytes([
         highest_bytes[0],
@@ -38,7 +44,13 @@ fn child_inserts() -> Result<()> {
 
 fn child_removals() -> Result<()> {
     // our job is to remove 1M entries while being killed by our evil parent
-    let store = CandyStore::open("dbdir", Config::default())?;
+    let store = CandyStore::open(
+        "dbdir",
+        Config {
+            clear_on_unsupported_version: true,
+            ..Default::default()
+        },
+    )?;
     let lowest_bytes = store.get("lowest")?.unwrap_or(vec![0, 0, 0, 0]);
     let lowest = u32::from_le_bytes([
         lowest_bytes[0],
@@ -66,7 +78,13 @@ fn child_removals() -> Result<()> {
 
 fn child_list_inserts() -> Result<()> {
     // our job is to insert 1M entries to a list while being killed by our evil parent
-    let store = CandyStore::open("dbdir", Config::default())?;
+    let store = CandyStore::open(
+        "dbdir",
+        Config {
+            clear_on_unsupported_version: true,
+            ..Default::default()
+        },
+    )?;
 
     let highest_bytes = store.get("list_highest")?.unwrap_or(vec![0, 0, 0, 0]);
     let highest = u32::from_le_bytes([
@@ -95,7 +113,13 @@ fn child_list_inserts() -> Result<()> {
 
 fn child_list_removals() -> Result<()> {
     // our job is to remove 1M entries to a list while being killed by our evil parent
-    let store = CandyStore::open("dbdir", Config::default())?;
+    let store = CandyStore::open(
+        "dbdir",
+        Config {
+            clear_on_unsupported_version: true,
+            ..Default::default()
+        },
+    )?;
 
     let lowest_bytes = store.get("list_lowest")?.unwrap_or(vec![0, 0, 0, 0]);
     let lowest = u32::from_le_bytes([
@@ -142,7 +166,13 @@ fn child_list_removals() -> Result<()> {
 }
 
 fn child_list_iterator_removals() -> Result<()> {
-    let store = CandyStore::open("dbdir", Config::default())?;
+    let store = CandyStore::open(
+        "dbdir",
+        Config {
+            clear_on_unsupported_version: true,
+            ..Default::default()
+        },
+    )?;
 
     if rand::random() {
         //println!("FWD");
@@ -258,7 +288,13 @@ fn main() -> Result<()> {
 
     {
         println!("Parent starts validating the DB...");
-        let store = CandyStore::open("dbdir", Config::default())?;
+        let store = CandyStore::open(
+            "dbdir",
+            Config {
+                clear_on_unsupported_version: true,
+                ..Default::default()
+            },
+        )?;
         assert_eq!(
             store.remove("highest")?,
             Some((TARGET - 1).to_le_bytes().to_vec())
@@ -281,7 +317,13 @@ fn main() -> Result<()> {
 
     {
         println!("Parent starts validating the DB...");
-        let store = CandyStore::open("dbdir", Config::default())?;
+        let store = CandyStore::open(
+            "dbdir",
+            Config {
+                clear_on_unsupported_version: true,
+                ..Default::default()
+            },
+        )?;
         assert_eq!(
             store.remove("lowest")?,
             Some((TARGET - 1).to_le_bytes().to_vec())
@@ -296,7 +338,13 @@ fn main() -> Result<()> {
 
     {
         println!("Parent starts validating the DB...");
-        let store = CandyStore::open("dbdir", Config::default())?;
+        let store = CandyStore::open(
+            "dbdir",
+            Config {
+                clear_on_unsupported_version: true,
+                ..Default::default()
+            },
+        )?;
         assert_eq!(
             store.remove("list_highest")?,
             Some((TARGET - 1).to_le_bytes().to_vec())
@@ -316,7 +364,13 @@ fn main() -> Result<()> {
 
     {
println!("Parent starts validating the DB..."); - let store = CandyStore::open("dbdir", Config::default())?; + let store = CandyStore::open( + "dbdir", + Config { + clear_on_unsupported_version: true, + ..Default::default() + }, + )?; assert_eq!( store.remove("list_lowest")?, Some((TARGET - 1).to_le_bytes().to_vec()) @@ -358,7 +412,13 @@ fn main() -> Result<()> { { println!("Parent starts validating the DB..."); - let store = CandyStore::open("dbdir", Config::default())?; + let store = CandyStore::open( + "dbdir", + Config { + clear_on_unsupported_version: true, + ..Default::default() + }, + )?; assert_eq!(store.iter_list("xxx").count(), 0); diff --git a/src/lib.rs b/src/lib.rs index d7b2e5d..499ecbe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,6 +107,7 @@ pub struct Config { pub merge_small_shards: bool, // whether or not to merge small shards when items are removed pub max_concurrent_list_ops: u32, // number of keyed locks for concurrent list ops pub truncate_up: bool, // whether or not to truncate up shard files to their max size (spare files) + pub clear_on_unsupported_version: bool, // whether or not to clear the DB if the version is unsupported } impl Default for Config { @@ -119,6 +120,7 @@ impl Default for Config { merge_small_shards: false, max_concurrent_list_ops: 64, truncate_up: true, + clear_on_unsupported_version: false, } } } diff --git a/src/shard.rs b/src/shard.rs index 3e03ea0..49df89a 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -1,9 +1,10 @@ -use anyhow::ensure; -use bytemuck::{bytes_of, Pod, Zeroable}; +use anyhow::{bail, ensure}; +use bytemuck::{bytes_of_mut, Pod, Zeroable}; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use simd_itertools::PositionSimd; use std::{ fs::{File, OpenOptions}, + io::Read, ops::Range, os::unix::fs::FileExt, path::PathBuf, @@ -146,8 +147,19 @@ fn test_shard_size_histogram() { assert_eq!(hist.counts_16kb[3].load(Ordering::Relaxed), 2); } +pub(crate) const SHARD_FILE_MAGIC: u64 = 0x72745379646e6143; // "CandyStr" +pub(crate) const SHARD_FILE_VERSION: u64 = 0x06; + +#[derive(Clone, Copy, Default, Debug, Pod, Zeroable)] +#[repr(C)] +struct MetaHeader { + magic: u64, + version: u64, +} + #[repr(C)] struct ShardHeader { + metadata: MetaHeader, num_inserted: AtomicU64, num_removed: AtomicU64, wasted_bytes: AtomicU64, @@ -178,43 +190,6 @@ pub(crate) enum InsertMode<'a> { pub(crate) type KVPair = (Vec, Vec); -#[derive(Default, Debug, Clone, Copy, Pod, Zeroable)] -#[repr(C)] -struct Backpointer(u32); - -impl Backpointer { - // MSB LSB - // +-----+-----+----------+ - // + row | sig | entry | - // | idx | idx | size | - // | (6) | (9) | (17) | - // +-----+-----+----------+ - const SZ: u64 = size_of::() as u64; - - fn new(row_idx: u16, sig_idx: u16, entry_size: usize) -> Self { - debug_assert!((row_idx as usize % NUM_ROWS) < (1 << 6), "{row_idx}"); - debug_assert!(sig_idx < (1 << 9), "{sig_idx}"); - Self( - (((row_idx % (NUM_ROWS as u16)) as u32) << 26) - | ((sig_idx as u32) << 17) - | (entry_size as u32 & 0x1ffff), - ) - } - - #[allow(dead_code)] - fn entry_size(&self) -> u32 { - self.0 & 0x1ffff - } - #[allow(dead_code)] - fn row(&self) -> usize { - (self.0 >> 26) as usize - } - #[allow(dead_code)] - fn sig_idx(&self) -> usize { - ((self.0 >> 17) & 0x1ff) as usize - } -} - // Note: it's possible to reduce the number row_locks, it we make them per-store rather than per-shard. // the trivial way that would be to use NUM_ROWS (without risking deadlocks), which means you can have 64 // concurrent operations. 
if you'd want more concurrency, it's possible to take the number of shards, @@ -247,17 +222,49 @@ impl Shard { truncate: bool, config: Arc, ) -> Result { - let file = OpenOptions::new() + let mut file = OpenOptions::new() .create(true) .read(true) .write(true) .truncate(truncate) .open(&filename)?; - let md = file.metadata()?; - if md.len() < HEADER_SIZE { - file.set_len(0)?; + + let mut file_size = file.metadata()?.len(); + if file_size != 0 { + let mut meta_header = MetaHeader::default(); + let sz = file.read(bytes_of_mut(&mut meta_header))?; + if sz != size_of::() + || meta_header.magic != SHARD_FILE_MAGIC + || meta_header.version != SHARD_FILE_VERSION + { + if config.clear_on_unsupported_version { + file.set_len(0)?; + file_size = 0; + } else { + bail!( + "unsupported magic=0x{:016x} version=0x{:016x} size={}", + meta_header.magic, + meta_header.version, + file_size, + ); + } + } + + if file_size != 0 && file_size < HEADER_SIZE { + if config.clear_on_unsupported_version { + file.set_len(0)?; + file_size = 0; + } else { + bail!("corrupt shard file (size={})", file_size); + } + } + } + + if file_size == 0 { + file.set_len(HEADER_SIZE)?; if config.truncate_up { // when creating, set the file's length so that we won't need to extend it every time we write + // (saves on file metadata updates) file.set_len(HEADER_SIZE + config.max_shard_size as u64)?; } } @@ -265,6 +272,8 @@ impl Shard { let mut mmap = unsafe { MmapOptions::new().len(HEADER_SIZE as usize).map_mut(&file) }?; let header = unsafe { &mut *(mmap.as_mut_ptr() as *mut ShardHeader) }; + header.metadata.magic = SHARD_FILE_MAGIC; + header.metadata.version = SHARD_FILE_VERSION; let mut row_locks = Vec::with_capacity(NUM_ROWS); for _ in 0..NUM_ROWS { row_locks.push(RwLock::new(())); @@ -299,8 +308,7 @@ impl Shard { fn read_kv(&self, offset_and_size: u64) -> Result { let (klen, vlen, offset) = Self::extract_offset_and_size(offset_and_size); let mut buf = vec![0u8; klen + vlen]; - self.file - .read_exact_at(&mut buf, HEADER_SIZE + Backpointer::SZ + offset)?; + self.file.read_exact_at(&mut buf, HEADER_SIZE + offset)?; let val = buf[klen..klen + vlen].to_owned(); buf.truncate(klen); @@ -309,14 +317,11 @@ impl Shard { } // writing doesn't require holding any locks since we write with an offset - fn write_kv(&self, row_idx: u16, sig_idx: u16, key: &[u8], val: &[u8]) -> Result { - const BP: usize = Backpointer::SZ as usize; + fn write_kv(&self, key: &[u8], val: &[u8]) -> Result { let entry_size = key.len() + val.len(); - let mut buf = vec![0u8; BP + entry_size]; - let bp = Backpointer::new(row_idx, sig_idx, entry_size); - buf[..BP].copy_from_slice(bytes_of(&bp)); - buf[BP..BP + key.len()].copy_from_slice(key); - buf[BP + key.len()..].copy_from_slice(val); + let mut buf = vec![0u8; entry_size]; + buf[..key.len()].copy_from_slice(key); + buf[key.len()..].copy_from_slice(val); // atomically allocate some area. 
it may leak if the IO below fails or if we crash before updating the // offsets_and_size array, but we're okay with leaks @@ -442,12 +447,10 @@ impl Shard { // optimization if val != existing_val { - row.offsets_and_sizes[idx] = - self.write_kv(ph.row_selector(), idx as u16, key, val)?; - self.header.wasted_bytes.fetch_add( - Backpointer::SZ + (k.len() + existing_val.len()) as u64, - Ordering::SeqCst, - ); + row.offsets_and_sizes[idx] = self.write_kv(key, val)?; + self.header + .wasted_bytes + .fetch_add((k.len() + existing_val.len()) as u64, Ordering::SeqCst); } return Ok(TryReplaceStatus::KeyExistsReplaced(existing_val)); } @@ -499,7 +502,7 @@ impl Shard { // find an empty slot let mut start = 0; if let Some(idx) = row.lookup(INVALID_SIG, &mut start) { - let new_off = self.write_kv(ph.row_selector(), idx as u16, &full_key, val)?; + let new_off = self.write_kv(&full_key, val)?; // we don't want a reorder to happen here - first write the offset, then the signature row.offsets_and_sizes[idx] = new_off; @@ -529,10 +532,9 @@ impl Shard { row.signatures[idx] = INVALID_SIG; // we managed to remove this key self.header.num_removed.fetch_add(1, Ordering::Relaxed); - self.header.wasted_bytes.fetch_add( - (size_of::() + k.len() + v.len()) as u64, - Ordering::Relaxed, - ); + self.header + .wasted_bytes + .fetch_add((k.len() + v.len()) as u64, Ordering::Relaxed); return Ok(Some(v)); } } diff --git a/src/store.rs b/src/store.rs index d14d645..9945fc7 100644 --- a/src/store.rs +++ b/src/store.rs @@ -172,6 +172,7 @@ pub(crate) struct InternalConfig { pub expected_number_of_keys: usize, pub max_concurrent_list_ops: u32, pub truncate_up: bool, + pub clear_on_unsupported_version: bool, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -346,6 +347,7 @@ impl CandyStore { max_shard_size: config.max_shard_size, min_compaction_threashold: config.min_compaction_threashold, truncate_up: config.truncate_up, + clear_on_unsupported_version: config.clear_on_unsupported_version, }); std::fs::create_dir_all(dir_path)?; diff --git a/tests/test_pre_split.rs b/tests/test_pre_split.rs index 8b37808..bf8724e 100644 --- a/tests/test_pre_split.rs +++ b/tests/test_pre_split.rs @@ -46,7 +46,7 @@ fn test_pre_split() -> Result<()> { // namespace byte as well assert_eq!( stats.wasted_bytes, - "????aaa?".len() + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".len() + "aaa?".len() + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".len() ); db.remove("aaa")?; @@ -55,9 +55,9 @@ fn test_pre_split() -> Result<()> { assert_eq!(stats.num_removed, 1); assert_eq!( stats.wasted_bytes, - "????aaa?".len() + "aaa?".len() + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".len() - + "????aaa?".len() + + "aaa?".len() + "xxx".len() ); @@ -79,7 +79,7 @@ fn test_compaction() -> Result<()> { // fill the shard to the brim, creating waste for i in 0..10 { - db.set("aaa", &format!("1111222233334444555566667777888899990000111122223333444455556666777788889999000011112222333{:x}", i))?; + db.set("aaa", &format!("11112222333344445555666677778888999900001111222233334444555566667777888899990000111122223333444{:x}", i))?; let stats = db.stats(); assert_eq!(stats.num_inserted, 1, "i={i}"); @@ -94,7 +94,7 @@ fn test_compaction() -> Result<()> { assert_eq!(db._num_compactions(), 1); let stats = db.stats(); - assert_eq!(stats.used_bytes, 100 + "????bbb?".len() + "x".len()); + assert_eq!(stats.used_bytes, 100 + "bbb?".len() + "x".len()); assert_eq!(stats.wasted_bytes, 0); Ok(())
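
Usage note (not part of the patch): a minimal sketch of how a caller opts into the new flag. The function name is made up; the "dbdir" path and the get("highest") call mirror the candy-crasher code above, and anyhow::Result is assumed on the caller side. With the default clear_on_unsupported_version = false, opening a shard file whose MetaHeader magic/version does not match SHARD_FILE_MAGIC/SHARD_FILE_VERSION now fails with the "unsupported magic=..." error; setting the flag to true truncates such shard files and starts with an empty store instead.

use anyhow::Result;
use candystore::{CandyStore, Config};

fn open_tolerating_format_changes() -> Result<()> {
    // Opt in to clearing shard files with an unknown magic/version instead of failing
    // the open. Existing data in those shards is discarded, so this only suits callers
    // that treat the store as disposable (like the crasher test above).
    let store = CandyStore::open(
        "dbdir",
        Config {
            clear_on_unsupported_version: true,
            ..Default::default()
        },
    )?;
    let _ = store.get("highest")?;
    Ok(())
}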