From b9f5adbcfc1073e672e5db1bc78f4d94d1a46328 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 23 Nov 2023 11:51:19 -0800 Subject: [PATCH] fix(tricky-pipe): mpsc getting stuck when lapping --- Cargo.lock | 1 + source/tricky-pipe/src/mpsc/channel_core.rs | 58 +++++++++++---------- source/tricky-pipe/src/mpsc/tests.rs | 46 ++++++++++++++++ 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8aa14c2..b948723 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,7 @@ dependencies = [ "futures", "heapless", "maitake", + "maitake-sync", "postcard", "proptest", "proptest-derive", diff --git a/source/tricky-pipe/src/mpsc/channel_core.rs b/source/tricky-pipe/src/mpsc/channel_core.rs index d53b6a0..8a0de65 100644 --- a/source/tricky-pipe/src/mpsc/channel_core.rs +++ b/source/tricky-pipe/src/mpsc/channel_core.rs @@ -27,7 +27,7 @@ pub(super) struct Core { /// closed by the sender, an index into the queue array, and the sequence /// number (the current lap around the queue array). The closed flag is /// represented by the [`CLOSED`] constant. The index is represented by the - /// next [`SEQ_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit + /// next [`POS_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit /// machines). Finally, the remaining 9 or 10 bits are the sequence number. /// /// Since we always have a maximum capacity of 32 or 64 elements, a 16-bit @@ -52,7 +52,7 @@ pub(super) struct Core { /// closed by the receiver, an index into the queue array, and the sequence /// number (the current lap around the queue array). The closed flag is /// represented by the [`CLOSED`] constant. The index is represented by the - /// next [`SEQ_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit + /// next [`POS_SHIFT`] bits (5 bits on 32-bit machines or 6 bits on 64-bit /// machines). Finally, the remaining 9 or 10 bits are the sequence number. 
/// /// Since we always have a maximum capacity of 32 or 64 elements, a 16-bit @@ -162,14 +162,14 @@ pub(super) const MAX_CAPACITY: usize = IndexAllocWord::MAX_CAPACITY as usize; /// This is the first bit of the pos word, so that it is not clobbered if /// incrementing the actual position in the queue wraps around (which is fine). const CLOSED: u16 = 1 << 0; -const POS_SHIFT: u16 = CLOSED.trailing_ones() as u16; +const MASK: u16 = MAX_CAPACITY as u16 - 1; +const POS_SHIFT: u16 = MASK.trailing_ones() as u16; /// The value by which `enqueue_pos` and `dequeue_pos` are incremented. This is /// shifted left by two to account for the lowest bits being used for `CLOSED` /// and `HAS_ERROR` const POS_ONE: u16 = 1 << POS_SHIFT; -const MASK: u16 = MAX_CAPACITY as u16 - 1; -const SEQ_SHIFT: u16 = MASK.trailing_ones() as u16; -const SEQ_ONE: u16 = 1 << SEQ_SHIFT; +/// One lap around the queue. +const LAP_ONE: u16 = MASK << POS_SHIFT; // === impl Core === @@ -184,7 +184,7 @@ impl Core { let mut i = 0; while i != MAX_CAPACITY { - queue[i] = AtomicU16::new((i as u16) << SEQ_SHIFT); + queue[i] = AtomicU16::new((i as u16) << POS_SHIFT); i += 1; } @@ -215,7 +215,7 @@ impl Core { // loom atomics, since they don't have a `const fn` constructor. :( // oh well, this is test-only code... let vec = (0..MAX_CAPACITY) - .map(|i| AtomicU16::new((i as u16) << SEQ_SHIFT)) + .map(|i| AtomicU16::new((i as u16) << POS_SHIFT)) .collect::>(); <[_; MAX_CAPACITY]>::try_from(vec).expect("vec should be the correct length") }; @@ -355,7 +355,7 @@ impl Core { // If the dequeue index has lagged behind the enqueue index by an entire // "lap" around the ring buffer, then the queue is full. - dequeue_pos.wrapping_add(SEQ_ONE) == enqueue_pos + dequeue_pos.wrapping_add(POS_ONE) == enqueue_pos } #[must_use] @@ -369,8 +369,8 @@ impl Core { // dequeue index, then we have a consistent snapshot of both // indices. 
if self.enqueue_pos.load(SeqCst) == enqueue_pos { - let head = dequeue_pos & MASK; - let tail = enqueue_pos & MASK; + let head = (dequeue_pos >> POS_SHIFT) & MASK; + let tail = (enqueue_pos >> POS_SHIFT) & MASK; return match head.cmp(&tail) { cmp::Ordering::Less => (tail - head) as usize, @@ -457,8 +457,8 @@ impl Core { let slot = &self.queue[(pos & MASK) as usize]; // Load the slot's current value, and extract its sequence number. let val = slot.load(Acquire); - let seq = val >> SEQ_SHIFT; - let dif = test_dbg!(seq as i8).wrapping_sub(test_dbg!(pos).wrapping_add(1) as i8); + let seq = val >> POS_SHIFT; + let dif = test_dbg!(seq as i16).wrapping_sub(test_dbg!(pos.wrapping_add(1)) as i16); match test_dbg!(dif).cmp(&0) { cmp::Ordering::Less if test_dbg!(head & CLOSED) != 0 => { @@ -475,7 +475,7 @@ impl Core { Acquire, )) { Ok(_) => { - slot.store(val.wrapping_add(SEQ_ONE), Release); + slot.store(val.wrapping_add(LAP_ONE), Release); return Ok(Reservation { core: self, idx: (val & MASK) as u8, @@ -504,19 +504,21 @@ impl Core { // discard the `CLOSED` bit. let pos = tail >> POS_SHIFT; let slot = &self.queue[test_dbg!(pos & MASK) as usize]; - let seq = slot.load(Acquire) >> SEQ_SHIFT; - let dif = test_dbg!(seq as i8).wrapping_sub(test_dbg!(pos as i8)); + let seq = slot.load(Acquire) >> POS_SHIFT; + let dif = test_dbg!(seq as i16).wrapping_sub(test_dbg!(pos as i16)); match test_dbg!(dif).cmp(&0) { - cmp::Ordering::Less => unreachable!(), + cmp::Ordering::Less => unreachable!( + "if a slot was successfully reserved, there should be send capacity!"
+ ), cmp::Ordering::Equal => match test_dbg!(self.enqueue_pos.compare_exchange_weak( - tail, - tail.wrapping_add(POS_ONE), + test_dbg!(tail), + test_dbg!(tail.wrapping_add(POS_ONE)), AcqRel, Acquire, )) { Ok(_) => { - let new = test_dbg!(test_dbg!((pos) << SEQ_SHIFT).wrapping_add(SEQ_ONE)); + let new = test_dbg!(test_dbg!(pos.wrapping_add(1)) << POS_SHIFT); slot.store(test_dbg!(idx as u16 | new), Release); test_dbg!(self.cons_wait.wake()); return Ok(()); @@ -557,7 +559,7 @@ impl Reservation<'_, E> { impl Drop for Reservation<'_, E> { fn drop(&mut self) { - unsafe { self.core.uncommit(self.idx) } + unsafe { test_dbg!(self.core.uncommit(self.idx)) } } } @@ -792,13 +794,13 @@ mod tests { #[test] fn pos_bit_layout() { - eprintln!(" CLOSED = {CLOSED:#016b}"); - eprintln!(" POS_ONE = {POS_ONE:#016b}"); - eprintln!(" SEQ_ONE = {SEQ_ONE:#016b}"); - eprintln!(" MASK = {MASK:#016b}"); - eprintln!("SEQ_SHIFT = {SEQ_SHIFT}"); - let packed_seq_bits = u16::BITS - (SEQ_SHIFT as u32 + 1); - eprintln!(" seq bits = u16::BITS - (SEQ_SHIFT + 1) = {packed_seq_bits}"); + eprintln!(" CLOSED = {CLOSED:#018b}"); + eprintln!(" POS_ONE = {POS_ONE:#018b}"); + eprintln!(" LAP_ONE = {LAP_ONE:#018b}"); + eprintln!(" MASK = {MASK:#018b}"); + eprintln!(" POS_SHIFT = {POS_SHIFT}"); + let packed_seq_bits = u16::BITS - (POS_SHIFT as u32); + eprintln!(" seq bits = u16::BITS - (POS_SHIFT) = {packed_seq_bits}"); assert!( packed_seq_bits >= 2, "at least two bits (4 laps) should be used for sequence numbers" diff --git a/source/tricky-pipe/src/mpsc/tests.rs b/source/tricky-pipe/src/mpsc/tests.rs index 6aa9896..5762589 100644 --- a/source/tricky-pipe/src/mpsc/tests.rs +++ b/source/tricky-pipe/src/mpsc/tests.rs @@ -592,6 +592,52 @@ fn mpsc_send() { }) } +#[test] +#[cfg_attr(loom, ignore)] // this would probably run for 1000 years under loom... 
+fn mpsc_send_full() { + const TX1_SENDS: usize = 64; + const TX2_SENDS: usize = 64; + const SENDS: usize = TX1_SENDS + TX2_SENDS; + const CAPACITY: u8 = 8; + + loom::model(|| { + let chan = TrickyPipe::>::new(CAPACITY); + + let rx = test_dbg!(chan.receiver()).expect("can't get rx"); + let tx1 = chan.sender(); + let tx2 = chan.sender(); + // drop the channel now so that the channel can be tx-closed. + drop(chan); + + let t1 = thread::spawn(do_tx(TX1_SENDS, 0, tx1)); + let t2 = thread::spawn(do_tx(TX2_SENDS, TX1_SENDS, tx2)); + + let recvs = future::block_on(async move { + let mut recvs = std::collections::BTreeSet::new(); + while let Ok(msg) = rx.recv().await { + let msg = msg.into_inner(); + tracing::info!(received = msg); + assert!( + recvs.insert(msg), + "each message should only have been received once\nmessage: {msg}\nreceived: {recvs:?}" + ); + } + recvs + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + for msg in 0..SENDS { + assert!( + recvs.contains(&msg), + "didn't receive {}\nreceived: {recvs:?}", + msg + ); + } + }) +} + #[test] fn rx_closes_error() { const CAPACITY: u8 = 2;