Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

from/into stream #125

Merged
merged 5 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod prelude;
pub mod stream;
pub mod sync;
pub mod task;
mod vec;

#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(feature = "unstable")]
Expand Down
29 changes: 29 additions & 0 deletions src/stream/from_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::IntoStream;

use std::pin::Pin;

/// Conversion from a `Stream`.
///
/// By implementing `FromStream` for a type, you define how it will be created from a stream.
/// This is common for types which describe a collection of some kind.
///
/// See also: [`IntoStream`].
///
/// [`IntoStream`]: trait.IntoStream.html
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait FromStream<T: Send> {
/// Creates a value from a stream.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// // use async_std::stream::FromStream;
///
/// // let _five_fives = async_std::stream::repeat(5).take(5);
/// ```
fn from_stream<'a, S: IntoStream<Item = T> + Send + 'a>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>;
}
36 changes: 36 additions & 0 deletions src/stream/into_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use futures_core::stream::Stream;

/// Conversion into a `Stream`.
///
/// By implementing `IntoIterator` for a type, you define how it will be
/// converted to an iterator. This is common for types which describe a
/// collection of some kind.
///
/// [`from_stream`]: #tymethod.from_stream
/// [`Stream`]: trait.Stream.html
/// [`collect`]: trait.Stream.html#method.collect
///
/// See also: [`FromStream`].
///
/// [`FromStream`]: trait.FromStream.html
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait IntoStream {
/// The type of the elements being iterated over.
type Item;

/// Which kind of stream are we turning this into?
type IntoStream: Stream<Item = Self::Item> + Send;

/// Creates a stream from a value.
fn into_stream(self) -> Self::IntoStream;
}

impl<I: Stream + Send> IntoStream for I {
type Item = I::Item;
type IntoStream = I;

#[inline]
fn into_stream(self) -> I {
self
}
}
4 changes: 4 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@

pub use double_ended_stream::DoubleEndedStream;
pub use empty::{empty, Empty};
pub use from_stream::FromStream;
pub use into_stream::IntoStream;
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Scan, Stream, Take, Zip};

mod double_ended_stream;
mod empty;
mod from_stream;
mod into_stream;
mod once;
mod repeat;
mod stream;
63 changes: 60 additions & 3 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ use min_by::MinByFuture;
use next::NextFuture;
use nth::NthFuture;

use super::from_stream::FromStream;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use cfg_if::cfg_if;

use crate::task::{Context, Poll};

cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
Expand All @@ -80,6 +80,21 @@ cfg_if! {
}
}

cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>);

macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (DynFuture<$a, $o>);
}
} else {
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (Pin<Box<dyn core::future::Future<Output = $o> + Send + 'a>>)
}
}
}

/// An asynchronous stream of values.
///
/// This trait is an async version of [`std::iter::Iterator`].
Expand Down Expand Up @@ -536,7 +551,6 @@ pub trait Stream {
///
/// let mut s = stream::repeat::<u32>(42).take(3);
/// assert!(s.any(|x| x == 42).await);
///
/// #
/// # }) }
/// ```
Expand Down Expand Up @@ -652,6 +666,49 @@ pub trait Stream {
{
Zip::new(self, other)
}

/// Transforms a stream into a collection.
///
/// `collect()` can take anything streamable, and turn it into a relevant
/// collection. This is one of the more powerful methods in the async
/// standard library, used in a variety of contexts.
///
/// The most basic pattern in which `collect()` is used is to turn one
/// collection into another. You take a collection, call [`stream`] on it,
/// do a bunch of transformations, and then `collect()` at the end.
///
/// Because `collect()` is so general, it can cause problems with type
/// inference. As such, `collect()` is one of the few times you'll see
/// the syntax affectionately known as the 'turbofish': `::<>`. This
/// helps the inference algorithm understand specifically which collection
/// you're trying to collect into.
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat(9u8).take(3);
/// let buf: Vec<u8> = s.collect().await;
///
/// assert_eq!(buf, vec![9; 3]);
/// #
/// # }) }
/// ```
///
/// [`stream`]: trait.Stream.html#tymethod.next
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> dyn_ret!('a, B)
where
Self: futures_core::stream::Stream + Sized + Send + 'a,
<Self as futures_core::stream::Stream>::Item: Send,
B: FromStream<<Self as futures_core::stream::Stream>::Item>,
{
FromStream::from_stream(self)
}
}

impl<T: futures_core::stream::Stream + ?Sized> Stream for T {
Expand Down
25 changes: 25 additions & 0 deletions src/vec/from_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::stream::{FromStream, IntoStream, Stream};

use std::pin::Pin;

impl<T: Send> FromStream<T> for Vec<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
where
<S as IntoStream>::IntoStream: Send + 'a,
{
let stream = stream.into_stream();

Pin::from(Box::new(async move {
pin_utils::pin_mut!(stream);

let mut out = vec![];
while let Some(item) = stream.next().await {
out.push(item);
}
out
}))
}
}
9 changes: 9 additions & 0 deletions src/vec/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! The Rust core allocation and collections library
//!
//! This library provides smart pointers and collections for managing
//! heap-allocated values.

mod from_stream;

#[doc(inline)]
pub use std::vec::Vec;