Skip to content

Commit

Permalink
Add namespaces; typed items now belong to TYPED_NAMESPACE
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Aug 6, 2024
1 parent 928bc5e commit e78138f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 129 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ server, followed by the normal sharding mechanism described above.
* Requires nightly (for `simd_itertools` and BTree cursors), uses very little `unsafe` (required due to `mmap`)

## Roadmap
* Add TTL to keys (easy, but will screw up accounting)
* Add key prefixes, which can be used to group keys into "families" (i.e., lightweight indexing),
by storing their parted-hash with an "anti-collision" in a modifiable entry
* Distributed protocol based on file locks (meant to run on a shared network folder)
* Add some schema-like features, maybe using rkyv
* Add generations as an adapter on top, so that older generations are compacted into exponentially larger
time spans. It's an alternative to TTL, and amortizes the number of times an entry will move around as the
dataset grows.
75 changes: 11 additions & 64 deletions src/hashing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::hash::Hasher;

use siphasher::sip128::{Hash128, Hasher128, SipHasher24};
use siphasher::sip128::{Hash128, SipHasher24};

use crate::{Result, VickyError};

Expand Down Expand Up @@ -33,14 +31,17 @@ pub(crate) struct PartedHash {

pub(crate) const INVALID_SIG: u32 = 0;

pub(crate) const USER_NAMESPACE: u8 = 1;
//pub(crate) const TYPED_NAMESPACE: u8 = 2;
//pub(crate) const XUSER_NAMESPACE: &[u8] = &[1];
pub(crate) const USER_NAMESPACE: &[u8] = &[1];
pub(crate) const TYPED_NAMESPACE: &[u8] = &[2];

impl PartedHash {
#[allow(dead_code)]
pub const LEN: usize = size_of::<u64>();

/// Builds a `PartedHash` for `buf`.
///
/// The bytes are hashed with a `SipHasher24` keyed by `seed`, and the resulting
/// 128-bit hash is handed to `from_hash`, which derives the individual parts.
pub fn new(seed: &HashSeed, buf: &[u8]) -> Self {
    let keyed_hasher = SipHasher24::new_with_key(&seed.0);
    let digest = keyed_hasher.hash(buf);
    Self::from_hash(digest)
}

fn from_hash(h: Hash128) -> Self {
let mut signature = h.h1 as u32;
if signature == INVALID_SIG {
Expand All @@ -58,31 +59,14 @@ impl PartedHash {
signature,
}
}
pub fn from_buffer(namespace: u8, seed: &HashSeed, buf: &[u8]) -> Self {
// maybe use blake3?
let mut hasher = SipHasher24::new_with_key(&seed.0);
hasher.write_u8(namespace);
hasher.write(buf);
Self::from_hash(hasher.finish128())
}
pub fn from_buffers(seed: &HashSeed, bufs: &[&[u8]]) -> Self {
// maybe use blake3?
let mut hasher = SipHasher24::new_with_key(&seed.0);
for buf in bufs {
hasher.write(buf);
}
Self::from_hash(hasher.finish128())
}

// pub fn builder(seed: &HashSeed) -> PartedHashBuilder {
// PartedHashBuilder(SipHasher24::new_with_key(&seed.0))
// }
/// Packs the three parts of the hash into a single `u64` (test-only helper).
///
/// Layout, from most to least significant: `shard_selector` shifted left by 48,
/// `row_selector` shifted left by 32, and `signature` in the low bits.
/// NOTE(review): this assumes `row_selector` fits in 16 bits so the fields do
/// not overlap — confirm against the struct definition.
#[cfg(test)]
pub fn to_u64(&self) -> u64 {
    let shard_bits = (self.shard_selector as u64) << 48;
    let row_bits = (self.row_selector as u64) << 32;
    let sig_bits = self.signature as u64;
    shard_bits | row_bits | sig_bits
}

// pub fn from_u64(val: u64) -> Self {
// Self {
// shard_selector: (val >> 48) as u16,
Expand All @@ -100,54 +84,17 @@ impl PartedHash {
// }
}

// pub(crate) struct PartedHashBuilder(SipHasher24);

// impl PartedHashBuilder {
// pub fn write(mut self, bytes: &[u8]) -> Self {
// self.0.write(bytes);
// self
// }
// pub fn write_parted_hash(mut self, v: PartedHash) -> Self {
// self.0.write_u64(v.to_u64());
// self
// }
// pub fn write_u64(mut self, v: u64) -> Self {
// self.0.write_u64(v);
// self
// }
// pub fn write_u32(mut self, v: u32) -> Self {
// self.0.write_u32(v);
// self
// }
// pub fn write_u8(mut self, v: u8) -> Self {
// self.0.write_u8(v);
// self
// }
// pub fn finish(self) -> PartedHash {
// PartedHash::from_hash(self.0.finish128())
// }
// }

/// Checks that `HashSeed::new` rejects seeds of the wrong length and that
/// `PartedHash::new` yields a stable, known value for a fixed seed and input.
#[test]
fn test_parted_hash() -> Result<()> {
    // 4 and 17 characters: both are rejected, only the 16-character seed below is accepted
    HashSeed::new("1234").expect_err("shouldn't work");
    HashSeed::new("12341234123412341").expect_err("shouldn't work");

    let seed = HashSeed::new("aaaabbbbccccdddd")?;

    // Pinned value: any change to the hashing scheme must update this constant deliberately,
    // since it changes where existing keys are placed.
    assert_eq!(
        PartedHash::new(&seed, b"hello world").to_u64(),
        13445180190757400308,
    );

    Ok(())
}
99 changes: 60 additions & 39 deletions src/insertion.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::Bound;
use std::sync::atomic::Ordering;

use crate::hashing::{PartedHash, USER_NAMESPACE};
use crate::hashing::PartedHash;
use crate::shard::{InsertMode, InsertStatus, Shard};
use crate::store::VickyStore;
use crate::{Result, VickyError, MAX_TOTAL_KEY_SIZE, MAX_VALUE_SIZE};
Expand Down Expand Up @@ -86,9 +86,7 @@ impl VickyStore {

for res in removed_shard.unlocked_iter() {
let (k, v) = res?;
// XXX: this will not work with namespaces
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, &k);

let ph = PartedHash::new(&self.config.hash_seed, &k);
let status = compacted_shard.insert(ph, &k, &v, InsertMode::Set)?;
assert!(matches!(status, InsertStatus::Added), "{status:?}");
}
Expand Down Expand Up @@ -131,7 +129,7 @@ impl VickyStore {
for res in removed_shard.unlocked_iter() {
let (k, v) = res?;

let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, &k);
let ph = PartedHash::new(&self.config.hash_seed, &k);
let status = if (ph.shard_selector as u32) < midpoint {
bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
} else {
Expand Down Expand Up @@ -187,20 +185,21 @@ impl VickyStore {

pub(crate) fn insert_internal(
&self,
ph: PartedHash,
key: &[u8],
full_key: &[u8],
val: &[u8],
mode: InsertMode,
) -> Result<Option<Vec<u8>>> {
if key.len() > MAX_TOTAL_KEY_SIZE as usize {
let ph = PartedHash::new(&self.config.hash_seed, full_key);

if full_key.len() > MAX_TOTAL_KEY_SIZE as usize {
return Err(Box::new(VickyError::KeyTooLong));
}
if val.len() > MAX_VALUE_SIZE as usize {
return Err(Box::new(VickyError::ValueTooLong));
}

loop {
let (status, shard_start, shard_end) = self.try_insert(ph, key, val, mode)?;
let (status, shard_start, shard_end) = self.try_insert(ph, &full_key, val, mode)?;

match status {
InsertStatus::Added => {
Expand Down Expand Up @@ -228,6 +227,14 @@ impl VickyStore {
}
}

/// Sets `val` under `full_key`, creating the entry or overwriting an existing one.
///
/// `full_key` is used as-is for hashing and storage (callers such as `set` pass a key
/// produced by `make_user_key`). Returns `SetStatus::PrevValue` with the value that was
/// replaced, or `SetStatus::CreatedNew` if there was none. Errors from `insert_internal`
/// are propagated.
pub fn set_raw(&self, full_key: &[u8], val: &[u8]) -> Result<SetStatus> {
    let previous = self.insert_internal(full_key, val, InsertMode::Set)?;
    Ok(match previous {
        Some(old_val) => SetStatus::PrevValue(old_val),
        None => SetStatus::CreatedNew,
    })
}

/// Inserts a key-value pair, creating it or replacing an existing pair. Note that if the program crashes
/// during or "right after" this operation, or if the operating system is unable to flush the page cache,
/// you may lose some data. However, you will still be in a consistent state, where you will get a previous
Expand All @@ -241,11 +248,14 @@ impl VickyStore {
key: &B1,
val: &B2,
) -> Result<SetStatus> {
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key.as_ref());
if let Some(prev) = self.insert_internal(ph, key.as_ref(), val.as_ref(), InsertMode::Set)? {
Ok(SetStatus::PrevValue(prev))
self.set_raw(&self.make_user_key(key.as_ref()), val.as_ref())
}

/// Replaces the value stored under `full_key`, but only if the key already exists.
///
/// `full_key` is used as-is for hashing and storage (callers such as `replace` pass a key
/// produced by `make_user_key`). Returns `ReplaceStatus::PrevValue` with the value that was
/// replaced, or `ReplaceStatus::DoesNotExist` if `insert_internal` reported no previous
/// value. Errors from `insert_internal` are propagated.
pub fn replace_raw(&self, full_key: &[u8], val: &[u8]) -> Result<ReplaceStatus> {
    match self.insert_internal(full_key, val, InsertMode::Replace)? {
        Some(prev) => Ok(ReplaceStatus::PrevValue(prev)),
        None => Ok(ReplaceStatus::DoesNotExist),
    }
}

Expand All @@ -259,13 +269,19 @@ impl VickyStore {
key: &B1,
val: &B2,
) -> Result<ReplaceStatus> {
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key.as_ref());
if let Some(prev) =
self.insert_internal(ph, key.as_ref(), val.as_ref(), InsertMode::Replace)?
{
Ok(ReplaceStatus::PrevValue(prev))
self.replace_raw(&self.make_user_key(key.as_ref()), val.as_ref())
}

/// Returns the value stored under `full_key`, inserting `default_val` first if the key
/// is absent.
///
/// `full_key` is used as-is for hashing and storage (callers such as `get_or_create` pass
/// a key produced by `make_user_key`). Returns `GetOrCreateStatus::ExistingValue` with the
/// stored value when `insert_internal` reports one, otherwise
/// `GetOrCreateStatus::CreatedNew` carrying an owned copy of `default_val`. Errors from
/// `insert_internal` are propagated.
pub fn get_or_create_raw(
    &self,
    full_key: &[u8],
    default_val: &[u8],
) -> Result<GetOrCreateStatus> {
    match self.insert_internal(full_key, default_val, InsertMode::GetOrCreate)? {
        Some(existing) => Ok(GetOrCreateStatus::ExistingValue(existing)),
        None => Ok(GetOrCreateStatus::CreatedNew(default_val.to_owned())),
    }
}

Expand All @@ -280,15 +296,25 @@ impl VickyStore {
key: &B1,
default_val: &B2,
) -> Result<GetOrCreateStatus> {
let key = key.as_ref();
let default_val = default_val.as_ref();
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key);
let res = self.insert_internal(ph, key, default_val, InsertMode::GetOrCreate)?;
if let Some(prev) = res {
Ok(GetOrCreateStatus::ExistingValue(prev))
} else {
Ok(GetOrCreateStatus::CreatedNew(default_val.to_owned()))
}
self.get_or_create_raw(&self.make_user_key(key.as_ref()), default_val.as_ref())
}

/// Applies `patch` at `patch_offset` to the entry stored under `full_key`, delegating to
/// the owning shard's `modify_inplace`.
///
/// The shard is chosen by hashing `full_key` and taking the first entry of `self.shards`
/// whose key is strictly greater than the hash's `shard_selector`. `expected` is forwarded
/// unchanged to the shard.
///
/// # Panics
/// Panics if acquiring the read guard on `self.shards` fails, or if no shard follows the
/// selector (both are `unwrap`ped).
pub(crate) fn modify_inplace_raw(
    &self,
    full_key: &[u8],
    patch: &[u8],
    patch_offset: usize,
    expected: Option<&[u8]>,
) -> Result<bool> {
    let ph = PartedHash::new(&self.config.hash_seed, full_key);
    let selector = ph.shard_selector as u32;

    // The read guard stays alive until the shard call below has returned.
    let shards = self.shards.read().unwrap();
    let cursor = shards.lower_bound(Bound::Excluded(&selector));
    let (_, shard) = cursor.peek_next().unwrap();

    shard.modify_inplace(ph, full_key, patch, patch_offset, expected)
}

/// Modifies an existing entry in-place, instead of creating a new version. Note that the key must exist
Expand All @@ -305,16 +331,11 @@ impl VickyStore {
patch_offset: usize,
expected: Option<&B2>,
) -> Result<bool> {
let key = key.as_ref();
let patch = patch.as_ref();
let ph = PartedHash::from_buffer(USER_NAMESPACE, &self.config.hash_seed, key);
self.shards
.read()
.unwrap()
.lower_bound(Bound::Excluded(&(ph.shard_selector as u32)))
.peek_next()
.unwrap()
.1
.modify_inplace(ph, key, patch, patch_offset, expected.map(|b| b.as_ref()))
self.modify_inplace_raw(
&self.make_user_key(key.as_ref()),
patch.as_ref(),
patch_offset,
expected.map(|b| b.as_ref()),
)
}
}
18 changes: 14 additions & 4 deletions src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,24 @@ impl Shard {
(guard, row)
}

/*pub(crate) fn insert_multikey(
&self,
keys: &[&[u8]],
val: &[u8],
mode: InsertMode,
) -> Result<InsertStatus> {
self.insert_fullkey(ph, &full_key, val, mode)
}*/

pub(crate) fn insert(
&self,
ph: PartedHash,
key: &[u8],
full_key: &[u8],
val: &[u8],
mode: InsertMode,
) -> Result<InsertStatus> {
if self.header.write_offset.load(Ordering::Relaxed) as u64 + (key.len() + val.len()) as u64
if self.header.write_offset.load(Ordering::Relaxed) as u64
+ (full_key.len() + val.len()) as u64
> self.config.max_shard_size as u64
{
if self.header.wasted_bytes.load(Ordering::Relaxed)
Expand All @@ -305,15 +315,15 @@ impl Shard {
// see if we replace an existing key
let (_guard, row) = self.get_row_mut(ph);

match self.try_replace(row, ph, key, val, mode)? {
match self.try_replace(row, ph, &full_key, val, mode)? {
TryReplaceStatus::KeyDoesNotExist => {
if matches!(mode, InsertMode::Replace) {
return Ok(InsertStatus::KeyDoesNotExist);
}

// find an empty slot
if let Some(idx) = row.signatures.iter().position_simd(INVALID_SIG) {
let new_off = self.write_kv(key, val)?;
let new_off = self.write_kv(&full_key, val)?;

// we don't want a reorder to happen here - first write the offset, then the signature
row.offsets_and_sizes[idx] = new_off;
Expand Down
Loading

0 comments on commit e78138f

Please sign in to comment.