Shard: cleaner separation of pub(crate) and private APIs
tomerfiliba committed Aug 20, 2024
1 parent 5c58182 commit 91e7835
Showing 6 changed files with 128 additions and 117 deletions.
3 changes: 0 additions & 3 deletions Cargo.toml
@@ -22,8 +22,5 @@ fslock = "0.2.1"
[features]
whitebox_testing = []

-[dev-dependencies]
-rand = "0.8.5"
-
[workspace]
members = ["simulator", "candy-crasher", "candy-longliving", "candy-perf", "mini-candy", "test-list-collisions"]
34 changes: 4 additions & 30 deletions src/insertion.rs
@@ -69,9 +69,7 @@ impl CandyStore {
if guard
.get(&shard_end)
.with_context(|| format!("missing shard {shard_end}"))?
-            .header
-            .write_offset
-            .load(Ordering::Relaxed)
+            .get_write_offset()
< write_offset
{
return Ok(false);
@@ -88,7 +86,7 @@ impl CandyStore {
"compact_{:04x}-{:04x}",
removed_shard.span.start, removed_shard.span.end
));
-        let compacted_shard = Shard::open(
+        let mut compacted_shard = Shard::open(
tmpfile.clone(),
removed_shard.span.clone(),
true,
@@ -97,17 +95,7 @@

self.num_compactions.fetch_add(1, Ordering::SeqCst);

-        for res in removed_shard.unlocked_iter() {
-            let (k, v) = res?;
-            let ph = PartedHash::new(&self.config.hash_seed, &k);
-            let status = compacted_shard.insert(ph, &k, &v, InsertMode::Set)?;
-            if !matches!(status, InsertStatus::Added) {
-                return Err(anyhow!(CandyError::CompactionFailed(format!(
-                    "{ph:?} [{}..{}] shard {status:?} k={k:?} v={v:?}",
-                    removed_shard.span.start, removed_shard.span.end
-                ))));
-            }
-        }
+        removed_shard.compact_into(&mut compacted_shard)?;

std::fs::rename(tmpfile, &orig_filename)?;
guard.insert(shard_end, compacted_shard);
@@ -146,21 +134,7 @@ impl CandyStore {
self.config.clone(),
)?;

-        for res in removed_shard.unlocked_iter() {
-            let (k, v) = res?;
-
-            let ph = PartedHash::new(&self.config.hash_seed, &k);
-            let status = if (ph.shard_selector() as u32) < midpoint {
-                bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
-            } else {
-                top_shard.insert(ph, &k, &v, InsertMode::Set)?
-            };
-            if !matches!(status, InsertStatus::Added) {
-                return Err(anyhow!(CandyError::SplitFailed(format!(
-                    "{ph:?} {status:?} [{shard_start} {midpoint} {shard_end}] k={k:?} v={v:?}",
-                ))));
-            }
-        }
+        removed_shard.split_into(&bottom_shard, &top_shard)?;

self.num_splits.fetch_add(1, Ordering::SeqCst);

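Note: the removed routing loop reappears as Shard::split_into in src/shard.rs below, where bottom_shard.span.end takes over the role of midpoint. Assuming shard spans are half-open start..end ranges and the bottom shard is opened with span start..midpoint, the two predicates are equivalent:

```rust
// old (inline in the split logic): (ph.shard_selector() as u32) < midpoint
// new (inside Shard::split_into):  (ph.shard_selector() as u32) < bottom_shard.span.end
```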
5 changes: 1 addition & 4 deletions src/lists.rs
@@ -534,10 +534,7 @@ impl CandyStore {
let item_ph = *from_bytes::<PartedHash>(&item_ph_bytes);

// handle unlikely (but possible) collisions on item_ph
-        for kv in self.get_by_hash(item_ph)? {
-            let Ok((mut k, mut v)) = kv else {
-                continue;
-            };
+        for (mut k, mut v) in self.get_by_hash(item_ph)? {
if v.ends_with(&idx.to_le_bytes()) {
if truncate {
v.truncate(v.len() - size_of::<u64>());
162 changes: 101 additions & 61 deletions src/shard.rs
@@ -1,3 +1,5 @@
+use anyhow::ensure;
+use bytemuck::{bytes_of, Pod, Zeroable};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use simd_itertools::PositionSimd;
use std::{
@@ -13,7 +15,10 @@ use std::{

use memmap::{MmapMut, MmapOptions};

-use crate::hashing::{PartedHash, INVALID_SIG};
+use crate::{
+    hashing::{PartedHash, INVALID_SIG},
+    SizeHistogram,
+};
use crate::{Config, Result};

//
@@ -28,9 +33,9 @@ pub(crate) const NUM_ROWS: usize = 64;
pub(crate) const ROW_WIDTH: usize = 512;

#[repr(C)]
-pub(crate) struct ShardRow {
-    pub signatures: [u32; ROW_WIDTH],
-    pub offsets_and_sizes: [u64; ROW_WIDTH], // | key_size: 16 | val_size: 16 | file_offset: 32 |
+struct ShardRow {
+    signatures: [u32; ROW_WIDTH],
+    offsets_and_sizes: [u64; ROW_WIDTH], // | key_size: 16 | val_size: 16 | file_offset: 32 |
}

impl ShardRow {
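ShardRow::lookup itself is collapsed in this view; for orientation, here is a scalar sketch of the contract that get_by_hash and get below rely on. The real implementation uses PositionSimd from simd_itertools, so treat this as an assumption-laden equivalent:

```rust
// Returns the next slot index whose signature equals `sig`, scanning from
// *start_idx, and advances *start_idx past the hit so that repeated calls
// enumerate every matching slot in the row.
fn lookup_scalar(signatures: &[u32], sig: u32, start_idx: &mut usize) -> Option<usize> {
    let rel = signatures[*start_idx..].iter().position(|&s| s == sig)?;
    let idx = *start_idx + rel;
    *start_idx = idx + 1;
    Some(idx)
}
```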
@@ -86,13 +91,13 @@ fn test_row_lookup() -> Result<()>
}

#[repr(C, align(4096))]
-pub(crate) struct PageAligned<T>(pub T);
+struct PageAligned<T>(T);

#[repr(C)]
-pub(crate) struct ShardSizeHistogram {
-    pub counts_64b: [AtomicU32; 16],
-    pub counts_1kb: [AtomicU32; 15],
-    pub counts_16kb: [AtomicU32; 4],
+struct ShardSizeHistogram {
+    counts_64b: [AtomicU32; 16],
+    counts_1kb: [AtomicU32; 15],
+    counts_16kb: [AtomicU32; 4],
}

impl ShardSizeHistogram {
@@ -141,13 +146,13 @@ fn test_shard_size_histogram()
}

#[repr(C)]
-pub(crate) struct ShardHeader {
-    pub num_inserted: AtomicU64,
-    pub num_removed: AtomicU64,
-    pub wasted_bytes: AtomicU64,
-    pub write_offset: AtomicU32,
-    pub size_histogram: ShardSizeHistogram,
-    pub rows: PageAligned<[ShardRow; NUM_ROWS]>,
+struct ShardHeader {
+    num_inserted: AtomicU64,
+    num_removed: AtomicU64,
+    wasted_bytes: AtomicU64,
+    write_offset: AtomicU32,
+    size_histogram: ShardSizeHistogram,
+    rows: PageAligned<[ShardRow; NUM_ROWS]>,
}

pub(crate) const HEADER_SIZE: u64 = size_of::<ShardHeader>() as u64;
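With ShardHeader now private, the `&'static mut ShardHeader` field below can only be produced inside this module. Shard::open is not part of this diff, so the following is only a sketch of how such a reference is typically derived from the mapping:

```rust
// SAFETY sketch: the file is pre-sized to at least HEADER_SIZE bytes, and the
// Shard struct keeps `mmap` alive for as long as the header reference is used.
fn header_from_mmap(mmap: &mut memmap::MmapMut) -> &'static mut ShardHeader {
    unsafe { &mut *(mmap.as_mut_ptr() as *mut ShardHeader) }
}
```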
@@ -170,28 +175,10 @@ pub(crate) enum InsertMode<'a> {
GetOrCreate,
}

-pub(crate) struct ByHashIterator<'a> {
-    shard: &'a Shard,
-    _guard: RwLockReadGuard<'a, ()>,
-    row: &'a ShardRow,
-    signature: u32,
-    start_idx: usize,
-}
-
pub(crate) type KVPair = (Vec<u8>, Vec<u8>);

-impl<'a> Iterator for ByHashIterator<'a> {
-    type Item = Result<KVPair>;
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(idx) = self.row.lookup(self.signature, &mut self.start_idx) {
-            Some(self.shard.read_kv(self.row.offsets_and_sizes[idx]))
-        } else {
-            None
-        }
-    }
-}

-#[derive(Default, Debug, Clone, Copy)]
+#[derive(Default, Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
struct Backpointer(u32);

impl Backpointer {
@@ -201,6 +188,8 @@ impl Backpointer {
// | idx | idx | size |
// | (6) | (9) | (17) |
// +-----+-----+----------+
+    const SZ: u64 = size_of::<Self>() as u64;
+
fn new(row_idx: u16, sig_idx: u16, entry_size: usize) -> Self {
debug_assert!((row_idx as usize % NUM_ROWS) < (1 << 6), "{row_idx}");
debug_assert!(sig_idx < (1 << 9), "{sig_idx}");
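For reference, the 6/9/17-bit layout above packs into a u32 as follows; the shift order is an assumption based on the comment and the debug_asserts, since the body of Backpointer::new is collapsed in this view:

```rust
// | row idx (6 bits) | sig idx (9 bits) | entry size (17 bits) |
fn pack_backpointer(row_idx: u16, sig_idx: u16, entry_size: usize) -> u32 {
    debug_assert!(entry_size < (1 << 17));
    (((row_idx as usize % NUM_ROWS) as u32) << 26) | ((sig_idx as u32) << 17) | (entry_size as u32)
}
```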
@@ -238,8 +227,8 @@ pub(crate) struct Shard {
config: Arc<Config>,
#[allow(dead_code)]
mmap: MmapMut, // needed to prevent it from dropping
-    pub(crate) header: &'static mut ShardHeader,
-    pub(crate) row_locks: Vec<RwLock<()>>,
+    header: &'static mut ShardHeader,
+    row_locks: Vec<RwLock<()>>,
}

enum TryReplaceStatus {
@@ -296,13 +285,8 @@ impl Shard {
Ok(())
}

-    // #[inline]
-    // fn is_special_offset(offset_and_size: u64) -> bool {
-    //     (offset_and_size >> 62) != 0
-    // }
-
#[inline]
-    pub(crate) fn extract_offset_and_size(offset_and_size: u64) -> (usize, usize, u64) {
+    fn extract_offset_and_size(offset_and_size: u64) -> (usize, usize, u64) {
let klen = (offset_and_size >> 48) as usize;
debug_assert_eq!(klen >> 14, 0, "attempting to read a special key");
let vlen = ((offset_and_size >> 32) & 0xffff) as usize;
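extract_offset_and_size decodes the | key_size: 16 | val_size: 16 | file_offset: 32 | layout noted on ShardRow above. A sketch of the inverse, for readers tracing write_kv (a hypothetical helper; the actual encoding site is outside this hunk):

```rust
fn pack_offset_and_size(klen: u16, vlen: u16, file_offset: u32) -> u64 {
    ((klen as u64) << 48) | ((vlen as u64) << 32) | (file_offset as u64)
}
```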
@@ -312,12 +296,10 @@ impl Shard {

// reading doesn't require holding any locks - we only ever extend the file, never overwrite data
fn read_kv(&self, offset_and_size: u64) -> Result<KVPair> {
-        const BP: u64 = size_of::<Backpointer>() as u64;
-
let (klen, vlen, offset) = Self::extract_offset_and_size(offset_and_size);
let mut buf = vec![0u8; klen + vlen];
self.file
-            .read_exact_at(&mut buf, HEADER_SIZE + BP + offset)?;
+            .read_exact_at(&mut buf, HEADER_SIZE + Backpointer::SZ + offset)?;

let val = buf[klen..klen + vlen].to_owned();
buf.truncate(klen);
@@ -327,12 +309,11 @@ impl Shard {

// writing doesn't require holding any locks since we write with an offset
fn write_kv(&self, row_idx: u16, sig_idx: u16, key: &[u8], val: &[u8]) -> Result<u64> {
-        const BP: usize = size_of::<Backpointer>();
-
+        const BP: usize = Backpointer::SZ as usize;
let entry_size = key.len() + val.len();
let mut buf = vec![0u8; BP + entry_size];
let bp = Backpointer::new(row_idx, sig_idx, entry_size);
-        buf[..BP].copy_from_slice(&bp.0.to_le_bytes());
+        buf[..BP].copy_from_slice(bytes_of(&bp));
buf[BP..BP + key.len()].copy_from_slice(key);
buf[BP + key.len()..].copy_from_slice(val);

@@ -360,7 +341,35 @@ impl Shard {
}
}

-    pub(crate) fn unlocked_iter<'b>(&'b self) -> impl Iterator<Item = Result<KVPair>> + 'b {
+    pub(crate) fn compact_into(&self, new_shard: &mut Shard) -> Result<()> {
+        for res in self.unlocked_iter() {
+            let (k, v) = res?;
+            let ph = PartedHash::new(&self.config.hash_seed, &k);
+            let status = new_shard.insert(ph, &k, &v, InsertMode::Set)?;
+            ensure!(matches!(status, InsertStatus::Added));
+        }
+
+        Ok(())
+    }
+    pub(crate) fn split_into(&self, bottom_shard: &Shard, top_shard: &Shard) -> Result<()> {
+        for res in self.unlocked_iter() {
+            let (k, v) = res?;
+
+            let ph = PartedHash::new(&self.config.hash_seed, &k);
+            let status = if (ph.shard_selector() as u32) < bottom_shard.span.end {
+                bottom_shard.insert(ph, &k, &v, InsertMode::Set)?
+            } else {
+                top_shard.insert(ph, &k, &v, InsertMode::Set)?
+            };
+            ensure!(
+                matches!(status, InsertStatus::Added),
+                "{ph:?} key={k:?} already exists in new_shard"
+            );
+        }
+        Ok(())
+    }
+
+    fn unlocked_iter<'b>(&'b self) -> impl Iterator<Item = Result<KVPair>> + 'b {
self.header.rows.0.iter().flat_map(|row| {
row.signatures.iter().enumerate().filter_map(|(idx, &sig)| {
if sig == INVALID_SIG {
@@ -372,22 +381,28 @@ impl Shard {
})
}

-    pub(crate) fn iter_by_hash<'a>(&'a self, ph: PartedHash) -> ByHashIterator<'a> {
+    fn get_row(&self, ph: PartedHash) -> (RwLockReadGuard<()>, &ShardRow) {
let row_idx = (ph.row_selector() as usize) % NUM_ROWS;
let guard = self.row_locks[row_idx].read();
let row = &self.header.rows.0[row_idx];
-        ByHashIterator {
-            shard: &self,
-            _guard: guard,
-            row,
-            signature: ph.signature(),
-            start_idx: 0,
+        (guard, row)
}

+    pub(crate) fn get_by_hash(&self, ph: PartedHash) -> Result<Vec<KVPair>> {
+        let (_guard, row) = self.get_row(ph);
+        let mut kvs = Vec::with_capacity(1);
+        let mut start = 0;
+        while let Some(idx) = row.lookup(ph.signature(), &mut start) {
+            kvs.push(self.read_kv(row.offsets_and_sizes[idx])?);
+        }
+        Ok(kvs)
+    }

pub(crate) fn get(&self, ph: PartedHash, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        for res in self.iter_by_hash(ph) {
-            let (k, v) = res?;
+        let (_guard, row) = self.get_row(ph);
+        let mut start = 0;
+        while let Some(idx) = row.lookup(ph.signature(), &mut start) {
+            let (k, v) = self.read_kv(row.offsets_and_sizes[idx])?;
if key == k {
return Ok(Some(v));
}
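get_by_hash materializes every entry whose signature matches while the row lock is held, leaving collision handling to the caller, as in the lists.rs hunk above. A caller-side sketch (wanted_key is illustrative):

```rust
// Signature collisions are rare but possible, so compare full keys.
let mut found = None;
for (k, v) in shard.get_by_hash(ph)? {
    if k == wanted_key {
        found = Some(v);
        break;
    }
}
```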
@@ -429,7 +444,7 @@ impl Shard {
row.offsets_and_sizes[idx] =
self.write_kv(ph.row_selector(), idx as u16, key, val)?;
self.header.wasted_bytes.fetch_add(
-                        (size_of::<Backpointer>() + k.len() + existing_val.len()) as u64,
+                        Backpointer::SZ + (k.len() + existing_val.len()) as u64,
Ordering::SeqCst,
);
}
@@ -523,4 +538,29 @@

Ok(None)
}

+    pub(crate) fn get_write_offset(&self) -> u32 {
+        self.header.write_offset.load(Ordering::Relaxed)
+    }
+    pub(crate) fn get_stats(&self) -> (usize, usize, usize, usize) {
+        (
+            self.header.num_inserted.load(Ordering::Relaxed) as usize,
+            self.header.num_removed.load(Ordering::Relaxed) as usize,
+            self.header.write_offset.load(Ordering::Relaxed) as usize,
+            self.header.wasted_bytes.load(Ordering::Relaxed) as usize,
+        )
+    }
+    pub(crate) fn get_size_histogram(&self) -> SizeHistogram {
+        let mut hist = SizeHistogram::default();
+        for (i, h) in self.header.size_histogram.counts_64b.iter().enumerate() {
+            hist.counts_64b[i] = h.load(Ordering::Relaxed) as usize;
+        }
+        for (i, h) in self.header.size_histogram.counts_1kb.iter().enumerate() {
+            hist.counts_1kb[i] = h.load(Ordering::Relaxed) as usize;
+        }
+        for (i, h) in self.header.size_histogram.counts_16kb.iter().enumerate() {
+            hist.counts_16kb[i] = h.load(Ordering::Relaxed) as usize;
+        }
+        hist
+    }
}
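Since the header fields are no longer reachable outside shard.rs, callers such as insertion.rs go through the new accessors. A hypothetical call site:

```rust
// Point-in-time snapshot of the shard's atomic counters.
let (inserted, removed, write_offset, wasted) = shard.get_stats();
println!("{inserted} inserted, {removed} removed, {wasted} wasted bytes, write offset {write_offset}");
let hist = shard.get_size_histogram(); // copies the per-bucket counters out
```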