diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 094161bc706..d4a6600c5a9 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -116,7 +116,6 @@ //! } //! ``` -use crate::future::poll_fn; use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; @@ -846,7 +845,19 @@ impl Sender { /// } /// ``` pub async fn closed(&self) { - self.shared.notify_rx_drop.notified().await; + loop { + let notified = self.shared.notify_rx_drop.notified(); + + { + // Ensure the lock drops if the channel isn't closed + let tail = self.shared.tail.lock(); + if tail.closed { + return; + } + } + + notified.await; + } } fn close_channel(&self) { @@ -1398,12 +1409,13 @@ impl Drop for Receiver { let until = tail.pos; let remaining_rx = tail.rx_cnt; - drop(tail); - if remaining_rx == 0 { self.shared.notify_rx_drop.notify_waiters(); + tail.closed = true; } + drop(tail); + while self.next < until { match self.recv_ref(None) { Ok(_) => {}