Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stream::flatten and Stream::flat_map #367

Merged
merged 28 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec98b41
feat: Add FlattenCompat struct
k-nasa Oct 17, 2019
bb14164
feat: Add Stream trait for FlattenCompat
k-nasa Oct 17, 2019
2dee289
Add FlatMap struct
k-nasa Oct 18, 2019
2187a2a
feat: Add Stream::flat_map
k-nasa Oct 18, 2019
cd86208
Add Flatten struct
k-nasa Oct 18, 2019
8138afb
feat: Add Stream trait for Flatten
k-nasa Oct 18, 2019
176359a
Add Stream::flatten
k-nasa Oct 18, 2019
1c1e223
Merge branch 'master' into add_stream_flatten
k-nasa Oct 18, 2019
410d16e
Add docs + To unstable feature
k-nasa Oct 18, 2019
3297a0f
Merge branch 'master' into add_stream_flatten
k-nasa Oct 25, 2019
271b6f4
fix: Using pin_project!
k-nasa Oct 25, 2019
00e7e58
fix type def
k-nasa Oct 25, 2019
001368d
$cargo fmt
k-nasa Oct 25, 2019
0c5abee
to unstable stream::flat_map, stream::flatten
k-nasa Oct 25, 2019
b66ffa6
update recursion_limit
k-nasa Oct 25, 2019
7ce721f
Update src/lib.rs
k-nasa Oct 25, 2019
b7b5df1
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
6168952
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
bf3508f
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
8932cec
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
61b7a09
Fix type declaration
k-nasa Oct 25, 2019
13a08b0
Narrow the disclosure range of FlatMap::new
k-nasa Oct 27, 2019
37f14b0
Narrow the disclosure range of Flatten::new
k-nasa Oct 27, 2019
a42ae2f
Narrow the disclosure range of FlattenCompat::new
k-nasa Oct 27, 2019
6889762
fix: Split FlattenCompat logic to Flatten and FlatMap
k-nasa Oct 29, 2019
040227f
Merge branch 'master' into add_stream_flatten
k-nasa Oct 29, 2019
ae7adf2
fix: Remove unused import
k-nasa Oct 29, 2019
1554b04
$cargo fmt
k-nasa Oct 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/stream/stream/flat_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use pin_project_lite::pin_project;
use std::pin::Pin;

use crate::prelude::*;
use crate::stream::stream::map::Map;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};

pin_project! {
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct FlatMap<S, U, T, F> {
#[pin]
stream: Map<S, F, T, U>,
#[pin]
inner_stream: Option<U>,
}
}

impl<S, U, F> FlatMap<S, U, S::Item, F>
where
S: Stream,
U: IntoStream,
F: FnMut(S::Item) -> U,
{
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
FlatMap {
stream: stream.map(f),
inner_stream: None,
}
}
}

impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
F: FnMut(S::Item) -> U,
{
type Item = U::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
}
58 changes: 58 additions & 0 deletions src/stream/stream/flatten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use pin_project_lite::pin_project;
use std::pin::Pin;

use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};

pin_project! {
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct Flatten<S, U> {
#[pin]
stream: S,
#[pin]
inner_stream: Option<U>,
}
}

impl<S> Flatten<S, S::Item>
where
S: Stream,
S::Item: IntoStream,
{
pub(super) fn new(stream: S) -> Flatten<S, S::Item> {
Flatten {
stream,
inner_stream: None,
}
}
}

impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
{
type Item = U::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
}
78 changes: 76 additions & 2 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,17 @@ cfg_unstable! {
use std::time::Duration;

use crate::future::Future;
use crate::stream::FromStream;
use crate::stream::{Product, Sum};
use crate::stream::into_stream::IntoStream;
use crate::stream::{FromStream, Product, Sum};

pub use merge::Merge;
pub use flatten::Flatten;
pub use flat_map::FlatMap;
pub use timeout::{TimeoutError, Timeout};

mod merge;
mod flatten;
mod flat_map;
mod timeout;
}

Expand Down Expand Up @@ -563,6 +567,76 @@ extension_trait! {
Filter::new(self, predicate)
}

#[doc= r#"
Creates an stream that works like map, but flattens nested structure.

# Examples

Basic usage:

```
# async_std::task::block_on(async {

use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream::IntoStream;

let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();

let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();

let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await;

assert_eq!(v, vec![1,2,3,4,5,6]);

# });
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, Self::Item, F>
where
Self: Sized,
U: IntoStream,
F: FnMut(Self::Item) -> U,
{
FlatMap::new(self, f)
}

#[doc = r#"
Creates an stream that flattens nested structure.

# Examples

Basic usage:

```
# async_std::task::block_on(async {

use std::collections::VecDeque;
use async_std::prelude::*;

let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();

let v: Vec<_> = s.flatten().collect().await;

assert_eq!(v, vec![1,2,3,4,5,6]);

# });
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flatten(self) -> Flatten<Self, Self::Item>
where
Self: Sized,
Self::Item: IntoStream,
{
Flatten::new(self)
}

#[doc = r#"
Both filters and maps a stream.

Expand Down