From 532efa0eea7e56dbe77c058005f2691b43ef6293 Mon Sep 17 00:00:00 2001 From: Gavrie Philipson Date: Wed, 5 Aug 2020 21:06:38 +0300 Subject: [PATCH 1/5] Add with_stop_token and use Result instead of Option to allow Try --- src/lib.rs | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3a7026d..b5e938e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,6 +76,19 @@ use pin_project_lite::pin_project; enum Never {} +#[derive(Debug)] +pub enum Error { + Stopped, +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::error::Error for Error {} + /// `StopSource` produces `StopToken` and cancels all of its tokens on drop. /// /// # Example: @@ -190,16 +203,30 @@ pin_project! { } impl Future for StopFuture { - type Output = Option; + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); + return Poll::Ready(Err(Error::Stopped)); } match this.future.poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(it) => Poll::Ready(Some(it)), + Poll::Ready(it) => Poll::Ready(Ok(it)), + } + } +} + +impl WithStopTokenExt for F where F: Future {} + +pub trait WithStopTokenExt: Future { + fn with_stop_token(self, stop_token: &StopToken) -> StopFuture + where + Self: Sized, + { + StopFuture { + stop_token: stop_token.clone(), + future: self, } } } From e942da2ef9db1ffaba0b8f59b0037130bca2b003 Mon Sep 17 00:00:00 2001 From: Gavrie Philipson Date: Wed, 5 Aug 2020 21:23:02 +0300 Subject: [PATCH 2/5] Add extension trait for Stream as well --- src/lib.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index b5e938e..94ae95d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -230,3 +230,17 @@ pub trait WithStopTokenExt: Future { } } } + +impl WithStopTokenStreamExt for S where S: Stream {} + +pub trait WithStopTokenStreamExt: Stream { + fn with_stop_token(self, stop_token: &StopToken) -> StopStream + where + Self: Sized, + { + StopStream { + stop_token: stop_token.clone(), + stream: self, + } + } +} From 0f480c6803ec66a58e55f8e9d92d8c965ef27e55 Mon Sep 17 00:00:00 2001 From: Gavrie Philipson Date: Wed, 5 Aug 2020 21:24:54 +0300 Subject: [PATCH 3/5] Run rustfmt --- tests/tests.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/tests.rs b/tests/tests.rs index 8fe28f3..6a0f649 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use async_std::{prelude::*, task, sync::channel}; +use async_std::{prelude::*, sync::channel, task}; use stop_token::StopSource; @@ -13,13 +13,14 @@ fn smoke() { let stop_token = stop_source.stop_token(); let receiver = receiver.clone(); async move { - let mut xs = Vec::new(); - let mut stream = stop_token.stop_stream(receiver); - while let Some(x) = stream.next().await { - xs.push(x) + let mut xs = Vec::new(); + let mut stream = stop_token.stop_stream(receiver); + while let Some(x) = stream.next().await { + xs.push(x) + } + xs } - xs - }}); + }); sender.send(1).await; sender.send(2).await; sender.send(3).await; From 100a486a91caf0bc3685a728b54a930bd6fcdb60 Mon Sep 17 00:00:00 2001 From: Gavrie Philipson Date: Wed, 5 Aug 2020 21:43:23 +0300 Subject: [PATCH 4/5] Add test --- tests/tests.rs | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/tests/tests.rs b/tests/tests.rs index 6a0f649..d132543 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -2,7 +2,7 @@ use std::time::Duration; use async_std::{prelude::*, sync::channel, task}; -use stop_token::StopSource; +use stop_token::{Error, StopSource, StopToken, WithStopTokenExt as _}; #[test] fn smoke() { @@ -35,3 +35,28 @@ fn smoke() { assert_eq!(task.await, vec![1, 2, 3]); }) } + +#[test] +fn extension_methods() { + async fn long_running(stop_token: StopToken) -> Result<(), Error> { + loop { + task::sleep(Duration::from_secs(10)) + .with_stop_token(&stop_token) + .await?; + } + } + + task::block_on(async { + let stop_source = StopSource::new(); + let stop_token = stop_source.stop_token(); + + task::spawn(async { + task::sleep(Duration::from_millis(250)).await; + drop(stop_source); + }); + + if let Ok(_) = long_running(stop_token).await { + panic!("expected to have been stopped"); + } + }) +} From df397b9927029fa729c4a52b752b2f9479fcf95b Mon Sep 17 00:00:00 2001 From: Gavrie Philipson Date: Wed, 5 Aug 2020 21:43:48 +0300 Subject: [PATCH 5/5] Bump version and explicitly use unstable feature of async-std --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 97bd9c1..7f62852 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.1.2" +version = "0.2.0" authors = ["Aleksey Kladov "] edition = "2018" license = "MIT OR Apache-2.0" @@ -10,7 +10,7 @@ description = "Experimental cooperative cancellation for async-std" [dependencies] pin-project-lite = "0.1.0" -async-std = "1.0" +async-std = { version = "1.0", features = ["unstable"] } [features] unstable = ["async-std/unstable"]