From 2fd4135cd7dd92610bfc7fa89db8b20300f6ec96 Mon Sep 17 00:00:00 2001 From: Tomer Filiba Date: Mon, 5 Aug 2024 16:52:41 +0300 Subject: [PATCH] Add get_or_insert_default --- src/insert.rs | 47 ++++++++++++++++++++++++------- src/shard.rs | 54 ++++++++++++++++++++++++++---------- src/typed.rs | 6 ++++ tests/test_get_or_insert.rs | 34 +++++++++++++++++++++++ tests/test_loading.rs | 8 ++---- tests/test_logic.rs | 9 ++++-- tests/test_modify_inplace.rs | 6 ++-- tests/test_pre_split.rs | 6 ++-- 8 files changed, 129 insertions(+), 41 deletions(-) create mode 100644 tests/test_get_or_insert.rs diff --git a/src/insert.rs b/src/insert.rs index 5349799..e7def6d 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -2,7 +2,7 @@ use std::ops::Bound; use std::sync::atomic::Ordering; use crate::hashing::{PartedHash, USER_NAMESPACE}; -use crate::shard::{InsertStatus, Shard}; +use crate::shard::{InsertMode, InsertStatus, Shard}; use crate::store::VickyStore; use crate::{Result, VickyError}; @@ -44,7 +44,7 @@ impl VickyStore { // XXX: this will not work with namespaces let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k); - let status = compacted_shard.insert(ph, &k, &v)?; + let status = compacted_shard.insert(ph, &k, &v, InsertMode::Overwrite)?; assert!(matches!(status, InsertStatus::Added), "{status:?}"); } @@ -88,9 +88,9 @@ impl VickyStore { let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k); let status = if (ph.shard_selector as u32) < midpoint { - bottom_shard.insert(ph, &k, &v)? + bottom_shard.insert(ph, &k, &v, InsertMode::Overwrite)? } else { - top_shard.insert(ph, &k, &v)? + top_shard.insert(ph, &k, &v, InsertMode::Overwrite)? }; assert!(matches!(status, InsertStatus::Added), "{status:?}"); } @@ -126,6 +126,7 @@ impl VickyStore { ph: PartedHash, key: &[u8], val: &[u8], + mode: InsertMode, ) -> Result<(InsertStatus, u32, u32)> { let guard = self.shards.read().unwrap(); let cursor = guard.lower_bound(Bound::Excluded(&(ph.shard_selector as u32))); @@ -134,12 +135,18 @@ impl VickyStore { .map(|(&shard_start, _)| shard_start) .unwrap_or(0); let (shard_end, shard) = cursor.peek_next().unwrap(); - let status = shard.insert(ph, key, val)?; + let status = shard.insert(ph, key, val, mode)?; Ok((status, shard_start, *shard_end)) } - pub(crate) fn insert_internal(&self, ph: PartedHash, key: &[u8], val: &[u8]) -> Result<()> { + pub(crate) fn insert_internal( + &self, + ph: PartedHash, + key: &[u8], + val: &[u8], + mode: InsertMode, + ) -> Result>> { if key.len() > u16::MAX as usize { return Err(Box::new(VickyError::KeyTooLong)); } @@ -148,15 +155,18 @@ impl VickyStore { } loop { - let (status, shard_start, shard_end) = self.try_insert(ph, key, val)?; + let (status, shard_start, shard_end) = self.try_insert(ph, key, val, mode)?; match status { InsertStatus::Added => { self.num_entries.fetch_add(1, Ordering::SeqCst); - return Ok(()); + return Ok(None); } InsertStatus::Replaced => { - return Ok(()); + return Ok(None); + } + InsertStatus::AlreadyExists(existing_val) => { + return Ok(Some(existing_val)); } InsertStatus::CompactionNeeded(write_offset) => { self.compact(shard_end, write_offset)?; @@ -183,7 +193,24 @@ impl VickyStore { val: &B2, ) -> Result<()> { let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref()); - self.insert_internal(ph, key.as_ref(), val.as_ref()) + self.insert_internal(ph, key.as_ref(), val.as_ref(), InsertMode::Overwrite)?; + Ok(()) + } + + /// Gets the value of an entry or inserts the given default value. If the value existed, returns `Some(value)`. + /// If the value was created by this operation, `None`` is returned + pub fn get_or_insert_default + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + key: &B1, + default_val: &B2, + ) -> Result>> { + let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref()); + self.insert_internal( + ph, + key.as_ref(), + default_val.as_ref(), + InsertMode::GetOrInsert, + ) } /// Modifies an existing entry in-place, instead of creating a version. Note that the key must exist diff --git a/src/shard.rs b/src/shard.rs index c9b2b17..19f2ff5 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -55,6 +55,13 @@ pub(crate) enum InsertStatus { Replaced, CompactionNeeded(u32), SplitNeeded, + AlreadyExists(Vec), +} + +#[derive(Debug, Clone, Copy)] +pub(crate) enum InsertMode { + Overwrite, + GetOrInsert, } pub(crate) struct ByHashIterator<'a> { @@ -225,26 +232,32 @@ impl Shard { ph: PartedHash, key: &[u8], val: &[u8], - ) -> Result { + mode: InsertMode, + ) -> Result<(bool, Option>)> { let mut start = 0; while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature) { let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?; if key == k { - // optimization - if val == v { - return Ok(true); + match mode { + InsertMode::GetOrInsert => { + // no-op, key already exists + return Ok((true, Some(v))); + } + InsertMode::Overwrite => { + // optimization + if val != v { + row.offsets_and_sizes[idx] = self.write_kv(key, val)?; + self.header + .wasted_bytes + .fetch_add((k.len() + v.len()) as u64, Ordering::SeqCst); + } + return Ok((true, None)); + } } - - row.offsets_and_sizes[idx] = self.write_kv(key, val)?; - self.header - .wasted_bytes - .fetch_add((k.len() + v.len()) as u64, Ordering::SeqCst); - - return Ok(true); } start = idx + 1; } - Ok(false) + Ok((false, None)) } fn get_row_mut(&self, ph: PartedHash) -> (RwLockWriteGuard<()>, &mut ShardRow) { @@ -258,7 +271,13 @@ impl Shard { (guard, row) } - pub(crate) fn insert(&self, ph: PartedHash, key: &[u8], val: &[u8]) -> Result { + pub(crate) fn insert( + &self, + ph: PartedHash, + key: &[u8], + val: &[u8], + mode: InsertMode, + ) -> Result { if self.header.write_offset.load(Ordering::Relaxed) as u64 + (key.len() + val.len()) as u64 > self.config.max_shard_size as u64 { @@ -278,8 +297,13 @@ impl Shard { // see if we replace an existing key let (_guard, row) = self.get_row_mut(ph); - if self.try_replace(row, ph, key, val)? { - return Ok(InsertStatus::Replaced); + let (found, existing_val) = self.try_replace(row, ph, key, val, mode)?; + if found { + if let Some(existing_val) = existing_val { + return Ok(InsertStatus::AlreadyExists(existing_val)); + } else { + return Ok(InsertStatus::Replaced); + } } // find an empty slot diff --git a/src/typed.rs b/src/typed.rs index 511241a..22f3c04 100644 --- a/src/typed.rs +++ b/src/typed.rs @@ -89,6 +89,12 @@ where self.store.insert(&kbytes, &vbytes) } + pub fn get_or_insert_default(&self, k: K, v: V) -> Result>> { + let kbytes = Self::make_key(&k); + let vbytes = v.to_bytes::(); + self.store.get_or_insert_default(&kbytes, &vbytes) + } + pub fn remove(&self, k: &Q) -> Result> where K: Borrow, diff --git a/tests/test_get_or_insert.rs b/tests/test_get_or_insert.rs new file mode 100644 index 0000000..68489a4 --- /dev/null +++ b/tests/test_get_or_insert.rs @@ -0,0 +1,34 @@ +mod common; + +use vicky_store::{Config, Result, VickyStore}; + +use crate::common::run_in_tempdir; + +#[test] +fn test_get_or_insert_default() -> Result<()> { + run_in_tempdir(|dir| { + let db = VickyStore::open( + dir, + Config { + max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions + min_compaction_threashold: 10 * 1024, + expected_number_of_keys: 1_000_000, + ..Default::default() + }, + )?; + + db.insert("aaa", "1111")?; + assert_eq!( + db.get_or_insert_default("aaa", "2222")?, + Some("1111".into()) + ); + + assert_eq!(db.get_or_insert_default("bbbb", "2222")?, None); + assert_eq!( + db.get_or_insert_default("bbbb", "3333")?, + Some("2222".into()) + ); + + Ok(()) + }) +} diff --git a/tests/test_loading.rs b/tests/test_loading.rs index 161269d..db05461 100644 --- a/tests/test_loading.rs +++ b/tests/test_loading.rs @@ -1,7 +1,5 @@ mod common; -use std::sync::Arc; - use vicky_store::{Config, Result, VickyStore}; use crate::common::{run_in_tempdir, LONG_VAL}; @@ -16,7 +14,7 @@ fn test_loading() -> Result<()> { }; { - let db = Arc::new(VickyStore::open(dir, config.clone())?); + let db = VickyStore::open(dir, config.clone())?; for i in 0..1000 { db.insert(&format!("unique key {i}"), LONG_VAL)?; @@ -27,7 +25,7 @@ fn test_loading() -> Result<()> { } { - let db = Arc::new(VickyStore::open(dir, config.clone())?); + let db = VickyStore::open(dir, config.clone())?; assert_eq!(db.iter().count(), 1000); @@ -56,7 +54,7 @@ fn test_loading() -> Result<()> { std::fs::write(format!("{dir}/shard_{start:04x}-{mid:04x}"), "xxxx")?; std::fs::write(format!("{dir}/shard_{mid:04x}-{end:04x}"), "xxxx")?; - let db = Arc::new(VickyStore::open(dir, config)?); + let db = VickyStore::open(dir, config)?; assert!(!std::fs::exists(format!("{dir}/top_1234-5678"))?); assert!(!std::fs::exists(format!("{dir}/bottom_1234-5678"))?); diff --git a/tests/test_logic.rs b/tests/test_logic.rs index edc4126..7250489 100644 --- a/tests/test_logic.rs +++ b/tests/test_logic.rs @@ -1,6 +1,6 @@ mod common; -use std::{collections::HashSet, sync::Arc}; +use std::collections::HashSet; use vicky_store::{Config, Result, Stats, VickyStore}; @@ -9,19 +9,22 @@ use crate::common::{run_in_tempdir, LONG_VAL}; #[test] fn test_logic() -> Result<()> { run_in_tempdir(|dir| { - let db = Arc::new(VickyStore::open( + let db = VickyStore::open( dir, Config { max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions min_compaction_threashold: 10 * 1024, ..Default::default() }, - )?); + )?; assert!(db.get("my name")?.is_none()); db.insert("my_name", "inigo montoya")?; db.insert("your_name", "dread pirate robert")?; + assert!(db.contains("my_name")?); + assert!(!db.contains("My NaMe")?); + assert_eq!(db.get("my_name")?, Some("inigo montoya".into())); assert_eq!(db.get("your_name")?, Some("dread pirate robert".into())); db.insert("your_name", "vizzini")?; diff --git a/tests/test_modify_inplace.rs b/tests/test_modify_inplace.rs index 04eeaee..2bc38bf 100644 --- a/tests/test_modify_inplace.rs +++ b/tests/test_modify_inplace.rs @@ -1,7 +1,5 @@ mod common; -use std::sync::Arc; - use vicky_store::{Config, Result, VickyStore}; use crate::common::run_in_tempdir; @@ -9,14 +7,14 @@ use crate::common::run_in_tempdir; #[test] fn test_modify_inplace() -> Result<()> { run_in_tempdir(|dir| { - let db = Arc::new(VickyStore::open( + let db = VickyStore::open( dir, Config { max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions min_compaction_threashold: 10 * 1024, ..Default::default() }, - )?); + )?; db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?; diff --git a/tests/test_pre_split.rs b/tests/test_pre_split.rs index 6f13d4a..18d8389 100644 --- a/tests/test_pre_split.rs +++ b/tests/test_pre_split.rs @@ -1,7 +1,5 @@ mod common; -use std::sync::Arc; - use vicky_store::{Config, Result, VickyStore}; use crate::common::run_in_tempdir; @@ -9,7 +7,7 @@ use crate::common::run_in_tempdir; #[test] fn test_pre_split() -> Result<()> { run_in_tempdir(|dir| { - let db = Arc::new(VickyStore::open( + let db = VickyStore::open( dir, Config { max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions @@ -17,7 +15,7 @@ fn test_pre_split() -> Result<()> { expected_number_of_keys: 1_000_000, ..Default::default() }, - )?); + )?; db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;