Skip to content

Remove runtime dependency on async-std by implementing a custom CondVar using the event_listener crate. #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@master
- uses: actions/checkout@master

- name: Install nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
- name: Install nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true

- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --features unstable
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stop-token"
version = "0.1.2"
version = "0.1.3"
authors = ["Aleksey Kladov <[email protected]>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand All @@ -10,7 +10,14 @@ description = "Experimental cooperative cancellation for async-std"

[dependencies]
pin-project-lite = "0.1.0"
async-std = "1.0"
futures = "0.3.5"
event-listener = "2.2.0"


[dev-dependencies]
async-std = { version = "1.0", features = ["unstable"] }

[features]
unstable = ["async-std/unstable"]
unstable = []
# This feature doesn't do anything anymore,
# but is needed for backwards-compatibility
181 changes: 161 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,18 @@
//! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads).
//! The `StopToken / StopTokenSource` terminology is borrowed from C++ paper P0660: https://wg21.link/p0660.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::ptr::null_mut;
use std::sync::{
atomic::{AtomicPtr, Ordering},
Arc,
};

use async_std::prelude::*;
use async_std::sync::{channel, Receiver, Sender};
use event_listener::{Event, EventListener};
use futures::stream::Stream;
use pin_project_lite::pin_project;

enum Never {}
use std::task::{Context, Poll};

/// `StopSource` produces `StopToken` and cancels all of its tokens on drop.
///
Expand All @@ -86,30 +90,163 @@ enum Never {}
/// schedule_some_work(stop_token);
/// drop(stop_source); // At this point, scheduled work notices that it is canceled.
/// ```

/// An immutable, atomic option type that store data in Boxed Arcs
struct AtomicOption<T>(AtomicPtr<Arc<T>>);

// TODO: relax orderings on atomic accesses
impl<T> AtomicOption<T> {
fn is_none(&self) -> bool {
self.0.load(Ordering::SeqCst).is_null()
}

#[allow(dead_code)]
fn is_some(&self) -> bool {
!self.is_none()
}

fn get(&self) -> Option<Arc<T>> {
let ptr = self.0.load(Ordering::SeqCst);
if ptr.is_null() {
None
} else {
// Safety: we know that `ptr` is not null and can only have been created from a `Box` by `new` or `replace`
// this means it's safe to turn back into a `Box`
let arc_box = unsafe { Box::from_raw(ptr as *mut Arc<T>) };

let arc = *arc_box.clone(); // Clone the Arc

Box::leak(arc_box); // And make sure rust doesn't drop our inner value

Some(arc)
}
}

fn new(value: Option<T>) -> Self {
let ptr = if let Some(value) = value {
Box::into_raw(Box::new(Arc::new(value)))
} else {
null_mut()
};

Self(AtomicPtr::new(ptr))
}

fn take(&self) -> Option<Arc<T>> {
self.replace(None)
}

fn replace(&self, new: Option<T>) -> Option<Arc<T>> {
let new_ptr = if let Some(new) = new {
Box::into_raw(Box::new(Arc::new(new)))
} else {
null_mut()
};

let ptr = self.0.swap(new_ptr, Ordering::SeqCst);

if ptr.is_null() {
None
} else {
// Safety: we know that `ptr` is not null and can only have been created from a `Box` by `new` or `replace`
// this means it's safe to turn back into a `Box`
Some(unsafe { *Box::from_raw(ptr) })
}
}
}

impl<T> Drop for AtomicOption<T> {
fn drop(&mut self) {
std::mem::drop(self.take());
}
}

impl<T> std::fmt::Debug for AtomicOption<T>
where
T: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.is_none() {
write!(f, "None")
} else {
write!(f, "Some(<Opaque>)")
}
}
}

/// a custom implementation of a CondVar that short-circuits after
/// being signaled once
#[derive(Debug)]
struct ShortCircuitingCondVar(AtomicOption<Event>);

impl ShortCircuitingCondVar {
fn is_done(&self) -> bool {
self.0.is_none()
}

fn notify(&self, n: usize) -> bool {
self.0.take().map(|x| x.notify(n)).is_some()
}

fn listen(&self) -> Option<EventListener> {
self.0.get().map(|event| event.listen())
}
}

#[derive(Debug)]
pub struct StopSource {
/// Solely for `Drop`.
_chan: Sender<Never>,
stop_token: StopToken,
signal: Arc<ShortCircuitingCondVar>,
}

/// `StopToken` is a future which completes when the associated `StopSource` is dropped.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct StopToken {
chan: Receiver<Never>,
cond_var: Arc<ShortCircuitingCondVar>,
cached_listener: Option<EventListener>,
}

impl StopToken {
fn new(cond_var: Arc<ShortCircuitingCondVar>) -> Self {
Self {
cond_var,
cached_listener: None,
}
}

fn listen(&mut self) -> Option<&mut EventListener> {
if self.cond_var.is_done() {
return None;
}

if self.cached_listener.is_none() {
self.cached_listener = self.cond_var.listen();
}
self.cached_listener.as_mut()
}
}

impl Clone for StopToken {
fn clone(&self) -> Self {
Self::new(self.cond_var.clone())
}
}

impl Default for StopSource {
fn default() -> StopSource {
let (sender, receiver) = channel::<Never>(1);

StopSource {
_chan: sender,
stop_token: StopToken { chan: receiver },
signal: Arc::new(ShortCircuitingCondVar(AtomicOption::new(
Some(Event::new()),
))),
}
}
}

impl Drop for StopSource {
fn drop(&mut self) {
self.signal.notify(usize::MAX);
}
}

impl StopSource {
/// Creates a new `StopSource`.
pub fn new() -> StopSource {
Expand All @@ -120,19 +257,23 @@ impl StopSource {
///
/// Once the source is destroyed, `StopToken` future completes.
pub fn stop_token(&self) -> StopToken {
self.stop_token.clone()
StopToken::new(self.signal.clone())
}
}

impl Future for StopToken {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let chan = Pin::new(&mut self.chan);
match Stream::poll_next(chan, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(never)) => match never {},
Poll::Ready(None) => Poll::Ready(()),
if let Some(mut listener) = self.listen() {
let result = match Future::poll(Pin::new(&mut listener), cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(()),
};

return result;
} else {
Poll::Ready(())
}
}
}
Expand Down
Loading