From b7d4fba70755e2ec682665f22f630b8a725a3705 Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Fri, 3 May 2024 13:44:01 +0200 Subject: [PATCH] sync: add `mpsc::Receiver::{capacity,max_capacity}` (#6511) --- tokio/src/sync/mpsc/bounded.rs | 88 ++++++++++++++++++++++++++++++++-- tokio/src/sync/mpsc/chan.rs | 4 ++ 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 6ac97591fea..a4f98060b19 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -481,7 +481,7 @@ impl Receiver { /// assert!(!rx.is_closed()); /// /// rx.close(); - /// + /// /// assert!(rx.is_closed()); /// } /// ``` @@ -530,6 +530,86 @@ impl Receiver { self.chan.len() } + /// Returns the current capacity of the channel. + /// + /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving + /// capacity with [`Sender::reserve`]. The capacity goes up when values are received. + /// This is distinct from [`max_capacity`], which always returns buffer capacity initially + /// specified when calling [`channel`]. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel::<()>(5); + /// + /// assert_eq!(rx.capacity(), 5); + /// + /// // Making a reservation drops the capacity by one. + /// let permit = tx.reserve().await.unwrap(); + /// assert_eq!(rx.capacity(), 4); + /// assert_eq!(rx.len(), 0); + /// + /// // Sending and receiving a value increases the capacity by one. + /// permit.send(()); + /// assert_eq!(rx.len(), 1); + /// rx.recv().await.unwrap(); + /// assert_eq!(rx.capacity(), 5); + /// + /// // Directly sending a message drops the capacity by one. + /// tx.send(()).await.unwrap(); + /// assert_eq!(rx.capacity(), 4); + /// assert_eq!(rx.len(), 1); + /// + /// // Receiving the message increases the capacity by one. + /// rx.recv().await.unwrap(); + /// assert_eq!(rx.capacity(), 5); + /// assert_eq!(rx.len(), 0); + /// } + /// ``` + /// [`capacity`]: Receiver::capacity + /// [`max_capacity`]: Receiver::max_capacity + pub fn capacity(&self) -> usize { + self.chan.semaphore().semaphore.available_permits() + } + + /// Returns the maximum buffer capacity of the channel. + /// + /// The maximum capacity is the buffer capacity initially specified when calling + /// [`channel`]. This is distinct from [`capacity`], which returns the *current* + /// available buffer capacity: as messages are sent and received, the value + /// returned by [`capacity`] will go up or down, whereas the value + /// returned by [`max_capacity`] will remain constant. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel::<()>(5); + /// + /// // both max capacity and capacity are the same at first + /// assert_eq!(rx.max_capacity(), 5); + /// assert_eq!(rx.capacity(), 5); + /// + /// // Making a reservation doesn't change the max capacity. + /// let permit = tx.reserve().await.unwrap(); + /// assert_eq!(rx.max_capacity(), 5); + /// // but drops the capacity by one + /// assert_eq!(rx.capacity(), 4); + /// } + /// ``` + /// [`capacity`]: Receiver::capacity + /// [`max_capacity`]: Receiver::max_capacity + pub fn max_capacity(&self) -> usize { + self.chan.semaphore().bound + } + /// Polls to receive the next message on this channel. /// /// This method returns: @@ -1059,7 +1139,7 @@ impl Sender { /// /// // The iterator should now be exhausted /// assert!(permit.next().is_none()); - /// + /// /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); @@ -1274,7 +1354,7 @@ impl Sender { /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); - /// + /// /// // Trying to call try_reserve_many with 0 will return an empty iterator /// let mut permit = tx.try_reserve_many(0).unwrap(); /// assert!(permit.next().is_none()); @@ -1447,7 +1527,7 @@ impl Sender { /// [`channel`]. This is distinct from [`capacity`], which returns the *current* /// available buffer capacity: as messages are sent and received, the /// value returned by [`capacity`] will go up or down, whereas the value - /// returned by `max_capacity` will remain constant. + /// returned by [`max_capacity`] will remain constant. /// /// # Examples /// diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index ae378d7ecb2..d8838242a39 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -465,6 +465,10 @@ impl Rx { } }) } + + pub(super) fn semaphore(&self) -> &S { + &self.inner.semaphore + } } impl Drop for Rx {