Skip to content

Commit

Permalink
modify_inplace: add an expected value
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Aug 6, 2024
1 parent 92163f3 commit 928bc5e
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 96 deletions.
138 changes: 72 additions & 66 deletions src/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ use siphasher::sip128::{Hash128, Hasher128, SipHasher24};
use crate::{Result, VickyError};

#[derive(Debug, Clone, Copy)]
pub struct SecretKey([u8; 16]);
pub struct HashSeed([u8; 16]);

/// A struct that represents a "nonce" for seeding the hash function (keyed hash).
/// Keeping it secret is only meaningful if you're concerned with DoS attacks
impl SecretKey {
impl HashSeed {
pub const LEN: usize = size_of::<Self>();

/// Construct a SecretKey from the given byte buffer (must be 16 bytes in length)
///
/// Construct a HashSeed from the given byte buffer (must be 16 bytes in length)
pub fn new<B: AsRef<[u8]> + ?Sized>(key: &B) -> Result<Self> {
let key = key.as_ref();
if key.len() != Self::LEN {
return Err(Box::new(VickyError::WrongSecretKeyLength));
return Err(Box::new(VickyError::WrongHashSeedLength));
}
let mut bytes = [0u8; Self::LEN];
bytes.copy_from_slice(&key);
Expand All @@ -33,8 +32,10 @@ pub(crate) struct PartedHash {
}

pub(crate) const INVALID_SIG: u32 = 0;

pub(crate) const USER_NAMESPACE: u8 = 1;
//pub(crate) const TYPED_NAMESPACE: u8 = 2;
//pub(crate) const XUSER_NAMESPACE: &[u8] = &[1];

impl PartedHash {
#[allow(dead_code)]
Expand All @@ -57,91 +58,96 @@ impl PartedHash {
signature,
}
}
pub fn from_buffer(namespace: u8, key: &SecretKey, buf: &[u8]) -> Self {
pub fn from_buffer(namespace: u8, seed: &HashSeed, buf: &[u8]) -> Self {
// maybe use blake3?
let mut hasher = SipHasher24::new_with_key(&key.0);
let mut hasher = SipHasher24::new_with_key(&seed.0);
hasher.write_u8(namespace);
hasher.write(buf);
Self::from_hash(hasher.finish128())
}

#[allow(dead_code)]
pub fn builder(key: &SecretKey) -> PartedHashBuilder {
PartedHashBuilder(SipHasher24::new_with_key(&key.0))
/// Hashes several buffers as one stream using a `SipHasher24` keyed by `seed`.
///
/// The buffers are written to the hasher in order, with no separator or
/// length prefix between them. Unlike `from_buffer`, no namespace byte is
/// written before the data.
pub fn from_buffers(seed: &HashSeed, bufs: &[&[u8]]) -> Self {
    // maybe use blake3?
    let mut state = SipHasher24::new_with_key(&seed.0);
    bufs.iter().copied().for_each(|chunk| state.write(chunk));
    Self::from_hash(state.finish128())
}

#[allow(dead_code)]
// pub fn builder(seed: &HashSeed) -> PartedHashBuilder {
// PartedHashBuilder(SipHasher24::new_with_key(&seed.0))
// }
#[cfg(test)]
/// Packs the three parts of the hash into a single `u64`:
/// `shard_selector` shifted left by 48, `row_selector` shifted left by 32,
/// and `signature` in the low bits.
pub fn to_u64(&self) -> u64 {
    let shard_bits = (self.shard_selector as u64) << 48;
    let row_bits = (self.row_selector as u64) << 32;
    let sig_bits = self.signature as u64;
    shard_bits | row_bits | sig_bits
}
#[allow(dead_code)]
pub fn from_u64(val: u64) -> Self {
Self {
shard_selector: (val >> 48) as u16,
row_selector: (val >> 32) as u16,
signature: val as u32,
}
}

#[allow(dead_code)]
pub fn as_bytes(&self) -> [u8; Self::LEN] {
self.to_u64().to_le_bytes()
}
#[allow(dead_code)]
pub fn from_bytes(b: &[u8]) -> Self {
assert_eq!(b.len(), Self::LEN);
let buf: [u8; 8] = [b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]];
Self::from_u64(u64::from_le_bytes(buf))
}
// pub fn from_u64(val: u64) -> Self {
// Self {
// shard_selector: (val >> 48) as u16,
// row_selector: (val >> 32) as u16,
// signature: val as u32,
// }
// }
// pub fn as_bytes(&self) -> [u8; Self::LEN] {
// self.to_u64().to_le_bytes()
// }
// pub fn from_bytes(b: &[u8]) -> Self {
// assert_eq!(b.len(), Self::LEN);
// let buf: [u8; 8] = [b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]];
// Self::from_u64(u64::from_le_bytes(buf))
// }
}

#[allow(dead_code)]
pub(crate) struct PartedHashBuilder(SipHasher24);

impl PartedHashBuilder {
#[allow(dead_code)]
pub fn write(mut self, bytes: &[u8]) -> Self {
self.0.write(bytes);
self
}
#[allow(dead_code)]
pub fn write_u32(mut self, v: u32) -> Self {
self.0.write_u32(v);
self
}
#[allow(dead_code)]
pub fn write_u8(mut self, v: u8) -> Self {
self.0.write_u8(v);
self
}
#[allow(dead_code)]
pub fn finish(self) -> PartedHash {
PartedHash::from_hash(self.0.finish128())
}
}
// pub(crate) struct PartedHashBuilder(SipHasher24);

// impl PartedHashBuilder {
// pub fn write(mut self, bytes: &[u8]) -> Self {
// self.0.write(bytes);
// self
// }
// pub fn write_parted_hash(mut self, v: PartedHash) -> Self {
// self.0.write_u64(v.to_u64());
// self
// }
// pub fn write_u64(mut self, v: u64) -> Self {
// self.0.write_u64(v);
// self
// }
// pub fn write_u32(mut self, v: u32) -> Self {
// self.0.write_u32(v);
// self
// }
// pub fn write_u8(mut self, v: u8) -> Self {
// self.0.write_u8(v);
// self
// }
// pub fn finish(self) -> PartedHash {
// PartedHash::from_hash(self.0.finish128())
// }
// }

#[test]
fn test_parted_hash() -> Result<()> {
SecretKey::new("1234").expect_err("shouldn't work");
SecretKey::new("12341234123412341").expect_err("shouldn't work");
HashSeed::new("1234").expect_err("shouldn't work");
HashSeed::new("12341234123412341").expect_err("shouldn't work");

let key = SecretKey::new("aaaabbbbccccdddd")?;
let key = HashSeed::new("aaaabbbbccccdddd")?;

assert_eq!(
PartedHash::from_buffer(USER_NAMESPACE, &key, b"hello world").to_u64(),
12143172433256666175,
);

assert_eq!(
PartedHash::builder(&key)
.write_u8(USER_NAMESPACE)
.write(b"hello world")
.finish()
.to_u64(),
12143172433256666175,
);
// assert_eq!(
// PartedHash::builder(&key)
// .write_u8(USER_NAMESPACE)
// .write(b"hello world")
// .finish()
// .to_u64(),
// 12143172433256666175,
// );

Ok(())
}
29 changes: 15 additions & 14 deletions src/insertion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::atomic::Ordering;
use crate::hashing::{PartedHash, USER_NAMESPACE};
use crate::shard::{InsertMode, InsertStatus, Shard};
use crate::store::VickyStore;
use crate::{Result, VickyError};
use crate::{Result, VickyError, MAX_TOTAL_KEY_SIZE, MAX_VALUE_SIZE};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplaceStatus {
Expand Down Expand Up @@ -43,10 +43,10 @@ impl GetOrCreateStatus {
/// Returns `true` for the `ExistingValue` variant and `false` for `CreatedNew`.
pub fn already_exists(&self) -> bool {
    match self {
        Self::ExistingValue(_) => true,
        Self::CreatedNew(_) => false,
    }
}
pub fn value(&self) -> &[u8] {
pub fn value(self) -> Vec<u8> {
match self {
Self::CreatedNew(val) => &val,
Self::ExistingValue(val) => &val,
Self::CreatedNew(val) => val,
Self::ExistingValue(val) => val,
}
}
}
Expand Down Expand Up @@ -87,7 +87,7 @@ impl VickyStore {
for res in removed_shard.unlocked_iter() {
let (k, v) = res?;
// XXX: this will not work with namespaces
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k);
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, &k);

let status = compacted_shard.insert(ph, &k, &v, InsertMode::Set)?;
assert!(matches!(status, InsertStatus::Added), "{status:?}");
Expand Down Expand Up @@ -131,7 +131,7 @@ impl VickyStore {
for res in removed_shard.unlocked_iter() {
let (k, v) = res?;

let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, &k);
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, &k);
let status = if (ph.shard_selector as u32) < midpoint {
bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
} else {
Expand Down Expand Up @@ -192,10 +192,10 @@ impl VickyStore {
val: &[u8],
mode: InsertMode,
) -> Result<Option<Vec<u8>>> {
if key.len() > u16::MAX as usize {
if key.len() > MAX_TOTAL_KEY_SIZE as usize {
return Err(Box::new(VickyError::KeyTooLong));
}
if val.len() > u16::MAX as usize {
if val.len() > MAX_VALUE_SIZE as usize {
return Err(Box::new(VickyError::ValueTooLong));
}

Expand Down Expand Up @@ -241,7 +241,7 @@ impl VickyStore {
key: &B1,
val: &B2,
) -> Result<SetStatus> {
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref());
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key.as_ref());
if let Some(prev) = self.insert_internal(ph, key.as_ref(), val.as_ref(), InsertMode::Set)? {
Ok(SetStatus::PrevValue(prev))
} else {
Expand All @@ -259,7 +259,7 @@ impl VickyStore {
key: &B1,
val: &B2,
) -> Result<ReplaceStatus> {
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key.as_ref());
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key.as_ref());
if let Some(prev) =
self.insert_internal(ph, key.as_ref(), val.as_ref(), InsertMode::Replace)?
{
Expand All @@ -282,7 +282,7 @@ impl VickyStore {
) -> Result<GetOrCreateStatus> {
let key = key.as_ref();
let default_val = default_val.as_ref();
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key);
let res = self.insert_internal(ph, key, default_val, InsertMode::GetOrCreate)?;
if let Some(prev) = res {
Ok(GetOrCreateStatus::ExistingValue(prev))
Expand All @@ -303,17 +303,18 @@ impl VickyStore {
key: &B1,
patch: &B2,
patch_offset: usize,
) -> Result<()> {
expected: Option<&B2>,
) -> Result<bool> {
let key = key.as_ref();
let patch = patch.as_ref();
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.secret_key, key);
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key);
self.shards
.read()
.unwrap()
.lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
.peek_next()
.unwrap()
.1
.modify_inplace(ph, key, patch, patch_offset)
.modify_inplace(ph, key, patch, patch_offset, expected.map(|b| b.as_ref()))
}
}
18 changes: 13 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ mod shard;
mod store;
mod typed;

pub use hashing::SecretKey;
pub use hashing::HashSeed;
pub use insertion::{GetOrCreateStatus, ReplaceStatus, SetStatus};
use std::fmt::{Display, Formatter};
pub use store::{Stats, VickyStore};
pub use typed::{VickyTypedKey, VickyTypedStore};

#[derive(Debug)]
pub enum VickyError {
WrongSecretKeyLength,
WrongHashSeedLength,
KeyTooLong,
ValueTooLong,
KeyNotFound,
Expand All @@ -23,7 +23,7 @@ pub enum VickyError {
impl Display for VickyError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Self::WrongSecretKeyLength => write!(f, "wrong secret length"),
Self::WrongHashSeedLength => write!(f, "wrong hash seed length"),
Self::KeyTooLong => write!(f, "key too long"),
Self::KeyNotFound => write!(f, "key not found"),
Self::ValueTooLong => write!(f, "value too long"),
Expand All @@ -42,7 +42,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct Config {
pub max_shard_size: u32, // we don't want huge shards, because splitting would be expensive
pub min_compaction_threashold: u32, // should be ~10% of max_shard_size
pub secret_key: SecretKey, // just some entropy, not so important unless you fear DoS
pub hash_seed: HashSeed, // just some entropy, not so important unless you fear DoS
pub expected_number_of_keys: usize, // hint for creating the number of shards accordingly
}

Expand All @@ -51,8 +51,16 @@ impl Default for Config {
Self {
max_shard_size: 64 * 1024 * 1024,
min_compaction_threashold: 8 * 1024 * 1024,
secret_key: SecretKey::new(b"kOYLu0xvq2WtzcKJ").unwrap(),
hash_seed: HashSeed::new(b"kOYLu0xvq2WtzcKJ").unwrap(),
expected_number_of_keys: 0,
}
}
}

/// Upper bound, in bytes, on a stored key including the namespacing reserve.
/// This is the limit `insert_internal` checks `key.len()` against.
pub(crate) const MAX_TOTAL_KEY_SIZE: usize = 0x3fff; // 14 bits
/// Number of bytes of the total key budget set aside for namespacing.
pub(crate) const NAMESPACING_RESERVED_SIZE: usize = 0xff;
/// Largest key size exposed publicly: the total budget minus the namespacing reserve.
pub const MAX_KEY_SIZE: usize = MAX_TOTAL_KEY_SIZE - NAMESPACING_RESERVED_SIZE;
/// Largest value size, in bytes.
pub const MAX_VALUE_SIZE: usize = 0xffff;

// Compile-time guards: every size limit must fit in a u16. The total key size is
// checked explicitly because it is the bound actually enforced on insertion;
// checking only MAX_KEY_SIZE would not catch MAX_TOTAL_KEY_SIZE growing past u16.
const _: () = assert!(MAX_TOTAL_KEY_SIZE <= u16::MAX as usize);
const _: () = assert!(MAX_KEY_SIZE <= u16::MAX as usize);
const _: () = assert!(MAX_VALUE_SIZE <= u16::MAX as usize);
13 changes: 10 additions & 3 deletions src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,25 +340,32 @@ impl Shard {
key: &[u8],
patch: &[u8],
patch_offset: usize,
) -> Result<()> {
expected: Option<&[u8]>,
) -> Result<bool> {
let (_guard, row) = self.get_row_mut(ph);

let mut start = 0;
while let Some(idx) = row.signatures[start..].iter().position_simd(ph.signature) {
let (k, _) = self.read_kv(row.offsets_and_sizes[idx])?;
let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
if key == k {
let (klen, vlen, offset) =
Self::extract_offset_and_size(row.offsets_and_sizes[idx]);
if patch_offset + patch.len() > vlen as usize {
return Err(Box::new(VickyError::ValueTooLong));
}

if let Some(expected) = expected {
if &v[patch_offset..patch_offset + patch.len()] != expected {
return Ok(false);
}
}

self.file.write_all_at(
patch,
HEADER_SIZE + offset + klen as u64 + patch_offset as u64,
)?;

return Ok(());
return Ok(true);
}
start = idx + 1;
}
Expand Down
Loading

0 comments on commit 928bc5e

Please sign in to comment.