Skip to content

Commit

Permalink
Merge #166
Browse files Browse the repository at this point in the history
166: adds stream::nth combinator r=yoshuawuyts a=montekki

Implements `nth` combinator.

---
Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.nth
Ref: #129 

Co-authored-by: Fedor Sakharov <[email protected]>
  • Loading branch information
bors[bot] and montekki authored Sep 10, 2019
2 parents c3f6a51 + 43b7523 commit a0c9442
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
60 changes: 60 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod all;
mod any;
mod min_by;
mod next;
mod nth;
mod take;

pub use take::Take;
Expand All @@ -33,6 +34,7 @@ use all::AllFuture;
use any::AnyFuture;
use min_by::MinByFuture;
use next::NextFuture;
use nth::NthFuture;

use std::cmp::Ordering;
use std::marker::PhantomData;
Expand Down Expand Up @@ -161,6 +163,64 @@ pub trait Stream {
MinByFuture::new(self, compare)
}

/// Returns the nth element of the stream.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
///
/// let second = s.nth(1).await;
/// assert_eq!(second, Some(2));
/// #
/// # }) }
/// ```
/// Calling `nth()` multiple times:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
///
/// let second = s.nth(0).await;
/// assert_eq!(second, Some(1));
///
/// let second = s.nth(0).await;
/// assert_eq!(second, Some(2));
/// #
/// # }) }
/// ```
/// Returning `None` if the stream finished before returning `n` elements:
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
///
/// let fourth = s.nth(4).await;
/// assert_eq!(fourth, None);
/// #
/// # }) }
/// ```
fn nth(&mut self, n: usize) -> ret!('_, NthFuture, Option<Self::Item>)
where
Self: Sized,
{
NthFuture::new(self, n)
}

/// Tests if every element of the stream matches a predicate.
///
/// `all()` takes a closure that returns `true` or `false`. It applies
Expand Down
41 changes: 41 additions & 0 deletions src/stream/stream/nth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::pin::Pin;
use std::task::{Context, Poll};

#[allow(missing_debug_implementations)]
pub struct NthFuture<'a, S> {
stream: &'a mut S,
n: usize,
}

impl<'a, S> NthFuture<'a, S> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(n: usize);

pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthFuture { stream, n }
}
}

impl<'a, S> futures_core::future::Future for NthFuture<'a, S>
where
S: futures_core::stream::Stream + Unpin + Sized,
{
type Output = Option<S::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;

let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match self.n {
0 => Poll::Ready(Some(v)),
_ => {
*self.as_mut().n() -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

0 comments on commit a0c9442

Please sign in to comment.