Skip to content

Commit 7341004

Browse files
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
1 parent 02b779e commit 7341004

File tree

2 files changed

+158
-4
lines changed

2 files changed

+158
-4
lines changed

tokio/src/sync/mpsc/bounded.rs

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,8 +464,8 @@ impl<T> Receiver<T> {
464464
/// When the method returns `Poll::Pending`, the `Waker` in the provided
465465
/// `Context` is scheduled to receive a wakeup when a message is sent on any
466466
/// receiver, or when the channel is closed. Note that on multiple calls to
467-
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
468-
/// recent call is scheduled to receive a wakeup.
467+
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
468+
/// passed to the most recent call is scheduled to receive a wakeup.
469469
///
470470
/// If this method returns `Poll::Pending` due to a spurious failure, then
471471
/// the `Waker` will be notified when the situation causing the spurious
@@ -475,6 +475,83 @@ impl<T> Receiver<T> {
475475
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
476476
self.chan.recv(cx)
477477
}
478+
479+
/// Polls to receive multiple messages on this channel, extending the provided buffer.
480+
///
481+
/// This method returns:
482+
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
483+
/// spurious failure happens.
484+
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
485+
/// stored in `buffer`. This can be less than, or equal to, `limit`.
486+
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
487+
///
488+
/// When the method returns `Poll::Pending`, the `Waker` in the provided
489+
/// `Context` is scheduled to receive a wakeup when a message is sent on any
490+
/// receiver, or when the channel is closed. Note that on multiple calls to
491+
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
492+
/// passed to the most recent call is scheduled to receive a wakeup.
493+
///
494+
/// Note that this method does not guarantee that exactly `limit` messages
495+
/// are received. Rather, if at least one message is available, it returns
496+
/// as many messages as it can up to the given limit. This method returns
497+
/// zero only if the channel is closed (or if `limit` is zero).
498+
///
499+
/// # Examples
500+
///
501+
/// ```
502+
/// use std::task::{Context, Poll};
503+
/// use std::pin::Pin;
504+
/// use tokio::sync::mpsc;
505+
/// use futures::Future;
506+
///
507+
/// struct MyReceiverFuture<'a> {
508+
/// receiver: mpsc::Receiver<i32>,
509+
/// buffer: &'a mut Vec<i32>,
510+
/// limit: usize,
511+
/// }
512+
///
513+
/// impl<'a> Future for MyReceiverFuture<'a> {
514+
/// type Output = usize; // Number of messages received
515+
///
516+
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
517+
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
518+
///
519+
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
520+
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
521+
/// Poll::Pending => Poll::Pending,
522+
/// Poll::Ready(count) => Poll::Ready(count),
523+
/// }
524+
/// }
525+
/// }
526+
///
527+
/// #[tokio::main]
528+
/// async fn main() {
529+
/// let (tx, rx) = mpsc::channel(32);
530+
/// let mut buffer = Vec::new();
531+
///
532+
/// let my_receiver_future = MyReceiverFuture {
533+
/// receiver: rx,
534+
/// buffer: &mut buffer,
535+
/// limit: 3,
536+
/// };
537+
///
538+
/// for i in 0..10 {
539+
/// tx.send(i).await.unwrap();
540+
/// }
541+
///
542+
/// let count = my_receiver_future.await;
543+
/// assert_eq!(count, 3);
544+
/// assert_eq!(buffer, vec![0,1,2])
545+
/// }
546+
/// ```
547+
pub fn poll_recv_many(
548+
&mut self,
549+
cx: &mut Context<'_>,
550+
buffer: &mut Vec<T>,
551+
limit: usize,
552+
) -> Poll<usize> {
553+
self.chan.recv_many(cx, buffer, limit)
554+
}
478555
}
479556

480557
impl<T> fmt::Debug for Receiver<T> {

tokio/src/sync/mpsc/unbounded.rs

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,8 @@ impl<T> UnboundedReceiver<T> {
343343
/// When the method returns `Poll::Pending`, the `Waker` in the provided
344344
/// `Context` is scheduled to receive a wakeup when a message is sent on any
345345
/// receiver, or when the channel is closed. Note that on multiple calls to
346-
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
347-
/// recent call is scheduled to receive a wakeup.
346+
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
347+
/// passed to the most recent call is scheduled to receive a wakeup.
348348
///
349349
/// If this method returns `Poll::Pending` due to a spurious failure, then
350350
/// the `Waker` will be notified when the situation causing the spurious
@@ -354,6 +354,83 @@ impl<T> UnboundedReceiver<T> {
354354
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
355355
self.chan.recv(cx)
356356
}
357+
358+
/// Polls to receive multiple messages on this channel, extending the provided buffer.
359+
///
360+
/// This method returns:
361+
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
362+
/// spurious failure happens.
363+
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
364+
/// stored in `buffer`. This can be less than, or equal to, `limit`.
365+
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
366+
///
367+
/// When the method returns `Poll::Pending`, the `Waker` in the provided
368+
/// `Context` is scheduled to receive a wakeup when a message is sent on any
369+
/// receiver, or when the channel is closed. Note that on multiple calls to
370+
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
371+
/// passed to the most recent call is scheduled to receive a wakeup.
372+
///
373+
/// Note that this method does not guarantee that exactly `limit` messages
374+
/// are received. Rather, if at least one message is available, it returns
375+
/// as many messages as it can up to the given limit. This method returns
376+
/// zero only if the channel is closed (or if `limit` is zero).
377+
///
378+
/// # Examples
379+
///
380+
/// ```
381+
/// use std::task::{Context, Poll};
382+
/// use std::pin::Pin;
383+
/// use tokio::sync::mpsc;
384+
/// use futures::Future;
385+
///
386+
/// struct MyReceiverFuture<'a> {
387+
/// receiver: mpsc::UnboundedReceiver<i32>,
388+
/// buffer: &'a mut Vec<i32>,
389+
/// limit: usize,
390+
/// }
391+
///
392+
/// impl<'a> Future for MyReceiverFuture<'a> {
393+
/// type Output = usize; // Number of messages received
394+
///
395+
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
396+
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
397+
///
398+
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
399+
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
400+
/// Poll::Pending => Poll::Pending,
401+
/// Poll::Ready(count) => Poll::Ready(count),
402+
/// }
403+
/// }
404+
/// }
405+
///
406+
/// #[tokio::main]
407+
/// async fn main() {
408+
/// let (tx, rx) = mpsc::unbounded_channel::<i32>();
409+
/// let mut buffer = Vec::new();
410+
///
411+
/// let my_receiver_future = MyReceiverFuture {
412+
/// receiver: rx,
413+
/// buffer: &mut buffer,
414+
/// limit: 3,
415+
/// };
416+
///
417+
/// for i in 0..10 {
418+
/// tx.send(i).expect("Unable to send integer");
419+
/// }
420+
///
421+
/// let count = my_receiver_future.await;
422+
/// assert_eq!(count, 3);
423+
/// assert_eq!(buffer, vec![0,1,2])
424+
/// }
425+
/// ```
426+
pub fn poll_recv_many(
427+
&mut self,
428+
cx: &mut Context<'_>,
429+
buffer: &mut Vec<T>,
430+
limit: usize,
431+
) -> Poll<usize> {
432+
self.chan.recv_many(cx, buffer, limit)
433+
}
357434
}
358435

359436
impl<T> UnboundedSender<T> {

0 commit comments

Comments
 (0)