Skip to content

Commit

Permalink
Add size histogram
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Aug 14, 2024
1 parent d84c0a3 commit 63a9493
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 2 deletions.
58 changes: 57 additions & 1 deletion src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,66 @@ fn test_row_lookup() -> Result<()> {
#[repr(C, align(4096))]
pub(crate) struct PageAligned<T>(pub T);

#[repr(C)]
pub(crate) struct ShardSizeHistogram {
pub counts_64b: [AtomicU32; 16],
pub counts_1kb: [AtomicU32; 15],
pub counts_16kb: [AtomicU32; 4],
}

impl ShardSizeHistogram {
fn insert(&self, sz: usize) {
if sz < 1024 {
self.counts_64b[sz / 64].fetch_add(1, Ordering::Relaxed);
} else if sz < 16 * 1024 {
// index 15 will always be empty, but oh well
self.counts_1kb[(sz - 1024) / 1024].fetch_add(1, Ordering::Relaxed);
} else {
self.counts_16kb[(sz - 16 * 1024) / (16 * 1024)].fetch_add(1, Ordering::Relaxed);
}
}
}

#[test]
fn test_shard_size_histogram() {
let hist = ShardSizeHistogram {
counts_64b: Default::default(),
counts_1kb: Default::default(),
counts_16kb: Default::default(),
};
hist.insert(0);
hist.insert(63);
hist.insert(1022);
hist.insert(1023);
assert_eq!(hist.counts_64b[0].load(Ordering::Relaxed), 2);
assert_eq!(hist.counts_64b[15].load(Ordering::Relaxed), 2);

hist.insert(1024);
hist.insert(1025);
hist.insert(16382);
hist.insert(16383);
assert_eq!(hist.counts_1kb[0].load(Ordering::Relaxed), 2);
assert_eq!(hist.counts_1kb[14].load(Ordering::Relaxed), 2);

hist.insert(16384);
hist.insert(16385);
hist.insert(65534);
hist.insert(65535);
assert_eq!(hist.counts_16kb[0].load(Ordering::Relaxed), 2);
assert_eq!(hist.counts_16kb[2].load(Ordering::Relaxed), 2);

hist.insert(65536);
hist.insert(65537);
assert_eq!(hist.counts_16kb[3].load(Ordering::Relaxed), 2);
}

#[repr(C)]
pub(crate) struct ShardHeader {
pub num_inserted: AtomicU64,
pub num_removed: AtomicU64,
pub wasted_bytes: AtomicU64,
pub write_offset: AtomicU32,
pub size_histogram: ShardSizeHistogram,
pub rows: PageAligned<[ShardRow; NUM_ROWS]>,
}

Expand Down Expand Up @@ -237,7 +291,8 @@ impl Shard {

// writing doesn't require holding any locks since we write with an offset
fn write_kv(&self, key: &[u8], val: &[u8]) -> Result<u64> {
let mut buf = vec![0u8; key.len() + val.len()];
let entry_size = key.len() + val.len();
let mut buf = vec![0u8; entry_size];
buf[..key.len()].copy_from_slice(key);
buf[key.len()..].copy_from_slice(val);

Expand All @@ -250,6 +305,7 @@ impl Shard {

// now writing can be non-atomic (pwrite)
self.write_raw(&buf, write_offset)?;
self.header.size_histogram.insert(entry_size);

Ok(((key.len() as u64) << 48) | ((val.len() as u64) << 32) | write_offset)
}
Expand Down
88 changes: 87 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{anyhow, Context};
use parking_lot::{Mutex, RwLock};
use std::{
collections::BTreeMap,
ops::Bound,
ops::{Bound, Range},
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -65,6 +65,75 @@ impl Stats {
}
}

/// A histogram of inserted entry sizes, in three bucket sizes:
/// * up to 1KB we keep 64-byte resolution
/// * from 1KB-16K, we keep in 1KB resolution
/// * over 16K, we keep in 16K resolution
///
/// Notes:
/// * Entry sizes are rounded down to the nearest bucket, e.g., 100 goes to the bucket of [64..128)
/// * Counts are updated on insert, and are unchanged by removals. They represent the entry sizes "seen" by this
/// store, not the currently existing ones. When a shard is split or compacted, only the existing entries remain
/// in the histogram.
/// * Use [Self::iter] to get a user-friendly representation of the histogram
#[derive(Clone, Debug, Default)]
pub struct SizeHistogram {
pub counts_64b: [usize; 16],
pub counts_1kb: [usize; 15],
pub counts_16kb: [usize; 4],
}

impl SizeHistogram {
/// return the count of the bucket for the given `sz`
pub fn get(&self, sz: usize) -> usize {
if sz < 1024 {
self.counts_64b[sz / 64]
} else if sz < 16 * 1024 {
self.counts_1kb[(sz - 1024) / 1024]
} else {
self.counts_16kb[(sz - 16 * 1024) / (16 * 1024)]
}
}

/// iterate over all non-empty buckets, and return their spans and counts
pub fn iter<'a>(&'a self) -> impl Iterator<Item = (Range<usize>, usize)> + 'a {
self.counts_64b
.iter()
.enumerate()
.filter_map(|(i, &c)| {
if c == 0 {
return None;
}
Some((i * 64..(i + 1) * 64, c))
})
.chain(self.counts_1kb.iter().enumerate().filter_map(|(i, &c)| {
if c == 0 {
return None;
}
Some(((i + 1) * 1024..(i + 2) * 1024, c))
}))
.chain(self.counts_16kb.iter().enumerate().filter_map(|(i, &c)| {
if c == 0 {
return None;
}
Some(((i + 1) * 16 * 1024..(i + 2) * 16 * 1024, c))
}))
}
}

impl std::fmt::Display for SizeHistogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (r, c) in self.iter() {
if r.end == usize::MAX {
write!(f, "[{}..): {c}\n", r.start)?;
} else {
write!(f, "[{}..{}): {c}\n", r.start, r.end)?;
}
}
Ok(())
}
}

/// The CandyStore object. Note that it's fully sync'ed, so can be shared between threads using `Arc`
pub struct CandyStore {
pub(crate) shards: RwLock<BTreeMap<u32, Shard>>,
Expand Down Expand Up @@ -494,6 +563,23 @@ impl CandyStore {
stats
}

pub fn size_histogram(&self) -> SizeHistogram {
let guard = self.shards.read();
let mut hist = SizeHistogram::default();
for (_, shard) in guard.iter() {
for (i, h) in shard.header.size_histogram.counts_64b.iter().enumerate() {
hist.counts_64b[i] += h.load(Ordering::Relaxed) as usize;
}
for (i, h) in shard.header.size_histogram.counts_1kb.iter().enumerate() {
hist.counts_1kb[i] += h.load(Ordering::Relaxed) as usize;
}
for (i, h) in shard.header.size_histogram.counts_16kb.iter().enumerate() {
hist.counts_16kb[i] += h.load(Ordering::Relaxed) as usize;
}
}
hist
}

/// Returns an iterator over the whole store (skipping linked lists or typed items)
pub fn iter(&self) -> CandyStoreIterator {
CandyStoreIterator::new(self)
Expand Down
38 changes: 38 additions & 0 deletions tests/test_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,41 @@ fn test_logic() -> Result<()> {
Ok(())
})
}

#[test]
fn test_histogram() -> Result<()> {
run_in_tempdir(|dir| {
let db = CandyStore::open(
dir,
Config {
expected_number_of_keys: 100_000, // pre-split
..Default::default()
},
)?;

db.set("k1", "bbb")?;
db.set("k2", &vec![b'b'; 100])?;
db.set("k3", &vec![b'b'; 500])?;
db.set("k4", &vec![b'b'; 5000])?;
db.set("k4", &vec![b'b'; 4500])?;
db.set("k5", &vec![b'b'; 50000])?;
db.set("kkkkkkkkkkkkkkk", &vec![b'b'; 0xffff])?;

let hist = db.size_histogram();
assert_eq!(
hist.iter().collect::<Vec<_>>(),
vec![
(0..64, 1),
(64..128, 1),
(448..512, 1),
(4096..5120, 2),
(49152..65536, 1),
(65536..81920, 1)
]
);

assert!(hist.to_string().contains("[64..128): 1"));

Ok(())
})
}

0 comments on commit 63a9493

Please sign in to comment.