Skip to content

Commit

Permalink
fix(tricky-pipe): mpsc getting stuck when lapping (#35)
Browse files Browse the repository at this point in the history
This branch fixes an issue where the `tricky-pipe` MPSC can get "stuck"
after completing one lap around the queue, due to wrong index
management. I've simplified the index bitfields a bit, and added a test
reproducing the bug (which, sadly, can't run under loom).
  • Loading branch information
hawkw authored Nov 23, 2023
1 parent 756cb3b commit b197e82
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 30 additions & 28 deletions source/tricky-pipe/src/mpsc/channel_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(super) struct Core<E> {
/// closed by the sender, an index into the queue array, and the sequence
/// number (the current lap around the queue array). The closed flag is
/// represented by the [`CLOSED`] constant. The index is represented by the
/// next [`SEQ_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit
/// next [`POS_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit
/// machines). Finally, the remaining 9 or 10 bits are the sequence number.
///
/// Since we always have a maximum capacity of 32 or 64 elements, a 16-bit
Expand All @@ -52,7 +52,7 @@ pub(super) struct Core<E> {
/// closed by the receiver, an index into the queue array, and the sequence
/// number (the current lap around the queue array). The closed flag is
/// represented by the [`CLOSED`] constant. The index is represented by the
/// next [`SEQ_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit
/// next [`POS_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit
/// machines). Finally, the remaining 9 or 10 bits are the sequence number.
///
/// Since we always have a maximum capacity of 32 or 64 elements, a 16-bit
Expand Down Expand Up @@ -162,14 +162,14 @@ pub(super) const MAX_CAPACITY: usize = IndexAllocWord::MAX_CAPACITY as usize;
/// This is the first bit of the pos word, so that it is not clobbered if
/// incrementing the actual position in the queue wraps around (which is fine).
const CLOSED: u16 = 1 << 0;
const POS_SHIFT: u16 = CLOSED.trailing_ones() as u16;
const MASK: u16 = MAX_CAPACITY as u16 - 1;
const POS_SHIFT: u16 = MASK.trailing_ones() as u16;
/// The value by which `enqueue_pos` and `dequeue_pos` are incremented. This is
/// `1 << POS_SHIFT`, so adding it never modifies the low `POS_SHIFT` bits of
/// the word (which include the `CLOSED` flag in bit 0).
const POS_ONE: u16 = 1 << POS_SHIFT;
const MASK: u16 = MAX_CAPACITY as u16 - 1;
const SEQ_SHIFT: u16 = MASK.trailing_ones() as u16;
const SEQ_ONE: u16 = 1 << SEQ_SHIFT;
/// One lap around the queue.
const LAP_ONE: u16 = MASK << POS_SHIFT;

// === impl Core ===

Expand All @@ -184,7 +184,7 @@ impl<E> Core<E> {
let mut i = 0;

while i != MAX_CAPACITY {
queue[i] = AtomicU16::new((i as u16) << SEQ_SHIFT);
queue[i] = AtomicU16::new((i as u16) << POS_SHIFT);
i += 1;
}

Expand Down Expand Up @@ -215,7 +215,7 @@ impl<E> Core<E> {
// loom atomics, since they don't have a `const fn` constructor. :(
// oh well, this is test-only code...
let vec = (0..MAX_CAPACITY)
.map(|i| AtomicU16::new((i as u16) << SEQ_SHIFT))
.map(|i| AtomicU16::new((i as u16) << POS_SHIFT))
.collect::<alloc::vec::Vec<_>>();
<[_; MAX_CAPACITY]>::try_from(vec).expect("vec should be the correct length")
};
Expand Down Expand Up @@ -355,7 +355,7 @@ impl<E> Core<E> {

// If the dequeue index has lagged behind the enqueue index by an entire
// "lap" around the ring buffer, then the queue is full.
dequeue_pos.wrapping_add(SEQ_ONE) == enqueue_pos
dequeue_pos.wrapping_add(POS_ONE) == enqueue_pos
}

#[must_use]
Expand All @@ -369,8 +369,8 @@ impl<E> Core<E> {
// dequeue index, then we have a consistent snapshot of both
// indices.
if self.enqueue_pos.load(SeqCst) == enqueue_pos {
let head = dequeue_pos & MASK;
let tail = enqueue_pos & MASK;
let head = (dequeue_pos >> POS_SHIFT) & MASK;
let tail = (dequeue_pos >> POS_SHIFT) & MASK;

return match head.cmp(&tail) {
cmp::Ordering::Less => (tail - head) as usize,
Expand Down Expand Up @@ -457,8 +457,8 @@ impl<E: Clone> Core<E> {
let slot = &self.queue[(pos & MASK) as usize];
// Load the slot's current value, and extract its sequence number.
let val = slot.load(Acquire);
let seq = val >> SEQ_SHIFT;
let dif = test_dbg!(seq as i8).wrapping_sub(test_dbg!(pos).wrapping_add(1) as i8);
let seq = val >> POS_SHIFT;
let dif = test_dbg!(seq as i16).wrapping_sub(test_dbg!(pos.wrapping_add(1)) as i16);

match test_dbg!(dif).cmp(&0) {
cmp::Ordering::Less if test_dbg!(head & CLOSED) != 0 => {
Expand All @@ -475,7 +475,7 @@ impl<E: Clone> Core<E> {
Acquire,
)) {
Ok(_) => {
slot.store(val.wrapping_add(SEQ_ONE), Release);
slot.store(val.wrapping_add(LAP_ONE), Release);
return Ok(Reservation {
core: self,
idx: (val & MASK) as u8,
Expand Down Expand Up @@ -504,19 +504,21 @@ impl<E: Clone> Core<E> {
// discard the `CLOSED` bit.
let pos = tail >> POS_SHIFT;
let slot = &self.queue[test_dbg!(pos & MASK) as usize];
let seq = slot.load(Acquire) >> SEQ_SHIFT;
let dif = test_dbg!(seq as i8).wrapping_sub(test_dbg!(pos as i8));
let seq = slot.load(Acquire) >> POS_SHIFT;
let dif = test_dbg!(seq as i16).wrapping_sub(test_dbg!(pos as i16));

match test_dbg!(dif).cmp(&0) {
cmp::Ordering::Less => unreachable!(),
cmp::Ordering::Less => unreachable!(
"if a slot was successfully reserved, there should be send capacity!"
),
cmp::Ordering::Equal => match test_dbg!(self.enqueue_pos.compare_exchange_weak(
tail,
tail.wrapping_add(POS_ONE),
test_dbg!(tail),
test_dbg!(tail.wrapping_add(POS_ONE)),
AcqRel,
Acquire,
)) {
Ok(_) => {
let new = test_dbg!(test_dbg!((pos) << SEQ_SHIFT).wrapping_add(SEQ_ONE));
let new = test_dbg!(test_dbg!(pos.wrapping_add(1)) << POS_SHIFT);
slot.store(test_dbg!(idx as u16 | new), Release);
test_dbg!(self.cons_wait.wake());
return Ok(());
Expand Down Expand Up @@ -557,7 +559,7 @@ impl<E: Clone> Reservation<'_, E> {

impl<E> Drop for Reservation<'_, E> {
fn drop(&mut self) {
unsafe { self.core.uncommit(self.idx) }
unsafe { test_dbg!(self.core.uncommit(self.idx)) }
}
}

Expand Down Expand Up @@ -792,13 +794,13 @@ mod tests {

#[test]
fn pos_bit_layout() {
eprintln!(" CLOSED = {CLOSED:#016b}");
eprintln!(" POS_ONE = {POS_ONE:#016b}");
eprintln!(" SEQ_ONE = {SEQ_ONE:#016b}");
eprintln!(" MASK = {MASK:#016b}");
eprintln!("SEQ_SHIFT = {SEQ_SHIFT}");
let packed_seq_bits = u16::BITS - (SEQ_SHIFT as u32 + 1);
eprintln!(" seq bits = u16::BITS - (SEQ_SHIFT + 1) = {packed_seq_bits}");
eprintln!(" CLOSED = {CLOSED:#018b}");
eprintln!(" POS_ONE = {POS_ONE:#018b}");
eprintln!(" LAP_ONE = {LAP_ONE:#018b}");
eprintln!(" MASK = {MASK:#018b}");
eprintln!(" POS_SHIFT = {POS_SHIFT}");
let packed_seq_bits = u16::BITS - (POS_SHIFT as u32);
eprintln!(" seq bits = u16::BITS - (POS_SHIFT) = {packed_seq_bits}");
assert!(
packed_seq_bits >= 2,
"at least two bits (4 laps) should be used for sequence numbers"
Expand Down
47 changes: 47 additions & 0 deletions source/tricky-pipe/src/mpsc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,53 @@ fn mpsc_send() {
})
}

/// Stress test for the MPSC channel: two sender threads feed a channel whose
/// capacity (8) is much smaller than the total number of messages (128), while
/// a single receiver drains it until `recv` returns an error.
///
/// The test asserts that no value is received twice and that every value in
/// `0..SENDS` is eventually received.
#[test]
// Skipped under loom and miri: the model would take far too long to explore.
#[cfg_attr(any(loom, miri), ignore)]
fn stress_mpsc_send() {
    const CAPACITY: u8 = 8;
    const TX1_SENDS: usize = 64;
    const TX2_SENDS: usize = 64;
    const SENDS: usize = TX1_SENDS + TX2_SENDS;

    loom::model(|| {
        let pipe = TrickyPipe::<loom::alloc::Track<usize>>::new(CAPACITY);

        let receiver = test_dbg!(pipe.receiver()).expect("can't get rx");
        let first_tx = pipe.sender();
        let second_tx = pipe.sender();
        // Release the channel handle itself so that only the two senders keep
        // the tx side open; once they are gone the channel can be tx-closed.
        drop(pipe);

        // NOTE(review): `do_tx` is defined elsewhere; presumably it sends
        // `count` consecutive values starting at the given offset -- confirm.
        let first_sender = thread::spawn(do_tx(TX1_SENDS, 0, first_tx));
        let second_sender = thread::spawn(do_tx(TX2_SENDS, TX1_SENDS, second_tx));

        // Drain the channel, recording every value and rejecting duplicates.
        let recvs = future::block_on(async move {
            let mut recvs = std::collections::BTreeSet::new();
            loop {
                match receiver.recv().await {
                    Ok(tracked) => {
                        let msg = tracked.into_inner();
                        tracing::info!(received = msg);
                        assert!(
                            recvs.insert(msg),
                            "each message should only have been received once\nmessage: {msg}\nreceived: {recvs:?}"
                        );
                    }
                    // Any error from `recv` ends the drain loop.
                    Err(_) => break,
                }
            }
            recvs
        });

        first_sender.join().unwrap();
        second_sender.join().unwrap();

        // Every value in the combined range must have arrived.
        (0..SENDS).for_each(|msg| {
            assert!(
                recvs.contains(&msg),
                "didn't receive {}\nreceived: {recvs:?}",
                msg
            );
        });
    })
}

#[test]
fn rx_closes_error() {
const CAPACITY: u8 = 2;
Expand Down

0 comments on commit b197e82

Please sign in to comment.