Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions crossbeam-queue/src/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,49 @@ impl<T> ArrayQueue<T> {
})
}

/// Attempts to push an element using an exclusive reference of the queue.
///
/// Atomic operaitons and checks are omitted
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let mut q = ArrayQueue::new(1);
///
/// assert_eq!(q.push_mut(10), Ok(()));
/// assert_eq!(q.push_mut(20), Err(20));
/// ```
pub fn push_mut(&mut self, value: T) -> Result<(), T> {
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Relaxed);

if head.wrapping_add(self.one_lap) == tail {
return Err(value);
}

// Now, trying to update the tail
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);
let new_tail = if index + 1 < self.capacity() {
tail + 1
} else {
// One lap forward, index wraps around to zero.
lap.wrapping_add(self.one_lap)
};

self.tail.store(new_tail, Ordering::Relaxed);

let slot = unsafe { self.buffer.get_unchecked(index) };
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.stamp.store(tail + 1, Ordering::Relaxed);

Ok(())
}

/// Pushes an element into the queue, replacing the oldest element if necessary.
///
/// If the queue is full, the oldest element is replaced and returned,
Expand Down Expand Up @@ -336,6 +379,52 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to pop an element using a exclusive reference of the queue.
///
/// Due to having an exclusive reference, atomic operaitons and checks are omitted
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let mut q = ArrayQueue::new(1);
/// assert_eq!(q.push(10), Ok(()));
///
/// assert_eq!(q.pop_mut(), Some(10));
/// assert!(q.pop_mut().is_none());
/// ```
pub fn pop_mut(&mut self) -> Option<T> {
let head = self.head.load(Ordering::Relaxed);
let tail = self.tail.load(Ordering::Relaxed);

// If the tail equals the head, that means the channel is empty.
if tail == head {
return None;
}
let index = head & (self.one_lap - 1);
let lap = head & !(self.one_lap - 1);

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };

let new = if index + 1 < self.capacity() {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};
let msg = unsafe { slot.value.get().read().assume_init() };
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Relaxed);
self.head.store(new, Ordering::Relaxed);
Some(msg)
}

/// Returns the capacity of the queue.
///
/// # Examples
Expand Down
127 changes: 126 additions & 1 deletion crossbeam-queue/src/seg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ impl<T> Block<T> {
return;
}
}

// No thread is using the block, now it is safe to destroy it.
drop(unsafe { Box::from_raw(this) });
}
Expand Down Expand Up @@ -283,6 +282,59 @@ impl<T> SegQueue<T> {
}
}

/// Pushes an element to the queue with exclusive mutable access.
///
/// Avoids atomic operations and synchronization, assuming
/// no other threads access the queue concurrently.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let mut q = SegQueue::new();
///
/// q.push_mut(10);
/// q.push_mut(20);
/// ```
pub fn push_mut(&mut self, value: T) {
let tail = self.tail.index.load(Ordering::Relaxed);
let mut block = self.tail.block.load(Ordering::Relaxed);

// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;

// If this is the first push operation, we need to allocate the first block.
if block.is_null() {
let new = Box::into_raw(Block::<T>::new());
self.head.block.store(new, Ordering::Relaxed);
self.tail.block.store(new, Ordering::Relaxed);

block = new;
}

let new_tail = tail + (1 << SHIFT);

self.tail.index.store(new_tail, Ordering::Relaxed);

unsafe {
// If we've reached the end of the block, install the next one.
if offset + 1 == BLOCK_CAP {
let next_block = Box::into_raw(Block::<T>::new());
let next_index = new_tail.wrapping_add(1 << SHIFT);

self.tail.block.store(next_block, Ordering::Relaxed);
self.tail.index.store(next_index, Ordering::Relaxed);
(*block).next.store(next_block, Ordering::Relaxed);
}

// Write the value into the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.value.get().write(MaybeUninit::new(value));
slot.state.fetch_or(WRITE, Ordering::Relaxed);
}
}

/// Pops the head element from the queue.
///
/// If the queue is empty, `None` is returned.
Expand Down Expand Up @@ -387,6 +439,79 @@ impl<T> SegQueue<T> {
}
}

/// Pops the head element from the queue using an exclusive reference.
///
/// Avoids atomic operations and synchronization, assuming
/// no other threads access the queue concurrently.
///
/// If the queue is empty, `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let mut q = SegQueue::new();
///
/// q.push(10);
/// q.push(20);
/// assert_eq!(q.pop_mut(), Some(10));
/// assert_eq!(q.pop_mut(), Some(20));
/// assert!(q.pop_mut().is_none());
/// ```
pub fn pop_mut(&mut self) -> Option<T> {
let head = self.head.index.load(Ordering::Relaxed);
let block = self.head.block.load(Ordering::Relaxed);

// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;

let mut new_head = head + (1 << SHIFT);

if new_head & HAS_NEXT == 0 {
let tail = self.tail.index.load(Ordering::Relaxed);

// If the tail equals the head, that means the queue is empty.
if head >> SHIFT == tail >> SHIFT {
return None;
}

// If head and tail are not in the same block, set `HAS_NEXT` in head.
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= HAS_NEXT;
}
}

self.head.index.store(new_head, Ordering::Relaxed);

unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= HAS_NEXT;
}

self.head.block.store(next, Ordering::Relaxed);
self.head.index.store(next_index, Ordering::Relaxed);
}

// Read the value.
let slot = (*block).slots.get_unchecked(offset);
let value = slot.value.get().read().assume_init();

// Destroy the block if we've reached the end
if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
}

Some(value)
}
}

/// Returns `true` if the queue is empty.
///
/// # Examples
Expand Down
30 changes: 30 additions & 0 deletions crossbeam-queue/tests/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,36 @@ fn len_empty_full() {
assert!(!q.is_full());
}

#[test]
fn exclusive_reference() {
let mut q = ArrayQueue::new(2);

assert_eq!(q.len(), 0);
assert!(q.is_empty());

q.push_mut(()).unwrap();

assert_eq!(q.len(), 1);
assert!(!q.is_empty());
assert!(!q.is_full());

q.push_mut(()).unwrap();

assert_eq!(q.len(), 2);
assert!(!q.is_empty());
assert!(q.is_full());

q.pop_mut().unwrap();

assert_eq!(q.len(), 1);
assert!(!q.is_empty());
assert!(!q.is_full());

q.pop().unwrap();

assert_eq!(q.len(), 0);
}

#[test]
fn len() {
#[cfg(miri)]
Expand Down
47 changes: 47 additions & 0 deletions crossbeam-queue/tests/seg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,53 @@ fn len() {
assert_eq!(q.len(), 0);
}

#[test]
fn exclusive_reference() {
let mut q = SegQueue::new();

assert_eq!(q.len(), 0);

for i in 0..50 {
q.push_mut(i);
assert_eq!(q.len(), i + 1);
}

for i in 0..50 {
q.pop_mut().unwrap();
assert_eq!(q.len(), 50 - i - 1);
}

assert_eq!(q.len(), 0);
assert!(q.is_empty());

for i in 0..35 {
q.push(i);
assert_eq!(q.len(), i + 1);
}

for i in 0..5 {
q.push_mut(i);
assert_eq!(q.len(), 35 + i + 1);
}

for i in 0..5 {
q.pop_mut().unwrap();
assert_eq!(q.len(), 40 - i - 1);
}

for i in 0..35 {
q.pop().unwrap();
assert_eq!(q.len(), 35 - i - 1);
}

assert_eq!(q.len(), 0);
assert!(q.is_empty());

q.push_mut(1);

assert!(!q.is_empty());
}

#[test]
fn spsc() {
#[cfg(miri)]
Expand Down