Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stream::flatten and Stream::flat_map #367

Merged
merged 28 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec98b41
feat: Add FlattenCompat struct
k-nasa Oct 17, 2019
bb14164
feat: Add Stream trait for FlattenCompat
k-nasa Oct 17, 2019
2dee289
Add FlatMap struct
k-nasa Oct 18, 2019
2187a2a
feat: Add Stream::flat_map
k-nasa Oct 18, 2019
cd86208
Add Flatten struct
k-nasa Oct 18, 2019
8138afb
feat: Add Stream trait for Flatten
k-nasa Oct 18, 2019
176359a
Add Stream::flatten
k-nasa Oct 18, 2019
1c1e223
Merge branch 'master' into add_stream_flatten
k-nasa Oct 18, 2019
410d16e
Add docs + To unstable feature
k-nasa Oct 18, 2019
3297a0f
Merge branch 'master' into add_stream_flatten
k-nasa Oct 25, 2019
271b6f4
fix: Using pin_project!
k-nasa Oct 25, 2019
00e7e58
fix type def
k-nasa Oct 25, 2019
001368d
$cargo fmt
k-nasa Oct 25, 2019
0c5abee
to unstable stream::flat_map, stream::flatten
k-nasa Oct 25, 2019
b66ffa6
update recursion_limit
k-nasa Oct 25, 2019
7ce721f
Update src/lib.rs
k-nasa Oct 25, 2019
b7b5df1
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
6168952
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
bf3508f
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
8932cec
Update src/stream/stream/flatten.rs
k-nasa Oct 25, 2019
61b7a09
Fix type declaration
k-nasa Oct 25, 2019
13a08b0
Narrow the disclosure range of FlatMap::new
k-nasa Oct 27, 2019
37f14b0
Narrow the disclosure range of Flatten::new
k-nasa Oct 27, 2019
a42ae2f
Narrow the disclosure range of FlattenCompat::new
k-nasa Oct 27, 2019
6889762
fix: Split FlattenCompat logic to Flatten and FlatMap
k-nasa Oct 29, 2019
040227f
Merge branch 'master' into add_stream_flatten
k-nasa Oct 29, 2019
ae7adf2
fix: Remove unused import
k-nasa Oct 29, 2019
1554b04
$cargo fmt
k-nasa Oct 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
#![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")]
#![recursion_limit = "1024"]
#![recursion_limit = "2048"]

#[macro_use]
mod utils;
Expand Down
158 changes: 158 additions & 0 deletions src/stream/stream/flatten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use pin_project_lite::pin_project;
use std::pin::Pin;

use crate::prelude::*;
use crate::stream::stream::map::Map;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};

pin_project! {
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct FlatMap<S, U, T, F> {
#[pin]
inner: FlattenCompat<Map<S, F, T, U>, U>,
}
}

impl<S, U, F> FlatMap<S, U, S::Item, F>
where
S: Stream,
U: IntoStream,
F: FnMut(S::Item) -> U,
{
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
FlatMap {
inner: FlattenCompat::new(stream.map(f)),
}
}
}

impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
F: FnMut(S::Item) -> U,
{
type Item = U::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of exporting flatten::{Flatten, FlatMap} I think it'd be preferable to export them from separate files.

The FlattenCompat is a nice idea, but I'm generally wary of introducing abstractions here, preferring to duplicate some of the logic instead. While this is more lines of code, it's also somewhat simpler. Would you be okay with splitting the two impls into two files, and removing the shared FlattenCompat struct?

Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yh!


pin_project! {
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct Flatten<S, U> {
#[pin]
inner: FlattenCompat<S, U>
}
}

impl<S> Flatten<S, S::Item>
where
S: Stream,
S::Item: IntoStream,
{
pub(super) fn new(stream: S) -> Flatten<S, S::Item> {
Flatten {
inner: FlattenCompat::new(stream),
}
}
}

impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
{
type Item = U::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}
}

pin_project! {
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to
/// this type.
#[derive(Clone, Debug)]
struct FlattenCompat<S, U> {
#[pin]
stream: S,
#[pin]
frontiter: Option<U>,
}
}

impl<S, U> FlattenCompat<S, U> {
/// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`.
fn new(stream: S) -> FlattenCompat<S, U> {
FlattenCompat {
stream,
frontiter: None,
}
}
}

impl<S, U> Stream for FlattenCompat<S, U>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
{
type Item = U::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(inner) = this.frontiter.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.frontiter.set(Some(inner.into_stream())),
}
}
}
}

#[cfg(test)]
mod tests {
use super::FlattenCompat;

use crate::prelude::*;
use crate::task;

use std::collections::VecDeque;

#[test]
fn test_poll_next() -> std::io::Result<()> {
let inner1: VecDeque<u8> = vec![1, 2, 3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4, 5, 6].into_iter().collect();

let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();

task::block_on(async move {
let flat = FlattenCompat::new(s);
let v: Vec<u8> = flat.collect().await;

assert_eq!(v, vec![1, 2, 3, 4, 5, 6]);
Ok(())
})
}
}
73 changes: 73 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,13 @@ cfg_unstable! {

use crate::future::Future;
use crate::stream::FromStream;
use crate::stream::into_stream::IntoStream;

pub use merge::Merge;
pub use flatten::{FlatMap, Flatten};

mod merge;
mod flatten;
}

extension_trait! {
Expand Down Expand Up @@ -559,6 +562,76 @@ extension_trait! {
Filter::new(self, predicate)
}

#[doc= r#"
Creates an stream that works like map, but flattens nested structure.

# Examples

Basic usage:

```
# async_std::task::block_on(async {

use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream::IntoStream;

let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();

let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();

let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await;

assert_eq!(v, vec![1,2,3,4,5,6]);

# });
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, Self::Item, F>
where
Self: Sized,
U: IntoStream,
F: FnMut(Self::Item) -> U,
{
FlatMap::new(self, f)
}

#[doc = r#"
Creates an stream that flattens nested structure.

# Examples

Basic usage:

```
# async_std::task::block_on(async {

use std::collections::VecDeque;
use async_std::prelude::*;

let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();

let v: Vec<_> = s.flatten().collect().await;

assert_eq!(v, vec![1,2,3,4,5,6]);

# });
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flatten(self) -> Flatten<Self, Self::Item>
where
Self: Sized,
Self::Item: IntoStream,
{
Flatten::new(self)
}

#[doc = r#"
Both filters and maps a stream.

Expand Down