Skip to content

Commit

Permalink
Queues: add xxx_with_idx APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerfiliba committed Sep 15, 2024
1 parent 5390b9d commit eddd232
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 43 deletions.
62 changes: 55 additions & 7 deletions src/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,23 @@ impl CandyStore {
self._push_to_queue(queue_key.as_ref(), val.as_ref(), QueuePos::Tail)
}

fn _pop_queue(&self, queue_key: &[u8], pos: QueuePos) -> Result<Option<Vec<u8>>> {
fn _pop_queue(&self, queue_key: &[u8], pos: QueuePos) -> Result<Option<(usize, Vec<u8>)>> {
let (queue_ph, full_queue_key) = self.make_queue_key(queue_key);
let _guard = self.lock_list(queue_ph);

let Some(mut queue_bytes) = self.get_raw(&full_queue_key)? else {
return Ok(None);
};
let queue = from_bytes_mut::<Queue>(&mut queue_bytes);
let mut val = None;
let mut res = None;

match pos {
QueuePos::Head => {
while queue.head_idx < queue.tail_idx {
let idx = queue.head_idx;
queue.head_idx += 1;
if let Some(v) = self.remove_raw(&self.make_queue_item_key(queue_key, idx))? {
val = Some(v);
res = Some((idx as usize, v));
queue.num_items -= 1;
break;
}
Expand All @@ -195,7 +195,7 @@ impl CandyStore {
queue.tail_idx -= 1;
let idx = queue.tail_idx;
if let Some(v) = self.remove_raw(&self.make_queue_item_key(queue_key, idx))? {
val = Some(v);
res = Some((idx as usize, v));
queue.num_items -= 1;
break;
}
Expand All @@ -209,23 +209,41 @@ impl CandyStore {
self.set_raw(&full_queue_key, &queue_bytes)?;
}

Ok(val)
Ok(res)
}

/// Removes and returns the head element and its index of the queue, or None if the queue is empty
pub fn pop_queue_head_with_idx<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<(usize, Vec<u8>)>> {
self._pop_queue(queue_key.as_ref(), QueuePos::Head)
}

/// Removes and returns the head element of the queue, or None if the queue is empty
pub fn pop_queue_head<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<Vec<u8>>> {
self._pop_queue(queue_key.as_ref(), QueuePos::Head)
Ok(self
.pop_queue_head_with_idx(queue_key.as_ref())?
.map(|iv| iv.1))
}

/// Removes and returns the tail element and its index of the queue, or None if the queue is empty
pub fn pop_queue_tail_with_idx<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<(usize, Vec<u8>)>> {
self._pop_queue(queue_key.as_ref(), QueuePos::Tail)
}

/// Removes and returns the tail element of the queue, or None if the queue is empty
pub fn pop_queue_tail<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<Vec<u8>>> {
self._pop_queue(queue_key.as_ref(), QueuePos::Tail)
Ok(self.pop_queue_tail_with_idx(queue_key)?.map(|iv| iv.1))
}

/// Removes an element by index from the queue, returning the value it had or None if it did not exist (as well
Expand Down Expand Up @@ -344,6 +362,17 @@ impl CandyStore {
Ok(indices)
}

/// Returns (without removing) the head element of the queue and its index, or None if the queue is empty
pub fn peek_queue_head_with_idx<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<(usize, Vec<u8>)>> {
for res in self.iter_queue(queue_key) {
return Ok(Some(res?));
}
Ok(None)
}

/// Returns (without removing) the head element of the queue, or None if the queue is empty
pub fn peek_queue_head<B: AsRef<[u8]> + ?Sized>(
&self,
Expand All @@ -355,6 +384,17 @@ impl CandyStore {
Ok(None)
}

/// Returns (without removing) the head element of the queue and its index, or None if the queue is empty
pub fn peek_queue_tail_with_idx<B: AsRef<[u8]> + ?Sized>(
&self,
queue_key: &B,
) -> Result<Option<(usize, Vec<u8>)>> {
for res in self.iter_queue_backwards(queue_key) {
return Ok(Some(res?));
}
Ok(None)
}

/// Returns (without removing) the tail element of the queue, or None if the queue is empty
pub fn peek_queue_tail<B: AsRef<[u8]> + ?Sized>(
&self,
Expand Down Expand Up @@ -404,4 +444,12 @@ impl CandyStore {
};
Ok(queue.num_items as usize)
}

/// Returns a the range (indices) of the given queue or an empty range if the queue does not exist
pub fn queue_range<B: AsRef<[u8]> + ?Sized>(&self, queue_key: &B) -> Result<Range<usize>> {
let Some(queue) = self.fetch_queue(queue_key.as_ref())? else {
return Ok(Self::FIRST_QUEUE_IDX as usize..Self::FIRST_QUEUE_IDX as usize);
};
Ok(queue.head_idx as usize..queue.tail_idx as usize)
}
}
122 changes: 88 additions & 34 deletions src/typed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::anyhow;
use bytemuck::bytes_of;
use std::{borrow::Borrow, marker::PhantomData, sync::Arc};
use std::{borrow::Borrow, marker::PhantomData, ops::Range, sync::Arc};

use crate::{
store::{ReplaceStatus, SetStatus, TYPED_NAMESPACE},
Expand Down Expand Up @@ -577,112 +577,166 @@ where
/// Pushes a value at the beginning (head) of the queue
pub fn push_head<Q1: ?Sized + Encode, Q2: ?Sized + Encode>(
&self,
list_key: &Q1,
queue_key: &Q1,
val: &Q2,
) -> Result<()>
where
L: Borrow<Q1>,
V: Borrow<Q2>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
let val = val.to_bytes::<LE>();
self.store.push_to_queue_head(&list_key, &val)?;
self.store.push_to_queue_head(&queue_key, &val)?;
Ok(())
}

/// Pushes a value at the end (tail) of the queue
pub fn push_tail<Q1: ?Sized + Encode, Q2: ?Sized + Encode>(
&self,
list_key: &Q1,
queue_key: &Q1,
val: &Q2,
) -> Result<()>
where
L: Borrow<Q1>,
V: Borrow<Q2>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
let val = val.to_bytes::<LE>();
self.store.push_to_queue_tail(&list_key, &val)?;
self.store.push_to_queue_tail(&queue_key, &val)?;
Ok(())
}

/// Pops a value from the beginning (head) of the queue
pub fn pop_head<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
pub fn pop_head_with_idx<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Option<(usize, V)>>
where
L: Borrow<Q>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.pop_queue_head(&list_key)? else {
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
let Some((idx, v)) = self.store.pop_queue_head_with_idx(&queue_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
Ok(Some((idx, from_bytes::<V>(&v)?)))
}

/// Pops a value from the beginning (head) of the queue
pub fn pop_head<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
Ok(self.pop_head_with_idx(queue_key)?.map(|iv| iv.1))
}

/// Pops a value from the end (tail) of the queue
pub fn pop_tail<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
pub fn pop_tail_with_idx<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Option<(usize, V)>>
where
L: Borrow<Q>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.pop_queue_tail(&list_key)? else {
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
let Some((idx, v)) = self.store.pop_queue_tail_with_idx(&queue_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
Ok(Some((idx, from_bytes::<V>(&v)?)))
}

/// Peek at the value from the beginning (head) of the queue
pub fn peek_head<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
/// Pops a value from the end (tail) of the queue
pub fn pop_tail<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.peek_queue_head(&list_key)? else {
Ok(self.pop_tail_with_idx(queue_key)?.map(|iv| iv.1))
}

/// Peek at the value from the beginning (head) of the queue and its index
pub fn peek_head_with_idx<Q: ?Sized + Encode>(
&self,
queue_key: &Q,
) -> Result<Option<(usize, V)>>
where
L: Borrow<Q>,
{
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
let Some((idx, v)) = self.store.peek_queue_head_with_idx(&queue_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
Ok(Some((idx, from_bytes::<V>(&v)?)))
}

/// Peek at the value from the beginning (head) of the queue
pub fn peek_head<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
Ok(self.peek_head_with_idx(queue_key)?.map(|iv| iv.1))
}

/// Peek at the value from the end (tail) of the queue
pub fn peek_tail<Q: ?Sized + Encode>(&self, list_key: &Q) -> Result<Option<V>>
pub fn peek_tail_with_idx<Q: ?Sized + Encode>(
&self,
queue_key: &Q,
) -> Result<Option<(usize, V)>>
where
L: Borrow<Q>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let Some(v) = self.store.peek_queue_tail(&list_key)? else {
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
let Some((idx, v)) = self.store.peek_queue_tail_with_idx(&queue_key)? else {
return Ok(None);
};
Ok(Some(from_bytes::<V>(&v)?))
Ok(Some((idx, from_bytes::<V>(&v)?)))
}

/// Peek at the value from the end (tail) of the queue
pub fn peek_tail<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Option<V>>
where
L: Borrow<Q>,
{
Ok(self.peek_tail_with_idx(queue_key)?.map(|iv| iv.1))
}

/// See [CandyTypedList::iter]
pub fn iter<'a, Q: ?Sized + Encode>(
&'a self,
list_key: &Q,
) -> impl Iterator<Item = Result<V>> + 'a
queue_key: &Q,
) -> impl Iterator<Item = Result<(usize, V)>> + 'a
where
L: Borrow<Q>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
self.store.iter_queue(&list_key).map(|res| match res {
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
self.store.iter_queue(&queue_key).map(|res| match res {
Err(e) => Err(e),
Ok((_, v)) => Ok(from_bytes::<V>(&v).unwrap()),
Ok((idx, v)) => Ok((idx, from_bytes::<V>(&v).unwrap())),
})
}

/// See [CandyTypedList::iter_backwards]
pub fn iter_backwards<'a, Q: ?Sized + Encode>(
&'a self,
list_key: &Q,
) -> impl Iterator<Item = Result<V>> + 'a
queue_key: &Q,
) -> impl Iterator<Item = Result<(usize, V)>> + 'a
where
L: Borrow<Q>,
{
let list_key = CandyTypedList::<L, (), ()>::make_list_key(list_key);
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
self.store
.iter_queue_backwards(&list_key)
.iter_queue_backwards(&queue_key)
.map(|res| match res {
Err(e) => Err(e),
Ok((_, v)) => Ok(from_bytes::<V>(&v).unwrap()),
Ok((idx, v)) => Ok((idx, from_bytes::<V>(&v).unwrap())),
})
}

pub fn len<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<usize>
where
L: Borrow<Q>,
{
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
self.store.queue_len(&queue_key)
}

pub fn range<Q: ?Sized + Encode>(&self, queue_key: &Q) -> Result<Range<usize>>
where
L: Borrow<Q>,
{
let queue_key = CandyTypedList::<L, (), ()>::make_list_key(queue_key);
self.store.queue_range(&queue_key)
}
}
4 changes: 2 additions & 2 deletions tests/test_lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ fn test_typed_queue() -> Result<()> {

let items = queue
.iter("orders")
.map(|res| res.unwrap())
.map(|res| res.unwrap().1)
.collect::<Vec<_>>();

assert_eq!(items, vec![105, 104, 103, 100, 101, 102]);

let items = queue
.iter_backwards("orders")
.map(|res| res.unwrap())
.map(|res| res.unwrap().1)
.collect::<Vec<_>>();

assert_eq!(items, vec![102, 101, 100, 103, 104, 105]);
Expand Down

0 comments on commit eddd232

Please sign in to comment.