Add version info to shard files; remove backpointers until we figure out a way to do background compaction

tomerfiliba committed Aug 21, 2024
1 parent a34750a commit 63a9ec6
Showing 5 changed files with 144 additions and 78 deletions.
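In short: every shard file now begins with a fixed MetaHeader (a magic number plus a format version) that Shard::open validates before mmapping, and the per-entry Backpointer prefix is dropped until background compaction is worked out. As a side note, not part of the diff: the new magic constant is simply the ASCII bytes of "CandyStr" read as a little-endian u64, which a one-line test can confirm:

#[test]
fn shard_magic_spells_candystr() {
    // SHARD_FILE_MAGIC (0x72745379646e6143) is "CandyStr" as a little-endian u64
    assert_eq!(u64::from_le_bytes(*b"CandyStr"), 0x72745379646e6143);
}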
80 changes: 70 additions & 10 deletions candy-crasher/src/main.rs
@@ -10,7 +10,13 @@ const TARGET: u32 = 1_000_000;
fn child_inserts() -> Result<()> {
// our job is to create 1M entries while being killed by our evil parent

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;
let highest_bytes = store.get("highest")?.unwrap_or(vec![0, 0, 0, 0]);
let highest = u32::from_le_bytes([
highest_bytes[0],
@@ -38,7 +44,13 @@ fn child_inserts() -> Result<()> {
fn child_removals() -> Result<()> {
// our job is to remove 1M entries while being killed by our evil parent

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;
let lowest_bytes = store.get("lowest")?.unwrap_or(vec![0, 0, 0, 0]);
let lowest = u32::from_le_bytes([
lowest_bytes[0],
@@ -66,7 +78,13 @@ fn child_removals() -> Result<()> {
fn child_list_inserts() -> Result<()> {
// our job is to insert 1M entries into a list while being killed by our evil parent

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;

let highest_bytes = store.get("list_highest")?.unwrap_or(vec![0, 0, 0, 0]);
let highest = u32::from_le_bytes([
@@ -95,7 +113,13 @@ fn child_list_inserts() -> Result<()> {
fn child_list_removals() -> Result<()> {
// our job is to remove 1M entries from a list while being killed by our evil parent

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;

let lowest_bytes = store.get("list_lowest")?.unwrap_or(vec![0, 0, 0, 0]);
let lowest = u32::from_le_bytes([
@@ -142,7 +166,13 @@ fn child_list_removals() -> Result<()> {
}

fn child_list_iterator_removals() -> Result<()> {
let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;

if rand::random() {
//println!("FWD");
@@ -258,7 +288,13 @@ fn main() -> Result<()> {
{
println!("Parent starts validating the DB...");

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;
assert_eq!(
store.remove("highest")?,
Some((TARGET - 1).to_le_bytes().to_vec())
@@ -281,7 +317,13 @@ fn main() -> Result<()> {
{
println!("Parent starts validating the DB...");

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;
assert_eq!(
store.remove("lowest")?,
Some((TARGET - 1).to_le_bytes().to_vec())
@@ -296,7 +338,13 @@ fn main() -> Result<()> {
{
println!("Parent starts validating the DB...");

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;
assert_eq!(
store.remove("list_highest")?,
Some((TARGET - 1).to_le_bytes().to_vec())
@@ -316,7 +364,13 @@ fn main() -> Result<()> {
{
println!("Parent starts validating the DB...");

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;
assert_eq!(
store.remove("list_lowest")?,
Some((TARGET - 1).to_le_bytes().to_vec())
@@ -358,7 +412,13 @@ fn main() -> Result<()> {
{
println!("Parent starts validating the DB...");

let store = CandyStore::open("dbdir", Config::default())?;
let store = CandyStore::open(
"dbdir",
Config {
clear_on_unsupported_version: true,
..Default::default()
},
)?;

assert_eq!(store.iter_list("xxx").count(), 0);

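The same Config literal now appears in each of the crasher's open calls; a tiny helper, hypothetical and not in the diff, would express the intent once:

// Sketch only: the candystore crate path and anyhow::Result are assumed from context.
use anyhow::Result;
use candystore::{CandyStore, Config};

fn open_store() -> Result<CandyStore> {
    // the crasher opts in to clearing the DB on a format-version bump,
    // presumably so test runs survive upgrades of the on-disk layout
    CandyStore::open(
        "dbdir",
        Config {
            clear_on_unsupported_version: true,
            ..Default::default()
        },
    )
}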
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -107,6 +107,7 @@ pub struct Config {
pub merge_small_shards: bool, // whether or not to merge small shards when items are removed
pub max_concurrent_list_ops: u32, // number of keyed locks for concurrent list ops
pub truncate_up: bool, // whether or not to truncate up shard files to their max size (spare files)
pub clear_on_unsupported_version: bool, // whether or not to clear the DB if the version is unsupported
}

impl Default for Config {
@@ -119,6 +120,7 @@ impl Default for Config {
merge_small_shards: false,
max_concurrent_list_ops: 64,
truncate_up: true,
clear_on_unsupported_version: false,
}
}
}
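Note that the default remains false, so an existing store refuses to open on a magic/version mismatch instead of silently clearing itself. A caller-side sketch (hypothetical; the error text comes from the bail! in shard.rs below):

use candystore::{CandyStore, Config};

fn open_strict() {
    match CandyStore::open("dbdir", Config::default()) {
        Ok(_store) => { /* on-disk version matches, proceed */ }
        // e.g. "unsupported magic=0x... version=0x... size=..."
        Err(e) => eprintln!("refusing to open incompatible DB: {e}"),
    }
}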
128 changes: 65 additions & 63 deletions src/shard.rs
@@ -1,9 +1,10 @@
use anyhow::ensure;
use bytemuck::{bytes_of, Pod, Zeroable};
use anyhow::{bail, ensure};
use bytemuck::{bytes_of_mut, Pod, Zeroable};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use simd_itertools::PositionSimd;
use std::{
fs::{File, OpenOptions},
io::Read,
ops::Range,
os::unix::fs::FileExt,
path::PathBuf,
@@ -146,8 +147,19 @@ fn test_shard_size_histogram() {
assert_eq!(hist.counts_16kb[3].load(Ordering::Relaxed), 2);
}

pub(crate) const SHARD_FILE_MAGIC: u64 = 0x72745379646e6143; // "CandyStr"
pub(crate) const SHARD_FILE_VERSION: u64 = 0x06;

#[derive(Clone, Copy, Default, Debug, Pod, Zeroable)]
#[repr(C)]
struct MetaHeader {
magic: u64,
version: u64,
}

#[repr(C)]
struct ShardHeader {
metadata: MetaHeader,
num_inserted: AtomicU64,
num_removed: AtomicU64,
wasted_bytes: AtomicU64,
@@ -178,43 +190,6 @@ pub(crate) enum InsertMode<'a> {

pub(crate) type KVPair = (Vec<u8>, Vec<u8>);

#[derive(Default, Debug, Clone, Copy, Pod, Zeroable)]
#[repr(C)]
struct Backpointer(u32);

impl Backpointer {
// MSB LSB
// +-----+-----+----------+
// + row | sig | entry |
// | idx | idx | size |
// | (6) | (9) | (17) |
// +-----+-----+----------+
const SZ: u64 = size_of::<Self>() as u64;

fn new(row_idx: u16, sig_idx: u16, entry_size: usize) -> Self {
debug_assert!((row_idx as usize % NUM_ROWS) < (1 << 6), "{row_idx}");
debug_assert!(sig_idx < (1 << 9), "{sig_idx}");
Self(
(((row_idx % (NUM_ROWS as u16)) as u32) << 26)
| ((sig_idx as u32) << 17)
| (entry_size as u32 & 0x1ffff),
)
}

#[allow(dead_code)]
fn entry_size(&self) -> u32 {
self.0 & 0x1ffff
}
#[allow(dead_code)]
fn row(&self) -> usize {
(self.0 >> 26) as usize
}
#[allow(dead_code)]
fn sig_idx(&self) -> usize {
((self.0 >> 17) & 0x1ff) as usize
}
}
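
For reference, the removed Backpointer packed (row idx, sig idx, entry size) into 32 bits as 6 + 9 + 17; a standalone round-trip sketch of that layout, mirroring the deleted code:

fn pack_backpointer(row_idx: u32, sig_idx: u32, entry_size: u32) -> u32 {
    // 6 bits of row index | 9 bits of signature index | 17 bits of entry size
    (row_idx << 26) | (sig_idx << 17) | (entry_size & 0x1ffff)
}

fn unpack_backpointer(bp: u32) -> (u32, u32, u32) {
    (bp >> 26, (bp >> 17) & 0x1ff, bp & 0x1ffff)
}

#[test]
fn backpointer_round_trip() {
    let bp = pack_backpointer(3, 200, 70_000);
    assert_eq!(unpack_backpointer(bp), (3, 200, 70_000));
}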

// Note: it's possible to reduce the number of row_locks if we make them per-store rather than per-shard.
// the trivial way would be to use NUM_ROWS (without risking deadlocks), which means you can have 64
// concurrent operations. if you want more concurrency, it's possible to take the number of shards,
@@ -247,24 +222,58 @@ impl Shard {
truncate: bool,
config: Arc<InternalConfig>,
) -> Result<Self> {
let file = OpenOptions::new()
let mut file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(truncate)
.open(&filename)?;
let md = file.metadata()?;
if md.len() < HEADER_SIZE {
file.set_len(0)?;

let mut file_size = file.metadata()?.len();
if file_size != 0 {
let mut meta_header = MetaHeader::default();
let sz = file.read(bytes_of_mut(&mut meta_header))?;
if sz != size_of::<MetaHeader>()
|| meta_header.magic != SHARD_FILE_MAGIC
|| meta_header.version != SHARD_FILE_VERSION
{
if config.clear_on_unsupported_version {
file.set_len(0)?;
file_size = 0;
} else {
bail!(
"unsupported magic=0x{:016x} version=0x{:016x} size={}",
meta_header.magic,
meta_header.version,
file_size,
);
}
}

if file_size != 0 && file_size < HEADER_SIZE {
if config.clear_on_unsupported_version {
file.set_len(0)?;
file_size = 0;
} else {
bail!("corrupt shard file (size={})", file_size);
}
}
}

if file_size == 0 {
file.set_len(HEADER_SIZE)?;
if config.truncate_up {
// when creating, set the file's length so that we won't need to extend it every time we write
// (saves on file metadata updates)
file.set_len(HEADER_SIZE + config.max_shard_size as u64)?;
}
}

let mut mmap = unsafe { MmapOptions::new().len(HEADER_SIZE as usize).map_mut(&file) }?;

let header = unsafe { &mut *(mmap.as_mut_ptr() as *mut ShardHeader) };
header.metadata.magic = SHARD_FILE_MAGIC;
header.metadata.version = SHARD_FILE_VERSION;
let mut row_locks = Vec::with_capacity(NUM_ROWS);
for _ in 0..NUM_ROWS {
row_locks.push(RwLock::new(()));
@@ -299,8 +308,7 @@ impl Shard {
fn read_kv(&self, offset_and_size: u64) -> Result<KVPair> {
let (klen, vlen, offset) = Self::extract_offset_and_size(offset_and_size);
let mut buf = vec![0u8; klen + vlen];
self.file
.read_exact_at(&mut buf, HEADER_SIZE + Backpointer::SZ + offset)?;
self.file.read_exact_at(&mut buf, HEADER_SIZE + offset)?;

let val = buf[klen..klen + vlen].to_owned();
buf.truncate(klen);
@@ -309,14 +317,11 @@
}

// writing doesn't require holding any locks since we write with an offset
fn write_kv(&self, row_idx: u16, sig_idx: u16, key: &[u8], val: &[u8]) -> Result<u64> {
const BP: usize = Backpointer::SZ as usize;
fn write_kv(&self, key: &[u8], val: &[u8]) -> Result<u64> {
let entry_size = key.len() + val.len();
let mut buf = vec![0u8; BP + entry_size];
let bp = Backpointer::new(row_idx, sig_idx, entry_size);
buf[..BP].copy_from_slice(bytes_of(&bp));
buf[BP..BP + key.len()].copy_from_slice(key);
buf[BP + key.len()..].copy_from_slice(val);
let mut buf = vec![0u8; entry_size];
buf[..key.len()].copy_from_slice(key);
buf[key.len()..].copy_from_slice(val);

// atomically allocate some area. it may leak if the IO below fails or if we crash before updating the
// offsets_and_size array, but we're okay with leaks
@@ -442,12 +447,10 @@ impl Shard {

// optimization
if val != existing_val {
row.offsets_and_sizes[idx] =
self.write_kv(ph.row_selector(), idx as u16, key, val)?;
self.header.wasted_bytes.fetch_add(
Backpointer::SZ + (k.len() + existing_val.len()) as u64,
Ordering::SeqCst,
);
row.offsets_and_sizes[idx] = self.write_kv(key, val)?;
self.header
.wasted_bytes
.fetch_add((k.len() + existing_val.len()) as u64, Ordering::SeqCst);
}
return Ok(TryReplaceStatus::KeyExistsReplaced(existing_val));
}
@@ -499,7 +502,7 @@ impl Shard {
// find an empty slot
let mut start = 0;
if let Some(idx) = row.lookup(INVALID_SIG, &mut start) {
let new_off = self.write_kv(ph.row_selector(), idx as u16, &full_key, val)?;
let new_off = self.write_kv(&full_key, val)?;

// we don't want a reorder to happen here - first write the offset, then the signature
row.offsets_and_sizes[idx] = new_off;
@@ -529,10 +532,9 @@ impl Shard {
row.signatures[idx] = INVALID_SIG;
// we managed to remove this key
self.header.num_removed.fetch_add(1, Ordering::Relaxed);
self.header.wasted_bytes.fetch_add(
(size_of::<Backpointer>() + k.len() + v.len()) as u64,
Ordering::Relaxed,
);
self.header
.wasted_bytes
.fetch_add((k.len() + v.len()) as u64, Ordering::Relaxed);
return Ok(Some(v));
}
}
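With the backpointer gone, an entry on disk is now just the key bytes followed by the value bytes at HEADER_SIZE + offset, which is why read_kv and the wasted-bytes accounting above each shed the extra 4 bytes. A minimal sketch of the buffer write_kv now builds:

#[test]
fn entry_is_key_then_value() {
    let (key, val) = (b"hello".as_slice(), b"world".as_slice());
    let mut buf = vec![0u8; key.len() + val.len()];
    buf[..key.len()].copy_from_slice(key);
    buf[key.len()..].copy_from_slice(val);
    assert_eq!(&buf[..], &b"helloworld"[..]);
}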
2 changes: 2 additions & 0 deletions src/store.rs
@@ -172,6 +172,7 @@ pub(crate) struct InternalConfig {
pub expected_number_of_keys: usize,
pub max_concurrent_list_ops: u32,
pub truncate_up: bool,
pub clear_on_unsupported_version: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
@@ -346,6 +347,7 @@ impl CandyStore {
max_shard_size: config.max_shard_size,
min_compaction_threashold: config.min_compaction_threashold,
truncate_up: config.truncate_up,
clear_on_unsupported_version: config.clear_on_unsupported_version,
});

std::fs::create_dir_all(dir_path)?;