diff --git a/tests/test_lists.rs b/tests/test_lists.rs index 3394141..7947592 100644 --- a/tests/test_lists.rs +++ b/tests/test_lists.rs @@ -1,6 +1,12 @@ mod common; -use std::sync::{atomic::AtomicUsize, Arc}; +use std::{ + collections::HashSet, + sync::{ + atomic::{AtomicBool, AtomicUsize}, + Arc, + }, +}; use candystore::{ CandyStore, CandyTypedDeque, CandyTypedList, Config, GetOrCreateStatus, ReplaceStatus, Result, @@ -439,3 +445,102 @@ fn test_typed_promote() -> Result<()> { Ok(()) }) } + +#[test] +fn test_list_iter_early_stop() -> Result<()> { + run_in_tempdir(|dir| { + let db = Arc::new(CandyStore::open(dir, Config::default())?); + + for i in 0u32..1000 { + db.push_to_list_tail("xxx", &i.to_le_bytes())?; + } + + let mut got = vec![]; + for res in db.iter_list("xxx") { + let Some((_, v)) = res? else { + break; + }; + let i = u32::from_le_bytes(v.try_into().unwrap()); + if i == 10 { + db.discard_list("xxx")?; + // create a some new elements, but the original iterator is broken at this point + db.push_to_list_tail("xxx", &(i + 1).to_le_bytes())?; + db.push_to_list_tail("xxx", &(i + 2).to_le_bytes())?; + db.push_to_list_tail("xxx", &(i + 3).to_le_bytes())?; + } + got.push(i); + } + assert_eq!(got, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + assert_eq!( + db.iter_list("xxx") + .map(|res| u32::from_le_bytes(res.unwrap().unwrap().1.try_into().unwrap())) + .collect::>(), + vec![11, 12, 13] + ); + + db.discard_list("xxx")?; + + let done = Arc::new(AtomicBool::new(false)); + + let h1 = { + let db = db.clone(); + let done = done.clone(); + std::thread::spawn(move || { + for i in 0u32..1000 { + db.set_in_list("xxx", &i.to_le_bytes(), &i.to_le_bytes())?; + std::thread::yield_now(); + } + done.store(true, std::sync::atomic::Ordering::SeqCst); + + Result::<()>::Ok(()) + }) + }; + + let h2 = { + let db = db.clone(); + std::thread::spawn(move || { + for i in 0u32..1000 { + if db.remove_from_list("xxx", &i.to_le_bytes())?.is_none() { + std::thread::sleep(std::time::Duration::from_micros(100)); + } + } + + Result::<()>::Ok(()) + }) + }; + + let h3 = { + let db = db.clone(); + let done = done.clone(); + std::thread::spawn(move || { + let mut retry = true; + let mut got = HashSet::new(); + while retry || !done.load(std::sync::atomic::Ordering::SeqCst) { + retry = false; + for res in db.iter_list("xxx") { + let Some((k, v)) = res? else { + std::thread::sleep(std::time::Duration::from_micros(100)); + retry = true; + break; + }; + let k = u32::from_le_bytes(k.try_into().unwrap()); + let v = u32::from_le_bytes(v.try_into().unwrap()); + assert_eq!(k, v); + got.insert(k); + std::thread::yield_now(); + } + } + + Result::>::Ok(got) + }) + }; + + h1.join().unwrap()?; + h2.join().unwrap()?; + let got = h3.join().unwrap()?; + println!("{}", got.len()); + + Ok(()) + }) +}