diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 56c4cd6b92f..b97c44769c5 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -389,8 +389,46 @@ struct RecvGuard<'a, T> { slot: RwLockReadGuard<'a, Slot>, } +pub(crate) mod future { + use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + }; + + use pin_project_lite::pin_project; + + use crate::runtime::coop::Coop; + + use super::{error::RecvError, RecvInner}; + + pin_project! { + /// Future for the [`Receiver::recv`][super::Receiver::recv] method. + pub struct Recv<'a, T> + where + T: Clone, + { + #[pin] + pub(super) inner: Coop>, + } + } + + impl<'a, T> Future for Recv<'a, T> + where + T: Clone, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) + } + } +} + +use self::future::Recv; + /// Receive a value future. -struct Recv<'a, T> { +struct RecvInner<'a, T> { /// Receiver being waited on. receiver: &'a mut Receiver, @@ -398,8 +436,8 @@ struct Recv<'a, T> { waiter: UnsafeCell, } -unsafe impl<'a, T: Send> Send for Recv<'a, T> {} -unsafe impl<'a, T: Send> Sync for Recv<'a, T> {} +unsafe impl<'a, T: Send> Send for RecvInner<'a, T> {} +unsafe impl<'a, T: Send> Sync for RecvInner<'a, T> {} /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 2; @@ -1192,6 +1230,12 @@ impl Receiver { } /// Receives the next value for this receiver. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn recv(&self) -> Result; + /// ``` + /// /// Each [`Receiver`] handle will receive a clone of all values sent /// **after** it has subscribed. /// @@ -1262,8 +1306,10 @@ impl Receiver { /// assert_eq!(30, rx.recv().await.unwrap()); /// } /// ``` - pub async fn recv(&mut self) -> Result { - cooperative(Recv::new(self)).await + pub fn recv(&mut self) -> Recv<'_, T> { + Recv { + inner: cooperative(RecvInner::new(self)), + } } /// Attempts to return a pending value on this receiver without awaiting. @@ -1363,9 +1409,9 @@ impl Drop for Receiver { } } -impl<'a, T> Recv<'a, T> { - fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { - Recv { +impl<'a, T> RecvInner<'a, T> { + fn new(receiver: &'a mut Receiver) -> RecvInner<'a, T> { + RecvInner { receiver, waiter: UnsafeCell::new(Waiter { queued: AtomicBool::new(false), @@ -1389,7 +1435,7 @@ impl<'a, T> Recv<'a, T> { } } -impl<'a, T> Future for Recv<'a, T> +impl<'a, T> Future for RecvInner<'a, T> where T: Clone, { @@ -1411,7 +1457,7 @@ where } } -impl<'a, T> Drop for Recv<'a, T> { +impl<'a, T> Drop for RecvInner<'a, T> { fn drop(&mut self) { // Safety: `waiter.queued` is atomic. // Acquire ordering is required to synchronize with diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index ddf99644270..adfa7a7efb7 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -449,7 +449,7 @@ cfg_sync! { /// Named future types. pub mod futures { - pub use super::notify::Notified; + pub use super::{notify::Notified, broadcast::future::Recv}; } mod barrier;