Skip to content

Commit

Permalink
Add get_or_insert_default
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Aug 5, 2024
1 parent b87255c commit 2fd4135
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 41 deletions.
47 changes: 37 additions & 10 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::Bound;
use std::sync::atomic::Ordering;

use crate::hashing::{PartedHash, USER_NAMESPACE};
use crate::shard::{InsertStatus, Shard};
use crate::shard::{InsertMode, InsertStatus, Shard};
use crate::store::VickyStore;
use crate::{Result, VickyError};

Expand Down Expand Up @@ -44,7 +44,7 @@ impl VickyStore {
// XXX: this will not work with namespaces
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k);

let status = compacted_shard.insert(ph, &k, &v)?;
let status = compacted_shard.insert(ph, &k, &v, InsertMode::Overwrite)?;
assert!(matches!(status, InsertStatus::Added), "{status:?}");
}

Expand Down Expand Up @@ -88,9 +88,9 @@ impl VickyStore {

let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k);
let status = if (ph.shard_selector as u32) < midpoint {
bottom_shard.insert(ph, &k, &v)?
bottom_shard.insert(ph, &k, &v, InsertMode::Overwrite)?
} else {
top_shard.insert(ph, &k, &v)?
top_shard.insert(ph, &k, &v, InsertMode::Overwrite)?
};
assert!(matches!(status, InsertStatus::Added), "{status:?}");
}
Expand Down Expand Up @@ -126,6 +126,7 @@ impl VickyStore {
ph: PartedHash,
key: &[u8],
val: &[u8],
mode: InsertMode,
) -> Result<(InsertStatus, u32, u32)> {
let guard = self.shards.read().unwrap();
let cursor = guard.lower_bound(Bound::Excluded(&(ph.shard_selector as u32)));
Expand All @@ -134,12 +135,18 @@ impl VickyStore {
.map(|(&shard_start, _)| shard_start)
.unwrap_or(0);
let (shard_end, shard) = cursor.peek_next().unwrap();
let status = shard.insert(ph, key, val)?;
let status = shard.insert(ph, key, val, mode)?;

Ok((status, shard_start, *shard_end))
}

pub(crate) fn insert_internal(&self, ph: PartedHash, key: &[u8], val: &[u8]) -> Result<()> {
pub(crate) fn insert_internal(
&self,
ph: PartedHash,
key: &[u8],
val: &[u8],
mode: InsertMode,
) -> Result<Option<Vec<u8>>> {
if key.len() > u16::MAX as usize {
return Err(Box::new(VickyError::KeyTooLong));
}
Expand All @@ -148,15 +155,18 @@ impl VickyStore {
}

loop {
let (status, shard_start, shard_end) = self.try_insert(ph, key, val)?;
let (status, shard_start, shard_end) = self.try_insert(ph, key, val, mode)?;

match status {
InsertStatus::Added => {
self.num_entries.fetch_add(1, Ordering::SeqCst);
return Ok(());
return Ok(None);
}
InsertStatus::Replaced => {
return Ok(());
return Ok(None);
}
InsertStatus::AlreadyExists(existing_val) => {
return Ok(Some(existing_val));
}
InsertStatus::CompactionNeeded(write_offset) => {
self.compact(shard_end, write_offset)?;
Expand All @@ -183,7 +193,24 @@ impl VickyStore {
val: &B2,
) -> Result<()> {
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref());
self.insert_internal(ph, key.as_ref(), val.as_ref())
self.insert_internal(ph, key.as_ref(), val.as_ref(), InsertMode::Overwrite)?;
Ok(())
}

/// Gets the value of an entry or inserts the given default value. If the value existed, returns `Some(value)`.
/// If the value was created by this operation, `None`` is returned
pub fn get_or_insert_default<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
&self,
key: &B1,
default_val: &B2,
) -> Result<Option<Vec<u8>>> {
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref());
self.insert_internal(
ph,
key.as_ref(),
default_val.as_ref(),
InsertMode::GetOrInsert,
)
}

/// Modifies an existing entry in-place, instead of creating a version. Note that the key must exist
Expand Down
54 changes: 39 additions & 15 deletions src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ pub(crate) enum InsertStatus {
Replaced,
CompactionNeeded(u32),
SplitNeeded,
AlreadyExists(Vec<u8>),
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum InsertMode {
Overwrite,
GetOrInsert,
}

pub(crate) struct ByHashIterator<'a> {
Expand Down Expand Up @@ -225,26 +232,32 @@ impl Shard {
ph: PartedHash,
key: &[u8],
val: &[u8],
) -> Result<bool> {
mode: InsertMode,
) -> Result<(bool, Option<Vec<u8>>)> {
let mut start = 0;
while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature) {
let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
if key == k {
// optimization
if val == v {
return Ok(true);
match mode {
InsertMode::GetOrInsert => {
// no-op, key already exists
return Ok((true, Some(v)));
}
InsertMode::Overwrite => {
// optimization
if val != v {
row.offsets_and_sizes[idx] = self.write_kv(key, val)?;
self.header
.wasted_bytes
.fetch_add((k.len() + v.len()) as u64, Ordering::SeqCst);
}
return Ok((true, None));
}
}

row.offsets_and_sizes[idx] = self.write_kv(key, val)?;
self.header
.wasted_bytes
.fetch_add((k.len() + v.len()) as u64, Ordering::SeqCst);

return Ok(true);
}
start = idx + 1;
}
Ok(false)
Ok((false, None))
}

fn get_row_mut(&self, ph: PartedHash) -> (RwLockWriteGuard<()>, &mut ShardRow) {
Expand All @@ -258,7 +271,13 @@ impl Shard {
(guard, row)
}

pub(crate) fn insert(&self, ph: PartedHash, key: &[u8], val: &[u8]) -> Result<InsertStatus> {
pub(crate) fn insert(
&self,
ph: PartedHash,
key: &[u8],
val: &[u8],
mode: InsertMode,
) -> Result<InsertStatus> {
if self.header.write_offset.load(Ordering::Relaxed) as u64 + (key.len() + val.len()) as u64
> self.config.max_shard_size as u64
{
Expand All @@ -278,8 +297,13 @@ impl Shard {
// see if we replace an existing key
let (_guard, row) = self.get_row_mut(ph);

if self.try_replace(row, ph, key, val)? {
return Ok(InsertStatus::Replaced);
let (found, existing_val) = self.try_replace(row, ph, key, val, mode)?;
if found {
if let Some(existing_val) = existing_val {
return Ok(InsertStatus::AlreadyExists(existing_val));
} else {
return Ok(InsertStatus::Replaced);
}
}

// find an empty slot
Expand Down
6 changes: 6 additions & 0 deletions src/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ where
self.store.insert(&kbytes, &vbytes)
}

pub fn get_or_insert_default(&self, k: K, v: V) -> Result<Option<Vec<u8>>> {
let kbytes = Self::make_key(&k);
let vbytes = v.to_bytes::<LE>();
self.store.get_or_insert_default(&kbytes, &vbytes)
}

pub fn remove<Q: ?Sized + Encode>(&self, k: &Q) -> Result<Option<V>>
where
K: Borrow<Q>,
Expand Down
34 changes: 34 additions & 0 deletions tests/test_get_or_insert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
mod common;

use vicky_store::{Config, Result, VickyStore};

use crate::common::run_in_tempdir;

#[test]
fn test_get_or_insert_default() -> Result<()> {
run_in_tempdir(|dir| {
let db = VickyStore::open(
dir,
Config {
max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
min_compaction_threashold: 10 * 1024,
expected_number_of_keys: 1_000_000,
..Default::default()
},
)?;

db.insert("aaa", "1111")?;
assert_eq!(
db.get_or_insert_default("aaa", "2222")?,
Some("1111".into())
);

assert_eq!(db.get_or_insert_default("bbbb", "2222")?, None);
assert_eq!(
db.get_or_insert_default("bbbb", "3333")?,
Some("2222".into())
);

Ok(())
})
}
8 changes: 3 additions & 5 deletions tests/test_loading.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod common;

use std::sync::Arc;

use vicky_store::{Config, Result, VickyStore};

use crate::common::{run_in_tempdir, LONG_VAL};
Expand All @@ -16,7 +14,7 @@ fn test_loading() -> Result<()> {
};

{
let db = Arc::new(VickyStore::open(dir, config.clone())?);
let db = VickyStore::open(dir, config.clone())?;

for i in 0..1000 {
db.insert(&format!("unique key {i}"), LONG_VAL)?;
Expand All @@ -27,7 +25,7 @@ fn test_loading() -> Result<()> {
}

{
let db = Arc::new(VickyStore::open(dir, config.clone())?);
let db = VickyStore::open(dir, config.clone())?;

assert_eq!(db.iter().count(), 1000);

Expand Down Expand Up @@ -56,7 +54,7 @@ fn test_loading() -> Result<()> {
std::fs::write(format!("{dir}/shard_{start:04x}-{mid:04x}"), "xxxx")?;
std::fs::write(format!("{dir}/shard_{mid:04x}-{end:04x}"), "xxxx")?;

let db = Arc::new(VickyStore::open(dir, config)?);
let db = VickyStore::open(dir, config)?;

assert!(!std::fs::exists(format!("{dir}/top_1234-5678"))?);
assert!(!std::fs::exists(format!("{dir}/bottom_1234-5678"))?);
Expand Down
9 changes: 6 additions & 3 deletions tests/test_logic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod common;

use std::{collections::HashSet, sync::Arc};
use std::collections::HashSet;

use vicky_store::{Config, Result, Stats, VickyStore};

Expand All @@ -9,19 +9,22 @@ use crate::common::{run_in_tempdir, LONG_VAL};
#[test]
fn test_logic() -> Result<()> {
run_in_tempdir(|dir| {
let db = Arc::new(VickyStore::open(
let db = VickyStore::open(
dir,
Config {
max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
min_compaction_threashold: 10 * 1024,
..Default::default()
},
)?);
)?;

assert!(db.get("my name")?.is_none());
db.insert("my_name", "inigo montoya")?;
db.insert("your_name", "dread pirate robert")?;

assert!(db.contains("my_name")?);
assert!(!db.contains("My NaMe")?);

assert_eq!(db.get("my_name")?, Some("inigo montoya".into()));
assert_eq!(db.get("your_name")?, Some("dread pirate robert".into()));
db.insert("your_name", "vizzini")?;
Expand Down
6 changes: 2 additions & 4 deletions tests/test_modify_inplace.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
mod common;

use std::sync::Arc;

use vicky_store::{Config, Result, VickyStore};

use crate::common::run_in_tempdir;

#[test]
fn test_modify_inplace() -> Result<()> {
run_in_tempdir(|dir| {
let db = Arc::new(VickyStore::open(
let db = VickyStore::open(
dir,
Config {
max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
min_compaction_threashold: 10 * 1024,
..Default::default()
},
)?);
)?;

db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;

Expand Down
6 changes: 2 additions & 4 deletions tests/test_pre_split.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
mod common;

use std::sync::Arc;

use vicky_store::{Config, Result, VickyStore};

use crate::common::run_in_tempdir;

#[test]
fn test_pre_split() -> Result<()> {
run_in_tempdir(|dir| {
let db = Arc::new(VickyStore::open(
let db = VickyStore::open(
dir,
Config {
max_shard_size: 20 * 1024, // use small files to force lots of splits and compactions
min_compaction_threashold: 10 * 1024,
expected_number_of_keys: 1_000_000,
..Default::default()
},
)?);
)?;

db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;

Expand Down

0 comments on commit 2fd4135

Please sign in to comment.