Skip to content

Commit

Permalink
General refactoring; add crasher test for list iteration and count le…
Browse files Browse the repository at this point in the history
…aked items
  • Loading branch information
tomerfiliba committed Aug 17, 2024
1 parent e5263db commit 351c909
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 45 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ simd-itertools = "0.2.3"
siphasher = "1.0.1"
anyhow = "1.0.86"
parking_lot = "0.12.3"
uuid = { version = "1.10.0", features = ["v4"] }
uuid = { version = "1.10.0" }
rand = "0.8.5"

[features]
whitebox_testing = []
Expand Down
66 changes: 66 additions & 0 deletions candy-crasher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,34 @@ fn child_list_removals() -> Result<()> {
Ok(())
}

fn child_list_iterator_removals() -> Result<()> {
let store = CandyStore::open("dbdir", Config::default())?;

if rand::thread_rng().gen() {
for (i, res) in store.iter_list("xxx").enumerate() {
let (k, v) = res?.unwrap();
let v2 = u32::from_le_bytes(v.try_into().unwrap());
if i == 0 {
println!("FWD child starts at {v2}");
}
store.remove_from_list("xxx", &k)?;
}
} else {
for (i, res) in store.iter_list_backwards("xxx").enumerate() {
let (k, v) = res?.unwrap();
let v2 = u32::from_le_bytes(v.try_into().unwrap());
if i == 0 {
println!("BACK child starts at {v2}");
}
store.remove_from_list("xxx", &k)?;
}
}

println!("child finished");

Ok(())
}

fn parent_run(
shared_stuff: &SharedStuff,
mut child_func: impl FnMut() -> Result<()>,
Expand Down Expand Up @@ -299,5 +327,43 @@ fn main() -> Result<()> {
println!("DB validated successfully");
}

{
println!("Parent creates 1M members in a list...");

let store = CandyStore::open(
"dbdir",
Config {
expected_number_of_keys: 1_000_000,
..Default::default()
},
)?;
let t0 = std::time::Instant::now();
for i in 0u32..1_000_000 {
if i % 65536 == 0 {
println!("{i}");
}
store.push_to_list_tail("xxx", &i.to_le_bytes())?;
}
println!(
"{}us",
std::time::Instant::now().duration_since(t0).as_micros()
);
}

parent_run(shared_stuff, child_list_iterator_removals, 10..30)?;

{
println!("Parent starts validating the DB...");

let store = CandyStore::open("dbdir", Config::default())?;

assert_eq!(store.iter_list("xxx").count(), 0);

// we will surely leak some entries that were unlinked from the list before they were removed
println!("leaked: {}", store.iter_raw().count());

println!("DB validated successfully");
}

Ok(())
}
60 changes: 30 additions & 30 deletions src/list_insert.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytemuck::{bytes_of, from_bytes};
use databuf::config::num::LE;
use databuf::Encode;
use rand::Rng;
use uuid::Uuid;

use crate::encodable::EncodableUuid;
Expand Down Expand Up @@ -72,10 +73,12 @@ impl CandyStore {
let item_fph = FullPartedHash::new(item_ph, item_collidx);

// item does not exist, and the list itself might also not exist. get or create the list
let curr_list = LinkedList::new(item_fph, item_fph);
let curr_list = *from_bytes::<LinkedList>(
&self
.get_or_create_raw(&list_key, bytes_of(&curr_list).to_vec())?
.get_or_create_raw(
&list_key,
bytes_of(&LinkedList::new(item_fph, item_fph)).to_vec(),
)?
.value(),
);

Expand All @@ -101,8 +104,6 @@ impl CandyStore {
return Ok(InsertToListStatus::CreatedNew(val));
}

//self.fixup_true_ends(list_ph, curr_list)?;

let v =
match pos {
InsertPosition::Tail => self
Expand Down Expand Up @@ -131,7 +132,7 @@ impl CandyStore {
// find_true_head will stop at this item
update_chain_prev(&mut head_v, item_fph);
if self.replace_raw(&head_k, &head_v, None)?.failed() {
corrupted_list!("list {list_ph:?} failed to point {head_k:?}->prev to {item_key:?}");
corrupted_list!("list {list_ph} failed to point {head_k:?}->prev to {item_key:?}");
}

// now add item, with prev pointing to the old head. if we crash after this, find_head_tail
Expand All @@ -141,13 +142,13 @@ impl CandyStore {
let this_chain = Chain::new(item_fph.collidx, head_fph, FullPartedHash::INVALID);
val.extend_from_slice(bytes_of(&this_chain));
if !self.set_raw(&item_key, &val)?.was_created() {
corrupted_list!("list {list_ph:?} tail {item_key:?} already exists");
corrupted_list!("list {list_ph} tail {item_key:?} already exists");
}

// now update the list to point to the new tail. if we crash before it's committed, all's good
let new_list = LinkedList::new(item_fph, curr_list.full_tail());
if self
.replace_raw(&list_key, bytes_of(&new_list), Some(bytes_of(&curr_list)))?
.replace_raw(&list_key, bytes_of(&new_list), None)?
.failed()
{
corrupted_list!("list {item_fph} failed to point head to {item_key:?}");
Expand Down Expand Up @@ -175,7 +176,7 @@ impl CandyStore {
update_chain_next(&mut tail_v, item_fph);

if self.replace_raw(&tail_k, &tail_v, None)?.failed() {
corrupted_list!("list {list_ph:?} failed to point {tail_k:?}->next to {item_key:?}");
corrupted_list!("list {list_ph} failed to point {tail_k:?}->next to {item_key:?}");
}

// now add item, with prev pointing to the old tail. if we crash after this, find_true_tail
Expand All @@ -185,17 +186,16 @@ impl CandyStore {
let this_chain = Chain::new(item_fph.collidx, FullPartedHash::INVALID, tail_fph);
val.extend_from_slice(bytes_of(&this_chain));
if self.set_raw(&item_key, &val)?.was_replaced() {
corrupted_list!("list {list_ph:?} tail {item_key:?} already exists");
corrupted_list!("list {list_ph} tail {item_key:?} already exists");
}

// now update the list to point to the new tail. if we crash before it's committed, all's good
let mut new_list = curr_list.clone();
new_list.set_full_tail(item_fph);
let new_list = LinkedList::new(curr_list.full_head(), item_fph);
if self
.replace_raw(&list_key, bytes_of(&new_list), Some(bytes_of(&curr_list)))?
.replace_raw(&list_key, bytes_of(&new_list), None)?
.failed()
{
corrupted_list!("list {list_ph:?} failed to point tail to {item_key:?}");
corrupted_list!("list {list_ph} failed to point tail to {item_key:?}");
}

val.truncate(val.len() - size_of::<Chain>());
Expand Down Expand Up @@ -234,7 +234,7 @@ impl CandyStore {
/// This allows for the implementation of LRUs, where older items stay at the beginning and more
/// recent ones are at the end.
///
/// Note: this operation is not crash-safe, as it removes and inserts the item.
/// Note: this operation is **not crash-safe**, as it removes and inserts the item.
///
/// See also [Self::set], [Self::set_in_list]
pub fn set_in_list_promoting<
Expand Down Expand Up @@ -380,26 +380,37 @@ impl CandyStore {
self.owned_push_to_list_tail(list_key.as_ref().to_owned(), val.as_ref().to_owned())
}

/// Owned version of [Self::push_to_list]
pub fn owned_push_to_list_tail(
fn _owned_push_to_list(
&self,
list_key: Vec<u8>,
val: Vec<u8>,
pos: InsertPosition,
) -> Result<EncodableUuid> {
let uuid = EncodableUuid::from(Uuid::new_v4());
// this rng does not produce "well formed UUID" like UUIDv4, but this is faster (because it doesn't
// use the OS rng) and produces 128 bits of entropy rather than 122 in UUIDv4
let uuid = EncodableUuid::from(Uuid::from_bytes(rand::thread_rng().gen()));
let status = self._insert_to_list(
list_key,
uuid.to_bytes::<LE>(),
val,
InsertMode::GetOrCreate,
InsertPosition::Tail,
pos,
)?;
if !matches!(status, InsertToListStatus::CreatedNew(_)) {
corrupted_list!("list uuid collision {uuid} {status:?}");
}
Ok(uuid)
}

/// Owned version of [Self::push_to_list]
pub fn owned_push_to_list_tail(
&self,
list_key: Vec<u8>,
val: Vec<u8>,
) -> Result<EncodableUuid> {
self._owned_push_to_list(list_key, val, InsertPosition::Tail)
}

/// In case you only want to store values in a list (the keys are immaterial), this function
/// generates a random UUID and inserts the given element to the head (head) of the list.
/// Can be used to implement queues, where elements are pushed at the back and popped from
Expand All @@ -421,17 +432,6 @@ impl CandyStore {
list_key: Vec<u8>,
val: Vec<u8>,
) -> Result<EncodableUuid> {
let uuid = EncodableUuid::from(Uuid::new_v4());
let status = self._insert_to_list(
list_key,
uuid.to_bytes::<LE>(),
val,
InsertMode::GetOrCreate,
InsertPosition::Head,
)?;
if !matches!(status, InsertToListStatus::CreatedNew(_)) {
corrupted_list!("uuid collision {uuid} {status:?}");
}
Ok(uuid)
self._owned_push_to_list(list_key, val, InsertPosition::Head)
}
}
32 changes: 21 additions & 11 deletions src/list_remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl CandyStore {
item_key: &[u8],
) -> Result<()> {
let Some((next_k, mut next_v)) = self._list_get(list_ph, chain.full_next())? else {
corrupted_list!("list {list_ph:?} failed getting next of {item_key:?}");
corrupted_list!("list {list_ph} failed getting next of {item_key:?}");
};

// update list.head from this to this.next. if we crash afterwards, the list will start
Expand All @@ -37,7 +37,7 @@ impl CandyStore {
update_chain_prev(&mut next_v, FullPartedHash::INVALID);
if self.replace_raw(&next_k, &next_v, None)?.failed() {
corrupted_list!(
"list {list_ph:?} failed updating prev=INVALID on the now-first element"
"list {list_ph} failed updating prev=INVALID on the now-first {next_k:?} element"
);
}

Expand All @@ -55,25 +55,30 @@ impl CandyStore {
item_key: &[u8],
) -> Result<()> {
let Some((prev_k, mut prev_v)) = self._list_get(list_ph, chain.full_prev())? else {
corrupted_list!("list {list_ph:?} missing prev element {item_key:?}");
corrupted_list!("list {list_ph} missing prev element {item_key:?}");
};

// point list.tail to the prev item. if we crash afterwards, the removed tail is still considered
// part of the list (find_true_tai will find it)
// part of the list (find_true_tail will find it)
list.set_full_tail(chain.full_prev());
if !self
.replace_raw(list_key, bytes_of(&list), None)?
.was_replaced()
{
corrupted_list!("failed updating list tail to point to prev");
corrupted_list!(
"failed updating list {list_ph} tail to point to prev {}",
chain.full_prev()
);
}

// XXX clear the item's chain so we can scrub it later?

// update the new tail's next to INVALID. if we crash afterwards, the removed tail is no longer
// considered part of the list
update_chain_next(&mut prev_v, FullPartedHash::INVALID);
if self.replace_raw(&prev_k, &prev_v, None)?.failed() {
corrupted_list!(
"list {list_ph:?} failed updating next=INVALID on the now-last element"
"list {list_ph} failed updating next=INVALID on the now-last {prev_k:?} element"
);
}

Expand All @@ -99,7 +104,7 @@ impl CandyStore {
if chain_of(&prev_v).full_next() == item_fph {
update_chain_next(&mut prev_v, chain.full_next());
if self.replace_raw(&prev_k, &prev_v, None)?.failed() {
corrupted_list!("list {list_ph:?} failed updating prev.next on {prev_k:?}");
corrupted_list!("list {list_ph} failed updating prev.next on {prev_k:?}");
}
}
}
Expand All @@ -110,7 +115,7 @@ impl CandyStore {
if chain_of(&next_v).full_prev() == item_fph {
update_chain_prev(&mut next_v, chain.full_prev());
if self.replace_raw(&next_k, &next_v, None)?.failed() {
corrupted_list!("list {list_ph:?} failed updating next.prev on {next_k:?}");
corrupted_list!("list {list_ph} failed updating next.prev on {next_k:?}");
}
}
}
Expand Down Expand Up @@ -139,14 +144,19 @@ impl CandyStore {
chain: Chain,
) -> Result<()> {
// because of the crash model, it's possible list.head and list.tail are not up to date.
// it's also possible that we'll leak some entries if we crash mid-operation, i.e., an item
// might have been unlinked from its prev or next, but still exists on its own.
// XXX: maybe background compaction can check for leaks and remove them?

let mut head_fph = list.full_head();
let mut tail_fph = list.full_tail();
if item_fph == head_fph && chain.full_prev().is_valid() {
let (true_head_fph, _, _) = self.find_true_head(list_ph, list.full_head())?;
let (true_head_fph, _, _) = self.find_true_head(list_ph, head_fph)?;
head_fph = true_head_fph;
}

let mut tail_fph = list.full_tail();
if item_fph == tail_fph && chain.full_next().is_valid() {
let (true_tail_fph, _, _) = self.find_true_tail(list_ph, list.full_tail())?;
let (true_tail_fph, _, _) = self.find_true_tail(list_ph, tail_fph)?;
tail_fph = true_tail_fph;
}

Expand Down
2 changes: 2 additions & 0 deletions src/lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub(crate) struct LinkedList {

impl LinkedList {
pub fn new(head: FullPartedHash, tail: FullPartedHash) -> Self {
assert!(head.is_valid(), "creating a list with an invalid head");
assert!(tail.is_valid(), "creating a list with an invalid tail");
Self {
_head: head.ph,
_head_collidx: head.collidx,
Expand Down

0 comments on commit 351c909

Please sign in to comment.