Skip to content

Commit

Permalink
Merge branch 'master' into fs-stream-find-map
Browse files Browse the repository at this point in the history
  • Loading branch information
montekki committed Sep 10, 2019
2 parents 45bd0ef + 6d1e71f commit efb8415
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 57 deletions.
31 changes: 20 additions & 11 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
language: rust

env: RUSTFLAGS="-D warnings"
env:
- RUSTFLAGS="-D warnings"

# Cache the whole `~/.cargo` directory to keep `~/cargo/.crates.toml`.
cache:
directories:
- /home/travis/.cargo

# Don't cache the cargo registry because it's too big.
before_cache:
- rm -rf /home/travis/.cargo/registry

matrix:
fast_finish: true
Expand Down Expand Up @@ -35,16 +45,15 @@ matrix:
script:
- cargo doc --features docs

# TODO(yoshuawuyts): re-enable mdbook
# - name: book
# rust: nightly
# os: linux
# before_script:
# - test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
# - cargo build # to find 'extern crate async_std' by `mdbook test`
# script:
# - mdbook build docs
# - mdbook test -L ./target/debug/deps docs
- name: book
rust: nightly
os: linux
before_script:
- test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
- cargo build # to find 'extern crate async_std' by `mdbook test`
script:
- mdbook build docs
- mdbook test -L ./target/debug/deps docs

script:
- cargo check --features unstable --all --benches --bins --examples --tests
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ femme = "1.2.0"
surf = "1.0.2"
tempdir = "0.3.7"

# These are used by the book for examples
futures-channel-preview = "0.3.0-alpha.18"
futures-util-preview = "0.3.0-alpha.18"

[dev-dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["std", "nightly", "async-await"]
9 changes: 4 additions & 5 deletions docs/src/tutorial/all_together.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ At this point, we only need to start the broker to get a fully-functioning (in t

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
use async_std::{
io::{self, BufReader},
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
};
use futures::{
channel::mpsc,
SinkExt,
};
use futures_channel::mpsc;
use futures_util::SinkExt;
use std::{
collections::hash_map::{HashMap, Entry},
sync::Arc,
Expand Down
18 changes: 8 additions & 10 deletions docs/src/tutorial/clean_shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@ Let's add waiting to the server:

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::{
# io::{self, BufReader},
# net::{TcpListener, TcpStream, ToSocketAddrs},
# prelude::*,
# task,
# };
# use futures::{
# channel::mpsc,
# SinkExt,
# };
# use futures_channel::mpsc;
# use futures_util::SinkExt;
# use std::{
# collections::hash_map::{HashMap, Entry},
# sync::Arc,
Expand Down Expand Up @@ -156,17 +155,16 @@ And to the broker:

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::{
# io::{self, BufReader},
# net::{TcpListener, TcpStream, ToSocketAddrs},
# prelude::*,
# task,
# };
# use futures::{
# channel::mpsc,
# SinkExt,
# };
# use futures_channel::mpsc;
# use futures_util::SinkExt;
# use std::{
# collections::hash_map::{HashMap, Entry},
# sync::Arc,
Expand Down
7 changes: 4 additions & 3 deletions docs/src/tutorial/connecting_readers_and_writers.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::{
# io::{Write},
# net::TcpStream,
# prelude::{Future, Stream},
# task,
# };
# use futures::channel::mpsc;
# use futures::sink::SinkExt;
# use futures_channel::mpsc;
# use futures_util::sink::SinkExt;
# use std::sync::Arc;
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
19 changes: 13 additions & 6 deletions docs/src/tutorial/handling_disconnection.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ First, let's add a shutdown channel to the `client`:

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::net::TcpStream;
# use futures::{channel::mpsc, SinkExt};
# use futures_channel::mpsc;
# use futures_util::SinkExt;
# use std::sync::Arc;
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down Expand Up @@ -68,9 +70,11 @@ We use the `select` macro for this purpose:

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::{io::Write, net::TcpStream};
use futures::{channel::mpsc, select, FutureExt, StreamExt};
use futures_channel::mpsc;
use futures_util::{select, FutureExt, StreamExt};
# use std::sync::Arc;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
Expand Down Expand Up @@ -118,15 +122,18 @@ The final code looks like this:

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
use async_std::{
io::{BufReader, BufRead, Write},
net::{TcpListener, TcpStream, ToSocketAddrs},
task,
};
use futures::{channel::mpsc, future::Future, select, FutureExt, SinkExt, StreamExt};
use futures_channel::mpsc;
use futures_util::{select, FutureExt, SinkExt, StreamExt};
use std::{
collections::hash_map::{Entry, HashMap},
future::Future,
sync::Arc,
};
Expand Down
4 changes: 2 additions & 2 deletions docs/src/tutorial/implementing_a_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ With async, we can just use the `select!` macro.

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_util;
use async_std::{
io::{stdin, BufRead, BufReader, Write},
net::{TcpStream, ToSocketAddrs},
task,
};
use futures::{select, FutureExt, StreamExt};
use futures_util::{select, FutureExt, StreamExt};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
7 changes: 4 additions & 3 deletions docs/src/tutorial/sending_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the

```rust,edition2018
# extern crate async_std;
# extern crate futures;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::{
# io::Write,
# net::TcpStream,
# prelude::Stream,
# };
use futures::channel::mpsc; // 1
use futures::sink::SinkExt;
use futures_channel::mpsc; // 1
use futures_util::sink::SinkExt;
use std::sync::Arc;
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
2 changes: 2 additions & 0 deletions src/io/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub trait Write {
/// #
/// # Ok(()) }) }
/// ```
///
/// [`write`]: #tymethod.write
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>)
where
Self: Unpin,
Expand Down
48 changes: 48 additions & 0 deletions src/stream/stream/filter_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A stream that both filters and maps.
#[derive(Clone, Debug)]
pub struct FilterMap<S, F, T, B> {
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
}

impl<S, F, T, B> FilterMap<S, F, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);

pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap {
stream,
f,
__from: PhantomData,
__to: PhantomData,
}
}
}

impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
where
S: futures_core::stream::Stream,
F: FnMut(S::Item) -> Option<B>,
{
type Item = B;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match (self.as_mut().f())(v) {
Some(b) => Poll::Ready(Some(b)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}
30 changes: 15 additions & 15 deletions src/stream/stream/min_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ use std::cmp::Ordering;
use std::pin::Pin;

use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};

/// A future that yields the minimum item in a stream by a given comparison function.
#[derive(Clone, Debug)]
pub struct MinByFuture<S: Stream, F> {
#[allow(missing_debug_implementations)]
pub struct MinByFuture<S, F, T> {
stream: S,
compare: F,
min: Option<S::Item>,
min: Option<T>,
}

impl<S: Stream + Unpin, F> Unpin for MinByFuture<S, F> {}
impl<S, F, T> MinByFuture<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(compare: F);
pin_utils::unsafe_unpinned!(min: Option<T>);

impl<S: Stream + Unpin, F> MinByFuture<S, F> {
pub(super) fn new(stream: S, compare: F) -> Self {
MinByFuture {
stream,
Expand All @@ -25,25 +25,25 @@ impl<S: Stream + Unpin, F> MinByFuture<S, F> {
}
}

impl<S, F> Future for MinByFuture<S, F>
impl<S, F> Future for MinByFuture<S, F, S::Item>
where
S: futures_core::stream::Stream + Unpin,
S: futures_core::stream::Stream + Unpin + Sized,
S::Item: Copy,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));

match next {
Some(new) => {
cx.waker().wake_by_ref();
match self.as_mut().min.take() {
None => self.as_mut().min = Some(new),
Some(old) => match (&mut self.as_mut().compare)(&new, &old) {
Ordering::Less => self.as_mut().min = Some(new),
_ => self.as_mut().min = Some(old),
match self.as_mut().min().take() {
None => *self.as_mut().min() = Some(new),
Some(old) => match (&mut self.as_mut().compare())(&new, &old) {
Ordering::Less => *self.as_mut().min() = Some(new),
_ => *self.as_mut().min() = Some(old),
},
}
Poll::Pending
Expand Down
Loading

0 comments on commit efb8415

Please sign in to comment.