Skip to content

Commit

Permalink
Merge pull request #541 from yjhmelody/stream-partition
Browse files Browse the repository at this point in the history
add stream-partition
  • Loading branch information
yoshuawuyts authored Nov 15, 2019
2 parents 837604b + d76b32e commit 3c6d41c
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ cfg_unstable! {

use crate::stream::into_stream::IntoStream;
use crate::stream::{FromStream, Product, Sum};
use crate::stream::Extend;

use count::CountFuture;
use partition::PartitionFuture;

pub use merge::Merge;
pub use flatten::Flatten;
pub use flat_map::FlatMap;
Expand All @@ -132,6 +135,7 @@ cfg_unstable! {
mod merge;
mod flatten;
mod flat_map;
mod partition;
mod timeout;
mod throttle;
}
Expand Down Expand Up @@ -1308,6 +1312,44 @@ extension_trait! {
FoldFuture::new(self, init, f)
}

#[doc = r#"
A combinator that applies a function to every element in a stream
creating two collections from it.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
.partition(|&n| n % 2 == 0).await;
assert_eq!(even, vec![2]);
assert_eq!(odd, vec![1, 3]);
#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn partition<B, F>(
self,
f: F,
) -> impl Future<Output = (B, B)> [PartitionFuture<Self, F, B>]
where
Self: Sized,
F: FnMut(&Self::Item) -> bool,
B: Default + Extend<Self::Item>,
{
PartitionFuture::new(self, f)
}

#[doc = r#"
Call a closure on each element of the stream.
Expand Down
60 changes: 60 additions & 0 deletions src/stream/stream/partition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::future::Future;
use std::pin::Pin;
use std::default::Default;
use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
#[derive(Debug)]
#[allow(missing_debug_implementations)]
#[cfg(all(feature = "default", feature = "unstable"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct PartitionFuture<S, F, B> {
#[pin]
stream: S,
f: F,
res: Option<(B, B)>,
}
}

impl<S, F, B: Default> PartitionFuture<S, F, B> {
pub(super) fn new(stream: S, f: F) -> Self {
Self {
stream,
f,
res: Some((B::default(), B::default())),
}
}
}

impl<S, F, B> Future for PartitionFuture<S, F, B>
where
S: Stream + Sized,
F: FnMut(&S::Item) -> bool,
B: Default + Extend<S::Item>,
{
type Output = (B, B);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));

match next {
Some(v) => {
let mut res = this.res.take().unwrap();
match (this.f)(&v) {
true => res.0.extend(Some(v)),
false => res.1.extend(Some(v)),
};

*this.res = Some(res);
}
None => return Poll::Ready(this.res.take().unwrap()),
}
}
}
}

0 comments on commit 3c6d41c

Please sign in to comment.