Skip to content

Commit

Permalink
Tests: add test_list_iter_early_stop
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Aug 17, 2024
1 parent b54d503 commit 6732a90
Showing 1 changed file with 106 additions and 1 deletion.
107 changes: 106 additions & 1 deletion tests/test_lists.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
mod common;

use std::sync::{atomic::AtomicUsize, Arc};
use std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, AtomicUsize},
Arc,
},
};

use candystore::{
CandyStore, CandyTypedDeque, CandyTypedList, Config, GetOrCreateStatus, ReplaceStatus, Result,
Expand Down Expand Up @@ -439,3 +445,102 @@ fn test_typed_promote() -> Result<()> {
Ok(())
})
}

#[test]
fn test_list_iter_early_stop() -> Result<()> {
run_in_tempdir(|dir| {
let db = Arc::new(CandyStore::open(dir, Config::default())?);

for i in 0u32..1000 {
db.push_to_list_tail("xxx", &i.to_le_bytes())?;
}

let mut got = vec![];
for res in db.iter_list("xxx") {
let Some((_, v)) = res? else {
break;
};
let i = u32::from_le_bytes(v.try_into().unwrap());
if i == 10 {
db.discard_list("xxx")?;
// create a some new elements, but the original iterator is broken at this point
db.push_to_list_tail("xxx", &(i + 1).to_le_bytes())?;
db.push_to_list_tail("xxx", &(i + 2).to_le_bytes())?;
db.push_to_list_tail("xxx", &(i + 3).to_le_bytes())?;
}
got.push(i);
}
assert_eq!(got, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

assert_eq!(
db.iter_list("xxx")
.map(|res| u32::from_le_bytes(res.unwrap().unwrap().1.try_into().unwrap()))
.collect::<Vec<_>>(),
vec![11, 12, 13]
);

db.discard_list("xxx")?;

let done = Arc::new(AtomicBool::new(false));

let h1 = {
let db = db.clone();
let done = done.clone();
std::thread::spawn(move || {
for i in 0u32..1000 {
db.set_in_list("xxx", &i.to_le_bytes(), &i.to_le_bytes())?;
std::thread::yield_now();
}
done.store(true, std::sync::atomic::Ordering::SeqCst);

Result::<()>::Ok(())
})
};

let h2 = {
let db = db.clone();
std::thread::spawn(move || {
for i in 0u32..1000 {
if db.remove_from_list("xxx", &i.to_le_bytes())?.is_none() {
std::thread::sleep(std::time::Duration::from_micros(100));
}
}

Result::<()>::Ok(())
})
};

let h3 = {
let db = db.clone();
let done = done.clone();
std::thread::spawn(move || {
let mut retry = true;
let mut got = HashSet::new();
while retry || !done.load(std::sync::atomic::Ordering::SeqCst) {
retry = false;
for res in db.iter_list("xxx") {
let Some((k, v)) = res? else {
std::thread::sleep(std::time::Duration::from_micros(100));
retry = true;
break;
};
let k = u32::from_le_bytes(k.try_into().unwrap());
let v = u32::from_le_bytes(v.try_into().unwrap());
assert_eq!(k, v);
got.insert(k);
std::thread::yield_now();
}
}

Result::<HashSet<u32>>::Ok(got)
})
};

h1.join().unwrap()?;
h2.join().unwrap()?;
let got = h3.join().unwrap()?;
println!("{}", got.len());

Ok(())
})
}

0 comments on commit 6732a90

Please sign in to comment.