diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs new file mode 100644 index 000000000..626a8eceb --- /dev/null +++ b/src/stream/stream/filter_map.rs @@ -0,0 +1,48 @@ +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A stream that both filters and maps. +#[derive(Clone, Debug)] +pub struct FilterMap { + stream: S, + f: F, + __from: PhantomData, + __to: PhantomData, +} + +impl FilterMap { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(crate) fn new(stream: S, f: F) -> Self { + FilterMap { + stream, + f, + __from: PhantomData, + __to: PhantomData, + } + } +} + +impl futures_core::stream::Stream for FilterMap +where + S: futures_core::stream::Stream, + F: FnMut(S::Item) -> Option, +{ + type Item = B; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => match (self.as_mut().f())(v) { + Some(b) => Poll::Ready(Some(b)), + None => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 0aff4ca9e..dc566a025 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -23,6 +23,7 @@ mod all; mod any; +mod filter_map; mod min_by; mod next; mod nth; @@ -32,6 +33,7 @@ pub use take::Take; use all::AllFuture; use any::AnyFuture; +use filter_map::FilterMap; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -130,6 +132,43 @@ pub trait Stream { } } + /// Both filters and maps a stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); + /// + /// let mut parsed = s.filter_map(|a| a.parse::().ok()); + /// + /// let one = parsed.next().await; + /// assert_eq!(one, Some(1)); + /// + /// let three = parsed.next().await; + /// assert_eq!(three, Some(3)); + /// + /// let five = parsed.next().await; + /// assert_eq!(five, Some(5)); + /// + /// let end = parsed.next().await; + /// assert_eq!(end, None); + /// # + /// # }) } + fn filter_map(self, f: F) -> FilterMap + where + Self: Sized, + F: FnMut(Self::Item) -> Option, + { + FilterMap::new(self, f) + } + /// Returns the element that gives the minimum value with respect to the /// specified comparison function. If several elements are equally minimum, /// the first element is returned. If the stream is empty, `None` is returned.