Skip to content

Commit

Permalink
Stats: separate into positive and negative lookups; use Relaxed ordering…
Browse files Browse the repository at this point in the history
…; add num_entries to shard header; add InsertMode::MustCreate
  • Loading branch information
tomerfiliba committed Aug 25, 2024
1 parent bf6efeb commit 0dd4235
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 52 deletions.
4 changes: 4 additions & 0 deletions src/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl PartedHash {
self.0 as u32
}

/// Returns the full 64-bit hash value backing this `PartedHash`.
/// The companion `as_u32` accessor above returns only the low 32 bits
/// (`self.0 as u32`); this exposes the whole raw value, e.g. for error
/// reporting (see `KeyAlreadyExists(Vec<u8>, u64)`).
pub fn as_u64(&self) -> u64 {
self.0
}

fn from_hash(h: Hash128) -> Self {
let mut sig = h.h1 as u32;
if sig == INVALID_SIG {
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub enum CandyError {
ValueTooLong(usize),
EntryCannotFitInShard(usize, usize),
KeyNotFound,
KeyAlreadyExists(Vec<u8>, u64),
CompactionFailed(String),
SplitFailed(String),
LoadingFailed(String),
Expand All @@ -84,6 +85,9 @@ impl Display for CandyError {
Self::KeyTooLong(sz) => write!(f, "key too long {sz}"),
Self::KeyNotFound => write!(f, "key not found"),
Self::ValueTooLong(sz) => write!(f, "value too long {sz}"),
Self::KeyAlreadyExists(key, ph) => {
write!(f, "key {key:?} already exists (0x{ph:016x})")
}
Self::EntryCannotFitInShard(sz, max) => {
write!(f, "entry too big ({sz}) for a single shard file ({max})")
}
Expand Down
1 change: 1 addition & 0 deletions src/lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ impl CandyStore {
// if the item already exists, it's already part of the list. just update it and preserve the index
if let Some(mut existing_val) = self.get_raw(&item_key)? {
match mode {
InsertMode::MustCreate => unreachable!(),
InsertMode::GetOrCreate => {
existing_val.truncate(existing_val.len() - size_of::<u64>());
return Ok(InsertToListStatus::ExistingValue(existing_val));
Expand Down
2 changes: 1 addition & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ impl ShardRouter {
Ok(())
}

fn compact_shard(&self, write_offset: u32) -> Result<()> {
fn compact_shard(&self, write_offset: u64) -> Result<()> {
let mut guard = self.node.write();
let ShardNode::Leaf(sh) = &*guard else {
// was split
Expand Down
87 changes: 58 additions & 29 deletions src/shard.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{bail, ensure};
use anyhow::bail;
use bytemuck::{bytes_of_mut, Pod, Zeroable};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use simd_itertools::PositionSimd;
Expand All @@ -9,19 +9,19 @@ use std::{
os::unix::fs::FileExt,
path::PathBuf,
sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
};

use memmap::{MmapMut, MmapOptions};

use crate::Result;
use crate::{
hashing::{PartedHash, INVALID_SIG},
stats::InternalStats,
store::InternalConfig,
};
use crate::{CandyError, Result};

//
// these numbers were chosen according to the simulation, as they allow for 90% utilization of the shard with
Expand Down Expand Up @@ -95,21 +95,22 @@ fn test_row_lookup() -> Result<()> {
// Wrapper that forces its contents to start on a 4096-byte boundary.
// Used for the `rows` table inside `ShardHeader` — presumably so the row
// array begins on a page boundary within the mmap'ed header region
// (TODO confirm the alignment requirement against the mmap layout).
#[repr(C, align(4096))]
struct PageAligned<T>(T);

pub(crate) const SHARD_FILE_MAGIC: u64 = 0x72745379646e6143; // "CandyStr"
pub(crate) const SHARD_FILE_VERSION: u64 = 0x07;
pub(crate) const SHARD_FILE_MAGIC: [u8; 8] = *b"CandyStr";
pub(crate) const SHARD_FILE_VERSION: u64 = 0x08;

#[derive(Clone, Copy, Default, Debug, Pod, Zeroable)]
#[repr(C)]
struct MetaHeader {
magic: u64,
magic: [u8; 8],
version: u64,
}

#[repr(C)]
struct ShardHeader {
metadata: MetaHeader,
wasted_bytes: AtomicU64,
write_offset: AtomicU32,
write_offset: AtomicU64,
num_entries: AtomicU64,
rows: PageAligned<[ShardRow; NUM_ROWS]>,
}

Expand All @@ -121,7 +122,7 @@ pub(crate) enum InsertStatus {
Added,
Replaced(Vec<u8>),
KeyDoesNotExist,
CompactionNeeded(u32),
CompactionNeeded(u64),
SplitNeeded,
AlreadyExists(Vec<u8>),
}
Expand All @@ -131,6 +132,7 @@ pub(crate) enum InsertMode<'a> {
Set,
Replace(Option<&'a [u8]>),
GetOrCreate,
MustCreate,
}

pub(crate) type KVPair = (Vec<u8>, Vec<u8>);
Expand Down Expand Up @@ -193,7 +195,7 @@ impl Shard {
file_size = 0;
} else {
bail!(
"unsupported magic=0x{:016x} version=0x{:016x} size={}",
"{filename:?} unsupported magic={:?} version=0x{:016x} size={}",
meta_header.magic,
meta_header.version,
file_size,
Expand Down Expand Up @@ -232,6 +234,15 @@ impl Shard {
let header = unsafe { &mut *(mmap.as_mut_ptr() as *mut ShardHeader) };
header.metadata.magic = SHARD_FILE_MAGIC;
header.metadata.version = SHARD_FILE_VERSION;

if file_size > 0 {
// if the shard existed before, update the stats
stats.num_inserts.fetch_add(
header.num_entries.load(Ordering::Relaxed) as usize,
Ordering::Relaxed,
);
}

let mut row_locks = Vec::with_capacity(NUM_ROWS);
for _ in 0..NUM_ROWS {
row_locks.push(RwLock::new(()));
Expand Down Expand Up @@ -275,8 +286,8 @@ impl Shard {

self.stats
.num_read_bytes
.fetch_add(buf.len(), Ordering::SeqCst);
self.stats.num_read_ops.fetch_add(1, Ordering::SeqCst);
.fetch_add(buf.len(), Ordering::Relaxed);
self.stats.num_read_ops.fetch_add(1, Ordering::Relaxed);

let val = buf[klen..klen + vlen].to_owned();
buf.truncate(klen);
Expand All @@ -296,7 +307,7 @@ impl Shard {
let write_offset = self
.header
.write_offset
.fetch_add(buf.len() as u32, Ordering::SeqCst) as u64;
.fetch_add(buf.len() as u64, Ordering::SeqCst) as u64;

// now writing can be non-atomic (pwrite)
self.file.write_all_at(&buf, HEADER_SIZE + write_offset)?;
Expand All @@ -319,8 +330,7 @@ impl Shard {
for res in self.unlocked_iter() {
let (k, v) = res?;
let ph = PartedHash::new(&self.config.hash_seed, &k);
let status = new_shard.insert(ph, &k, &v, InsertMode::Set, false)?;
ensure!(matches!(status, InsertStatus::Added));
new_shard.insert(ph, &k, &v, InsertMode::MustCreate, false)?;
}

Ok(())
Expand All @@ -330,15 +340,11 @@ impl Shard {
let (k, v) = res?;

let ph = PartedHash::new(&self.config.hash_seed, &k);
let status = if (ph.shard_selector() as u32) < bottom_shard.span.end {
bottom_shard.insert(ph, &k, &v, InsertMode::Set, false)?
if (ph.shard_selector() as u32) < bottom_shard.span.end {
bottom_shard.insert(ph, &k, &v, InsertMode::MustCreate, false)?;
} else {
top_shard.insert(ph, &k, &v, InsertMode::Set, false)?
};
ensure!(
matches!(status, InsertStatus::Added),
"{ph} key={k:?} already exists in new_shard"
);
top_shard.insert(ph, &k, &v, InsertMode::MustCreate, false)?;
}
}
Ok(())
}
Expand All @@ -363,26 +369,42 @@ impl Shard {
}

/// Returns all key-value pairs stored in this shard whose `PartedHash`
/// signature matches `ph` (several distinct keys may collide on the same
/// signature, hence a `Vec`).
///
/// Stats: the first matching entry bumps `num_positive_lookups` once; if
/// no entry matches, `num_negative_lookups` is bumped instead. Both use
/// Relaxed ordering — they are monotonic counters with no ordering
/// dependency on other data.
pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result<Vec<KVPair>> {
// NOTE(review): diff artifact — this aggregate `num_lookups` counter is
// the pre-change line; confirm it was removed in favor of the split
// positive/negative counters below.
self.stats.num_lookups.fetch_add(1, Ordering::SeqCst);
let mut first_time = true;
// Row lock guard is held for the duration of the scan.
let (_guard, row) = self.get_row(ph);
// Most signatures match at most one entry, so reserve room for one.
let mut kvs = Vec::with_capacity(1);
let mut start = 0;
// `lookup` resumes from `start`, so repeated calls walk every slot in
// the row whose signature equals `ph.signature()`.
while let Some(idx) = row.lookup(ph.signature(), &mut start) {
kvs.push(self.read_kv(row.offsets_and_sizes[idx])?);
if first_time {
self.stats
.num_positive_lookups
.fetch_add(1, Ordering::Relaxed);
first_time = false;
}
}
if kvs.is_empty() {
self.stats
.num_negative_lookups
.fetch_add(1, Ordering::Relaxed);
}
Ok(kvs)
}

/// Looks up `key` in this shard, returning its value if present.
///
/// Scans every row slot whose signature matches `ph.signature()` and
/// compares the stored key bytes against `key` (signatures can collide,
/// so the full-key comparison is required). Bumps
/// `num_positive_lookups` on a hit and `num_negative_lookups` on a miss,
/// both with Relaxed ordering (plain counters, no ordering dependency).
pub(crate) fn get(&self, ph: PartedHash, key: &[u8]) -> Result<Option<Vec<u8>>> {
// NOTE(review): diff artifact — this aggregate `num_lookups` counter is
// the pre-change line; confirm it was removed in favor of the split
// counters used below.
self.stats.num_lookups.fetch_add(1, Ordering::SeqCst);
// Row lock guard is held while we scan and read candidate entries.
let (_guard, row) = self.get_row(ph);
let mut start = 0;
while let Some(idx) = row.lookup(ph.signature(), &mut start) {
let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
if key == k {
self.stats
.num_positive_lookups
.fetch_add(1, Ordering::Relaxed);
return Ok(Some(v));
}
}
self.stats
.num_negative_lookups
.fetch_add(1, Ordering::Relaxed);
Ok(None)
}

Expand Down Expand Up @@ -446,10 +468,15 @@ impl Shard {
continue;
}
match mode {
InsertMode::MustCreate => {
bail!(CandyError::KeyAlreadyExists(key.into(), ph.as_u64()))
}
InsertMode::GetOrCreate => {
// no-op, key already exists
if inc_stats {
self.stats.num_lookups.fetch_add(1, Ordering::SeqCst);
self.stats
.num_positive_lookups
.fetch_add(1, Ordering::Relaxed);
}
return Ok(TryReplaceStatus::KeyExistsNotReplaced(existing_val));
}
Expand All @@ -470,7 +497,7 @@ impl Shard {
.wasted_bytes
.fetch_add((k.len() + existing_val.len()) as u64, Ordering::SeqCst);
if inc_stats {
self.stats.num_updates.fetch_add(1, Ordering::SeqCst);
self.stats.num_updates.fetch_add(1, Ordering::Relaxed);
}
#[cfg(feature = "flush_aggregation")]
{
Expand Down Expand Up @@ -536,8 +563,9 @@ impl Shard {
std::sync::atomic::fence(Ordering::SeqCst);
row.signatures[idx] = ph.signature();
if inc_stats {
self.stats.num_inserts.fetch_add(1, Ordering::SeqCst);
self.stats.num_inserts.fetch_add(1, Ordering::Relaxed);
}
self.header.num_entries.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "flush_aggregation")]
{
drop(_guard);
Expand Down Expand Up @@ -565,7 +593,8 @@ impl Shard {
if key == k {
row.signatures[idx] = INVALID_SIG;
// we managed to remove this key
self.stats.num_removals.fetch_add(1, Ordering::SeqCst);
self.stats.num_removals.fetch_add(1, Ordering::Relaxed);
self.header.num_entries.fetch_sub(1, Ordering::Relaxed);
self.header
.wasted_bytes
.fetch_add((k.len() + v.len()) as u64, Ordering::Relaxed);
Expand All @@ -581,7 +610,7 @@ impl Shard {
Ok(None)
}

/// Current append offset into the shard's data area, in bytes past the
/// header (writes land at `HEADER_SIZE + write_offset`). Relaxed load:
/// presumably only used for capacity/compaction decisions, not to
/// synchronize with concurrent writers — TODO confirm.
pub(crate) fn get_write_offset(&self) -> u64 {
self.header.write_offset.load(Ordering::Relaxed)
}
}
pub(crate) fn get_wasted_bytes(&self) -> u64 {
Expand Down
Loading

0 comments on commit 0dd4235

Please sign in to comment.