Skip to content

Commit

Permalink
Background compaction in a thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Sep 5, 2024
1 parent f747ab0 commit e401c08
Show file tree
Hide file tree
Showing 13 changed files with 744 additions and 429 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ uuid = { version = "1.10.0" }
rand = "0.8.5"
fslock = "0.2.1"
libc = "0.2.158"
crossbeam-channel = "0.5.13"

[features]
whitebox_testing = []
Expand Down
43 changes: 15 additions & 28 deletions candy-crasher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@ const CONFIG: Config = Config {
truncate_up: true,
clear_on_unsupported_version: true,
mlock_headers: false,
num_compaction_threads: 4,
};

fn child_inserts() -> Result<()> {
// our job is to create 1M entries while being killed by our evil parent

let store = CandyStore::open("dbdir", CONFIG)?;
let highest_bytes = store.get("highest")?.unwrap_or(vec![0, 0, 0, 0]);
let highest = u32::from_le_bytes([
highest_bytes[0],
highest_bytes[1],
highest_bytes[2],
highest_bytes[3],
]);
let highest = u32::from_le_bytes(highest_bytes.try_into().unwrap());

if highest == TARGET - 1 {
println!("child finished (already at {highest})");
Expand All @@ -50,12 +46,7 @@ fn child_removals() -> Result<()> {

let store = CandyStore::open("dbdir", CONFIG)?;
let lowest_bytes = store.get("lowest")?.unwrap_or(vec![0, 0, 0, 0]);
let lowest = u32::from_le_bytes([
lowest_bytes[0],
lowest_bytes[1],
lowest_bytes[2],
lowest_bytes[3],
]);
let lowest = u32::from_le_bytes(lowest_bytes.try_into().unwrap());

if lowest == TARGET - 1 {
println!("child finished (already at {lowest})");
Expand All @@ -79,12 +70,7 @@ fn child_list_inserts() -> Result<()> {
let store = CandyStore::open("dbdir", CONFIG)?;

let highest_bytes = store.get("list_highest")?.unwrap_or(vec![0, 0, 0, 0]);
let highest = u32::from_le_bytes([
highest_bytes[0],
highest_bytes[1],
highest_bytes[2],
highest_bytes[3],
]);
let highest = u32::from_le_bytes(highest_bytes.try_into().unwrap());

if highest == TARGET - 1 {
println!("child finished (already at {highest})");
Expand All @@ -108,12 +94,7 @@ fn child_list_removals() -> Result<()> {
let store = CandyStore::open("dbdir", CONFIG)?;

let lowest_bytes = store.get("list_lowest")?.unwrap_or(vec![0, 0, 0, 0]);
let lowest = u32::from_le_bytes([
lowest_bytes[0],
lowest_bytes[1],
lowest_bytes[2],
lowest_bytes[3],
]);
let lowest = u32::from_le_bytes(lowest_bytes.try_into().unwrap());

if lowest == TARGET - 1 {
println!("child finished (already at {lowest})");
Expand Down Expand Up @@ -258,6 +239,7 @@ fn main() -> Result<()> {
// "dbdir",
// Config {
// expected_number_of_keys: 1_000_000,
// clear_on_unsupported_version: true,
// ..Default::default()
// },
// )?;
Expand All @@ -277,7 +259,7 @@ fn main() -> Result<()> {
for res in store.iter() {
let (k, v) = res?;
assert_eq!(v, b"i am a key");
let k = u32::from_le_bytes([k[0], k[1], k[2], k[3]]);
let k = u32::from_le_bytes(k.try_into().unwrap());
assert!(k < TARGET);
count += 1;
}
Expand All @@ -296,7 +278,12 @@ fn main() -> Result<()> {
store.remove("lowest")?,
Some((TARGET - 1).to_le_bytes().to_vec())
);
assert_eq!(store.iter().count(), 0);
assert_eq!(
store.iter().count(),
0,
"{:?}",
store.iter().collect::<Vec<_>>()
);

println!("DB validated successfully");
}
Expand All @@ -314,14 +301,14 @@ fn main() -> Result<()> {

for (i, res) in store.iter_list("xxx").enumerate() {
let (k, v) = res?;
assert_eq!(k, (i as u32).to_le_bytes());
assert_eq!(u32::from_le_bytes(k.try_into().unwrap()), i as u32);
assert_eq!(v, b"yyy");
}

println!("DB validated successfully");
}

parent_run(shared_stuff, child_list_removals, 10..30)?;
parent_run(shared_stuff, child_list_removals, 10..80)?;

{
println!("Parent starts validating the DB...");
Expand Down
2 changes: 1 addition & 1 deletion candy-longliving/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn main() -> Result<()> {
if i % 10000 == 0 {
let t1 = Instant::now();
println!(
"thread {thd} at {i} {:?} rate={}us",
"thread {thd} at {i} {} rate={}us",
db.stats(),
t1.duration_since(t0).as_micros() / 10_000,
);
Expand Down
1 change: 1 addition & 0 deletions src/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl PartedHash {
self.0 as u32
}

#[allow(dead_code)]
pub fn as_u64(&self) -> u64 {
self.0
}
Expand Down
7 changes: 3 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,13 @@ pub enum CandyError {
KeyTooLong(usize),
ValueTooLong(usize),
EntryCannotFitInShard(usize, usize),
KeyAlreadyExists(Vec<u8>, u64),
}

impl Display for CandyError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Self::KeyTooLong(sz) => write!(f, "key too long {sz}"),
Self::ValueTooLong(sz) => write!(f, "value too long {sz}"),
Self::KeyAlreadyExists(key, ph) => {
write!(f, "key {key:?} already exists (0x{ph:016x})")
}
Self::EntryCannotFitInShard(sz, max) => {
write!(f, "entry too big ({sz}) for a single shard file ({max})")
}
Expand Down Expand Up @@ -111,6 +107,8 @@ pub struct Config {
pub clear_on_unsupported_version: bool,
/// whether or not to mlock the shard headers to RAM (POSIX only)
pub mlock_headers: bool,
/// number of background compaction threads
pub num_compaction_threads: usize,
/// optionally delay modifying operations before for the given duration before flushing data to disk,
/// to ensure reboot consistency
#[cfg(feature = "flush_aggregation")]
Expand All @@ -128,6 +126,7 @@ impl Default for Config {
truncate_up: true,
clear_on_unsupported_version: false,
mlock_headers: false,
num_compaction_threads: 4,
#[cfg(feature = "flush_aggregation")]
flush_aggregation_delay: None,
}
Expand Down
1 change: 0 additions & 1 deletion src/lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ impl CandyStore {
// if the item already exists, it's already part of the list. just update it and preserve the index
if let Some(mut existing_val) = self.get_raw(&item_key)? {
match mode {
InsertMode::MustCreate => unreachable!(),
InsertMode::GetOrCreate => {
existing_val.truncate(existing_val.len() - size_of::<u64>());
return Ok(InsertToListStatus::ExistingValue(existing_val));
Expand Down
Loading

0 comments on commit e401c08

Please sign in to comment.