diff --git a/src/lib.rs b/src/lib.rs index 844cd64..6a48e46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ mod encodable; mod hashing; mod lists; +mod queues; mod router; mod shard; mod stats; diff --git a/src/lists.rs b/src/lists.rs index be58773..357a1ea 100644 --- a/src/lists.rs +++ b/src/lists.rs @@ -141,7 +141,7 @@ enum InsertToListPos { } impl CandyStore { - const FIRST_IDX: u64 = 0x8000_0000_0000_0000; + const FIRST_LIST_IDX: u64 = 0x8000_0000_0000_0000; fn make_list_key(&self, mut list_key: Vec) -> (PartedHash, Vec) { list_key.extend_from_slice(LIST_NAMESPACE); @@ -154,7 +154,7 @@ impl CandyStore { (PartedHash::new(&self.config.hash_seed, &item_key), item_key) } - fn lock_list(&self, list_ph: PartedHash) -> MutexGuard<()> { + pub(crate) fn lock_list(&self, list_ph: PartedHash) -> MutexGuard<()> { self.keyed_locks[(list_ph.signature() & self.keyed_locks_mask) as usize].lock() } @@ -207,8 +207,8 @@ impl CandyStore { let res = self.get_or_create_raw( &list_key, bytes_of(&List { - head_idx: Self::FIRST_IDX, - tail_idx: Self::FIRST_IDX + 1, + head_idx: Self::FIRST_LIST_IDX, + tail_idx: Self::FIRST_LIST_IDX + 1, num_items: 1, }) .to_owned(), @@ -220,14 +220,14 @@ impl CandyStore { self.set_raw( bytes_of(&ChainKey { list_ph, - idx: Self::FIRST_IDX, + idx: Self::FIRST_LIST_IDX, namespace: CHAIN_NAMESPACE, }), bytes_of(&item_ph), )?; // create item - val.extend_from_slice(bytes_of(&Self::FIRST_IDX)); + val.extend_from_slice(bytes_of(&Self::FIRST_LIST_IDX)); self.set_raw(&item_key, &val)?; } crate::GetOrCreateStatus::ExistingValue(list_bytes) => { diff --git a/src/queues.rs b/src/queues.rs new file mode 100644 index 0000000..b2d1803 --- /dev/null +++ b/src/queues.rs @@ -0,0 +1,349 @@ +use std::ops::Range; + +use crate::{ + hashing::PartedHash, + store::{QUEUE_ITEM_NAMESPACE, QUEUE_NAMESPACE}, + CandyStore, +}; +use anyhow::Result; +use bytemuck::{bytes_of, checked::from_bytes_mut, from_bytes, Pod, Zeroable}; + +#[derive(Clone, Copy, Pod, Zeroable)] +#[repr(C)] +struct Queue { + head_idx: u64, // inclusive + tail_idx: u64, // exclusive + num_items: u64, +} + +enum QueuePos { + Head, + Tail, +} + +pub struct QueueIterator<'a, 'b> { + store: &'a CandyStore, + queue_key: &'b [u8], + curr: Option, + end: Option, + fwd: bool, +} + +impl<'a, 'b> Iterator for QueueIterator<'a, 'b> { + type Item = Result<(usize, Vec)>; + fn next(&mut self) -> Option { + if self.curr.is_none() { + match self.store.fetch_queue(self.queue_key) { + Ok(queue) => match queue { + Some(queue) => { + if self.fwd { + self.curr = Some(queue.head_idx); + self.end = Some(queue.tail_idx); + } else { + self.curr = Some(queue.tail_idx - 1); + self.end = Some(queue.head_idx); + } + } + None => return None, + }, + Err(e) => return Some(Err(e)), + } + } + + loop { + let curr = self.curr.unwrap(); + if self.fwd { + self.curr = Some(curr + 1); + } else { + self.curr = Some(curr - 1); + } + match self + .store + .get_raw(&self.store.make_queue_item_key(self.queue_key, curr)) + { + Ok(v) => { + match v { + Some(v) => return Some(Ok((curr as usize, v))), + None => { + if self.fwd { + if curr >= self.end.unwrap() { + return None; + } + } else { + if curr < self.end.unwrap() { + return None; + } + } + // continue, we might have holes + } + } + } + Err(e) => return Some(Err(e)), + } + } + } +} + +impl CandyStore { + const FIRST_QUEUE_IDX: u64 = 0x8000_0000_0000_0000; + + fn make_queue_key(&self, queue_key: &[u8]) -> (PartedHash, Vec) { + let mut full_queue_key = queue_key.to_owned(); + full_queue_key.extend_from_slice(QUEUE_NAMESPACE); + ( + PartedHash::new(&self.config.hash_seed, &queue_key), + full_queue_key, + ) + } + fn make_queue_item_key(&self, queue_key: &[u8], idx: u64) -> Vec { + let mut item_key = queue_key.to_owned(); + item_key.extend_from_slice(bytes_of(&idx)); + item_key.extend_from_slice(QUEUE_ITEM_NAMESPACE); + item_key + } + + fn _push_to_queue(&self, queue_key: &[u8], val: &[u8], pos: QueuePos) -> Result { + let (queue_ph, full_queue_key) = self.make_queue_key(queue_key); + let _guard = self.lock_list(queue_ph); + + let status = self.get_or_create_raw( + &full_queue_key, + bytes_of(&Queue { + head_idx: Self::FIRST_QUEUE_IDX, + tail_idx: Self::FIRST_QUEUE_IDX + 1, + num_items: 1, + }) + .to_owned(), + )?; + + let item_idx = match status { + crate::GetOrCreateStatus::CreatedNew(_) => Self::FIRST_QUEUE_IDX, + crate::GetOrCreateStatus::ExistingValue(mut queue_bytes) => { + let queue = from_bytes_mut::(&mut queue_bytes); + let item_idx = match pos { + QueuePos::Head => { + queue.head_idx -= 1; + queue.head_idx + } + QueuePos::Tail => { + let item_idx = queue.tail_idx; + queue.tail_idx += 1; + item_idx + } + }; + queue.num_items += 1; + self.set_raw(&full_queue_key, &queue_bytes)?; + item_idx + } + }; + + self.set_raw(&self.make_queue_item_key(queue_key, item_idx), val)?; + Ok(item_idx as usize) + } + + pub fn push_to_queue_head + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + queue_key: &B1, + val: &B2, + ) -> Result { + self._push_to_queue(queue_key.as_ref(), val.as_ref(), QueuePos::Head) + } + pub fn push_to_queue_tail + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + queue_key: &B1, + val: &B2, + ) -> Result { + self._push_to_queue(queue_key.as_ref(), val.as_ref(), QueuePos::Tail) + } + + fn _pop_queue(&self, queue_key: &[u8], pos: QueuePos) -> Result>> { + let (queue_ph, full_queue_key) = self.make_queue_key(queue_key); + let _guard = self.lock_list(queue_ph); + + let Some(mut queue_bytes) = self.get_raw(&full_queue_key)? else { + return Ok(None); + }; + let queue = from_bytes_mut::(&mut queue_bytes); + let mut val = None; + + match pos { + QueuePos::Head => { + while queue.head_idx < queue.tail_idx { + let idx = queue.head_idx; + queue.head_idx += 1; + if let Some(v) = self.remove_raw(&self.make_queue_item_key(queue_key, idx))? { + val = Some(v); + queue.num_items -= 1; + break; + } + } + } + QueuePos::Tail => { + while queue.tail_idx > queue.head_idx { + queue.tail_idx -= 1; + let idx = queue.tail_idx; + if let Some(v) = self.remove_raw(&self.make_queue_item_key(queue_key, idx))? { + val = Some(v); + queue.num_items -= 1; + break; + } + } + } + } + + if queue.head_idx == queue.tail_idx { + self.remove_raw(&full_queue_key)?; + } else { + self.set_raw(&full_queue_key, &queue_bytes)?; + } + + Ok(val) + } + + pub fn pop_queue_head + ?Sized>( + &self, + queue_key: &B, + ) -> Result>> { + self._pop_queue(queue_key.as_ref(), QueuePos::Head) + } + pub fn pop_queue_tail + ?Sized>( + &self, + queue_key: &B, + ) -> Result>> { + self._pop_queue(queue_key.as_ref(), QueuePos::Tail) + } + + pub fn remove_from_queue + ?Sized>( + &self, + queue_key: &B, + idx: usize, + ) -> Result>> { + let idx = idx as u64; + let queue_key = queue_key.as_ref(); + let (queue_ph, full_queue_key) = self.make_queue_key(queue_key); + let _guard = self.lock_list(queue_ph); + + let Some(val) = self.remove_raw(&self.make_queue_item_key(queue_key, idx as u64))? else { + return Ok(None); + }; + + if let Some(mut queue_bytes) = self.get_raw(&full_queue_key)? { + let queue = from_bytes_mut::(&mut queue_bytes); + if queue.head_idx == idx { + queue.head_idx += 1; + } + if queue.tail_idx == idx + 1 { + queue.tail_idx -= 1; + } + queue.num_items -= 1; + if queue.head_idx == queue.tail_idx { + self.remove_raw(&full_queue_key)?; + } else { + self.set_raw(&full_queue_key, &queue_bytes)?; + } + } + + Ok(Some(val)) + } + + pub fn discard_queue + ?Sized>(&self, queue_key: &B) -> Result { + let queue_key = queue_key.as_ref(); + let (queue_ph, full_queue_key) = self.make_queue_key(queue_key); + let _guard = self.lock_list(queue_ph); + + let Some(queue_bytes) = self.get_raw(&full_queue_key)? else { + return Ok(false); + }; + let queue = from_bytes::(&queue_bytes); + + for i in queue.head_idx..queue.tail_idx { + self.remove_raw(&self.make_queue_item_key(queue_key, i as u64))?; + } + + self.remove_raw(&full_queue_key)?; + Ok(true) + } + + fn fetch_queue(&self, queue_key: &[u8]) -> Result> { + let queue_key = queue_key.as_ref(); + let (queue_ph, full_queue_key) = self.make_queue_key(queue_key); + let _guard = self.lock_list(queue_ph); + if let Some(queue_bytes) = self.get_raw(&full_queue_key)? { + Ok(Some(*from_bytes::(&queue_bytes))) + } else { + Ok(None) + } + } + + pub fn extend_queue<'a, B: AsRef<[u8]> + ?Sized>( + &self, + queue_key: &B, + items: impl Iterator>, + ) -> Result> { + let queue_key = queue_key.as_ref(); + let (queue_ph, full_queue_key) = self.make_queue_key(queue_key); + let _guard = self.lock_list(queue_ph); + + let mut queue_bytes = &mut self + .get_or_create_raw( + &full_queue_key, + bytes_of(&Queue { + head_idx: Self::FIRST_QUEUE_IDX, + tail_idx: Self::FIRST_QUEUE_IDX, + num_items: 0, + }) + .to_owned(), + )? + .value(); + + let queue = from_bytes_mut::(&mut queue_bytes); + + let first_idx = queue.tail_idx; + for item in items { + self.set_raw( + &self.make_queue_item_key(queue_key, queue.tail_idx), + item.as_ref(), + )?; + queue.tail_idx += 1; + queue.num_items += 1; + } + + let indices = first_idx as usize..queue.tail_idx as usize; + self.set_raw(&full_queue_key, &queue_bytes)?; + + Ok(indices) + } + + pub fn iter_queue<'a, 'b, B: AsRef<[u8]> + ?Sized>( + &'a self, + queue_key: &'b B, + ) -> QueueIterator<'a, 'b> { + QueueIterator { + store: &self, + queue_key: queue_key.as_ref(), + curr: None, + end: None, + fwd: true, + } + } + + pub fn iter_queue_backwards<'a, 'b, B: AsRef<[u8]> + ?Sized>( + &'a self, + queue_key: &'b B, + ) -> QueueIterator<'a, 'b> { + QueueIterator { + store: &self, + queue_key: queue_key.as_ref(), + curr: None, + end: None, + fwd: false, + } + } + + pub fn queue_len + ?Sized>(&self, queue_key: &B) -> Result { + let Some(queue) = self.fetch_queue(queue_key.as_ref())? else { + return Ok(0); + }; + Ok(queue.num_items as usize) + } +} diff --git a/src/shard.rs b/src/shard.rs index cb6366e..6859359 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -98,7 +98,7 @@ fn test_row_lookup() -> Result<()> { struct PageAligned(T); pub(crate) const SHARD_FILE_MAGIC: [u8; 8] = *b"CandyStr"; -pub(crate) const SHARD_FILE_VERSION: u64 = 10; +pub(crate) const SHARD_FILE_VERSION: u64 = 11; #[derive(Clone, Copy, Default, Debug, Pod, Zeroable)] #[repr(C)] diff --git a/src/store.rs b/src/store.rs index 8ec42ca..888f1be 100644 --- a/src/store.rs +++ b/src/store.rs @@ -24,6 +24,8 @@ pub(crate) const TYPED_NAMESPACE: &[u8] = &[2]; pub(crate) const LIST_NAMESPACE: &[u8] = &[3]; pub(crate) const ITEM_NAMESPACE: &[u8] = &[4]; pub(crate) const CHAIN_NAMESPACE: u8 = 5; +pub(crate) const QUEUE_NAMESPACE: &[u8] = &[6]; +pub(crate) const QUEUE_ITEM_NAMESPACE: &[u8] = &[7]; #[derive(Debug, Clone)] pub(crate) struct InternalConfig { @@ -506,11 +508,13 @@ impl CandyStore { /// sure the value is correct. /// /// Returns true if the value had existed before (thus it was replaced), false otherwise - pub fn set_big(&self, key: &[u8], val: &[u8]) -> Result { - let existed = self.discard_list(key)?; - for (i, chunk) in val.chunks(MAX_VALUE_SIZE).enumerate() { - self.set_in_list(key, &i.to_le_bytes(), chunk)?; - } + pub fn set_big + ?Sized, B2: AsRef<[u8]> + ?Sized>( + &self, + key: &B1, + val: &B2, + ) -> Result { + let existed = self.discard_queue(key)?; + self.extend_queue(key, val.as_ref().chunks(MAX_VALUE_SIZE))?; Ok(existed) } @@ -519,7 +523,7 @@ impl CandyStore { pub fn get_big(&self, key: &[u8]) -> Result>> { let mut val = vec![]; let mut exists = false; - for res in self.iter_list(key) { + for res in self.iter_queue(key) { let (_, chunk) = res?; exists = true; val.extend_from_slice(&chunk); @@ -534,7 +538,7 @@ impl CandyStore { /// Removes a big item by key. Returns true if the key had existed, false otherwise. /// See also [Self::set_big] pub fn remove_big(&self, key: &[u8]) -> Result { - self.discard_list(key) + self.discard_queue(key) } } diff --git a/tests/test_queues.rs b/tests/test_queues.rs new file mode 100644 index 0000000..de901ad --- /dev/null +++ b/tests/test_queues.rs @@ -0,0 +1,105 @@ +mod common; + +use candystore::{CandyStore, Config, Result}; + +use crate::common::run_in_tempdir; + +#[test] +fn test_queues() -> Result<()> { + run_in_tempdir(|dir| { + let db = CandyStore::open(dir, Config::default())?; + + db.push_to_queue_tail("work", "item1")?; + db.push_to_queue_tail("work", "item2")?; + db.push_to_queue_tail("work", "item3")?; + assert_eq!(db.pop_queue_head("work")?, Some("item1".into())); + assert_eq!(db.pop_queue_head("work")?, Some("item2".into())); + assert_eq!(db.pop_queue_head("work")?, Some("item3".into())); + assert_eq!(db.pop_queue_head("work")?, None); + + db.push_to_queue_head("rev", "item1")?; + db.push_to_queue_head("rev", "item2")?; + db.push_to_queue_head("rev", "item3")?; + assert_eq!(db.pop_queue_tail("rev")?, Some("item1".into())); + assert_eq!(db.pop_queue_tail("rev")?, Some("item2".into())); + assert_eq!(db.pop_queue_tail("rev")?, Some("item3".into())); + assert_eq!(db.pop_queue_tail("rev")?, None); + + assert_eq!(db.queue_len("work")?, 0); + + for i in 1000u32..2000 { + db.push_to_queue_tail("work", &i.to_le_bytes())?; + } + assert_eq!(db.queue_len("work")?, 1000); + assert_eq!(db.queue_len("joke")?, 0); + + for (i, res) in db.iter_queue("work").enumerate() { + let (idx, val) = res?; + let v = u32::from_le_bytes(val.try_into().unwrap()); + assert_eq!(v, 1000 + i as u32); + + // create some holes + if v % 5 == 0 { + assert!(db.remove_from_queue("work", idx)?.is_some()); + } + } + + let mut count = 0; + for res in db.iter_queue("work") { + let (_, val) = res?; + let v = u32::from_le_bytes(val.try_into().unwrap()); + assert_ne!(v % 5, 0); + count += 1; + } + assert!(count == 800); + + let mut count2 = 0; + while let Some(val) = db.pop_queue_head("work")? { + let v = u32::from_le_bytes(val.try_into().unwrap()); + assert_ne!(v % 5, 0); + count2 += 1; + if count2 > 400 { + break; + } + } + while let Some(val) = db.pop_queue_tail("work")? { + let v = u32::from_le_bytes(val.try_into().unwrap()); + assert_ne!(v % 5, 0); + count2 += 1; + } + + assert_eq!(count, count2); + assert_eq!(db.queue_len("work")?, 0); + + db.push_to_queue_tail("work", "item1")?; + db.push_to_queue_tail("work", "item2")?; + db.push_to_queue_tail("work", "item3")?; + assert_eq!(db.queue_len("work")?, 3); + db.extend_queue("work", ["item4", "item5"].iter())?; + assert_eq!(db.queue_len("work")?, 5); + + let items = db + .iter_queue("work") + .map(|res| std::str::from_utf8(&res.unwrap().1).unwrap().to_owned()) + .collect::>(); + assert_eq!(items, ["item1", "item2", "item3", "item4", "item5"]); + + db.discard_queue("work")?; + assert_eq!(db.queue_len("work")?, 0); + + db.extend_queue("work", (1u32..10).map(|i| i.to_le_bytes()))?; + let items = db + .iter_queue("work") + .map(|res| u32::from_le_bytes(res.unwrap().1.try_into().unwrap())) + .collect::>(); + assert_eq!(items, (1u32..10).collect::>()); + + let items = db + .iter_queue_backwards("work") + .map(|res| u32::from_le_bytes(res.unwrap().1.try_into().unwrap())) + .collect::>(); + assert_eq!(items, (1u32..10).rev().collect::>()); + + Ok(()) + }) +}