Skip to content

Commit

Permalink
Add modify_inplace
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Aug 4, 2024
1 parent 1b5caec commit 6dd3540
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 6 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
name = "vicky-store"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
keywords = ["key-value", "persistent", "store"]
description = "A lean, efficient and fast persistent in-process key-value store"

[dependencies]
memmap = "0.7.0"
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ mod hashing;
mod shard;
mod store;

use std::fmt::{Display, Formatter};
pub use hashing::SecretKey;
pub use shard::Config;
use std::fmt::{Display, Formatter};
pub use store::{Stats, VickyStore};

/// Errors surfaced by the store's public API.
#[derive(Debug)]
pub enum Error {
/// Displayed as "wrong secret length"; presumably produced when a `SecretKey`
/// is built from a buffer of the wrong size (TODO confirm against `hashing`).
WrongSecretKeyLength,
/// Displayed as "key too long"; presumably the key exceeds the maximal
/// encodable key length (TODO confirm against the insert path).
KeyTooLong,
/// The value (or, for `modify_inplace`, the patched range) does not fit:
/// `modify_inplace` returns this when `patch_offset + patch.len()` exceeds
/// the stored value's length.
ValueTooLong,
/// Returned by `modify_inplace` when no entry with the given key exists.
KeyNotFound,
/// Wraps an underlying I/O failure.
IOError(std::io::Error),
}

Expand All @@ -28,6 +29,7 @@ impl Display for Error {
match self {
Error::WrongSecretKeyLength => write!(f, "wrong secret length"),
Error::KeyTooLong => write!(f, "key too long"),
Error::KeyNotFound => write!(f, "key not found"),
Error::ValueTooLong => write!(f, "value too long"),
Error::IOError(err) => write!(f, "IO error: {err}"),
}
Expand Down
58 changes: 54 additions & 4 deletions src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
use memmap::{MmapMut, MmapOptions};

use crate::hashing::{PartedHash, SecretKey, INVALID_SIG};
use crate::Result;
use crate::{Error, Result};

//
// these numbers were chosen according to the simulation, as they allow for 90% utilization of the shard with
Expand Down Expand Up @@ -57,6 +57,17 @@ pub struct Config {
pub secret_key: SecretKey,
}

impl Default for Config {
    /// Builds a configuration with an empty directory path, 64 MiB shards,
    /// an 8 MiB compaction threshold and a fixed, built-in secret key.
    ///
    /// NOTE(review): the built-in key is a constant compiled into the crate;
    /// callers that rely on the key being secret should supply their own.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = KIB * KIB;

        let dir_path = PathBuf::new();
        // the literal below is hard-coded, so a failure here is a programming error
        let secret_key = SecretKey::new(b"kOYLu0xvq2WtzcKJ").unwrap();

        Self {
            dir_path,
            max_shard_size: (64 * MIB) as _,
            min_compaction_threashold: (8 * MIB) as _,
            secret_key,
        }
    }
}

#[derive(Debug)]
pub(crate) enum InsertStatus {
Added,
Expand Down Expand Up @@ -161,11 +172,17 @@ impl Shard {
})
}

// reading doesn't require holding any locks - we only ever extend the file, never overwrite data
pub(crate) fn read_kv(&self, offset_and_size: u64) -> Result<Entry> {
/// Unpacks a combined descriptor word into `(key_len, value_len, offset)`.
///
/// Bit layout of `offset_and_size`, as decoded here:
/// * bits 48..64 - key length (16 bits)
/// * bits 32..48 - value length (16 bits)
/// * bits  0..32 - offset (32 bits), returned widened to `u64`
#[inline]
fn extract_offset_and_size(offset_and_size: u64) -> (usize, usize, u64) {
    let file_offset = offset_and_size & 0xffff_ffff;
    // truncating to u16 keeps exactly the 16-bit field that was shifted into place
    let value_len = (offset_and_size >> 32) as u16;
    let key_len = (offset_and_size >> 48) as u16;
    (usize::from(key_len), usize::from(value_len), file_offset)
}

// reading doesn't require holding any locks - we only ever extend the file, never overwrite data
pub(crate) fn read_kv(&self, offset_and_size: u64) -> Result<Entry> {
let (klen, vlen, offset) = Self::extract_offset_and_size(offset_and_size);

let mut buf = vec![0u8; klen + vlen];
self.file.read_exact_at(&mut buf, HEADER_SIZE + offset)?;
Expand Down Expand Up @@ -306,8 +323,9 @@ impl Shard {
if let Some(idx) = row.signatures.iter().position_simd(INVALID_SIG) {
let new_off = self.write_kv(entry)?;

row.signatures[idx] = ph.signature;
row.offsets_and_sizes[idx] = new_off;
std::sync::atomic::fence(Ordering::SeqCst);
row.signatures[idx] = ph.signature;
self.header.num_inserted.fetch_add(1, Ordering::SeqCst);
Ok(InsertStatus::Added)
} else {
Expand All @@ -316,6 +334,38 @@ impl Shard {
}
}

// this is NOT crash safe (may produce inconsistent results)
pub(crate) fn modify_inplace(
&self,
ph: PartedHash,
key: &[u8],
patch: &[u8],
patch_offset: usize,
) -> Result<()> {
let (_guard, row) = self.get_row_mut(ph);

let mut start = 0;
while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature) {
let curr_entry = self.read_kv(row.offsets_and_sizes[idx])?;
if key == curr_entry.key {
let (klen, vlen, offset) =
Self::extract_offset_and_size(row.offsets_and_sizes[idx]);
if patch_offset + patch.len() > vlen as usize {
return Err(Error::ValueTooLong);
}

self.file.write_all_at(
patch,
HEADER_SIZE + offset + klen as u64 + patch_offset as u64,
)?;

return Ok(());
}
start = idx + 1;
}
Err(Error::KeyNotFound)
}

pub(crate) fn remove(&self, ph: PartedHash, key: &[u8]) -> Result<Option<Vec<u8>>> {
let (_guard, row) = self.get_row_mut(ph);
let mut start = 0;
Expand Down
25 changes: 25 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,31 @@ impl VickyStore {
self._insert(ph, entry)
}

/// Patches an existing entry in place instead of writing a new version of it.
///
/// The key must already exist, and `patch_offset + patch.len()` must not exceed
/// the length of the currently stored value. This method is guaranteed to never
/// trigger a split or a compaction.
///
/// This is not crash-safe: it overwrites existing data, so a crash may leave
/// inconsistent results (part old data, part new data).
///
/// # Panics
/// Panics if taking the read lock on the shard map fails, or if no shard is
/// found for the key's shard selector.
pub fn modify_inplace<B1: AsRef<[u8]> + ?Sized, B2: AsRef<[u8]> + ?Sized>(
    &self,
    key: &B1,
    patch: &B2,
    patch_offset: usize,
) -> Result<()> {
    let (key, patch) = (key.as_ref(), patch.as_ref());
    let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
    let selector = ph.shard_selector as u32;

    // the read guard stays alive until the shard call below has returned
    let shards = self.shards.read().unwrap();
    let cursor = shards.lower_bound(Bound::Excluded(&selector));
    let (_, shard) = cursor.peek_next().unwrap();

    shard.modify_inplace(ph, key, patch, patch_offset)
}

pub fn remove<B: AsRef<[u8]> + ?Sized>(&self, key: &B) -> Result<Option<Vec<u8>>> {
let key = key.as_ref();
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
Expand Down
34 changes: 33 additions & 1 deletion tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use rand::random;
use vicky_store::{Config, Result, SecretKey, Stats, VickyStore};
use vicky_store::{Config, Error, Result, SecretKey, Stats, VickyStore};

fn run_in_tempdir(f: impl FnOnce(&str) -> Result<()>) -> Result<()> {
let rand: u64 = random();
Expand Down Expand Up @@ -258,3 +258,35 @@ fn test_multithreaded() -> Result<()> {
Ok(())
})
}

/// Exercises `modify_inplace`: a missing key, an out-of-range patch, and a
/// successful three-byte patch at offset 7.
#[test]
fn test_modify_inplace() -> Result<()> {
    run_in_tempdir(|dir| {
        let config = Config {
            dir_path: dir.into(),
            max_shard_size: 20 * 1024, // deliberately small shard files
            min_compaction_threashold: 10 * 1024,
            secret_key: SecretKey::new("very very secret")?,
        };
        let db = Arc::new(VickyStore::open(config)?);

        db.insert("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?;

        // a key that was never inserted cannot be patched
        let missing_key = db.modify_inplace("zzz", "bbb", 7);
        assert!(matches!(missing_key, Err(Error::KeyNotFound)));

        // a patch as long as the whole value, shifted by 7, runs past the end
        let overrun = db.modify_inplace("aaa", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", 7);
        assert!(matches!(overrun, Err(Error::ValueTooLong)));

        // a patch that fits replaces exactly bytes 7..10 of the value
        db.modify_inplace("aaa", "bbb", 7)?;
        let patched = db.get("aaa")?;
        assert_eq!(
            patched,
            Some("aaaaaaabbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into())
        );

        Ok(())
    })
}

0 comments on commit 6dd3540

Please sign in to comment.