From eac390f5138b555da68f67c03cd758e0b86e4f96 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Thu, 14 May 2026 11:06:21 -0700 Subject: [PATCH 01/10] beginnings of a new crate. needs to be polished and prepped for a v1 release. --- crates/cancelable/src/future.rs | 172 +++++++++ crates/cancelable/src/token.rs | 648 ++++++++++++++++++++++++++++++++ 2 files changed, 820 insertions(+) create mode 100644 crates/cancelable/src/future.rs create mode 100644 crates/cancelable/src/token.rs diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs new file mode 100644 index 000000000..b328f761d --- /dev/null +++ b/crates/cancelable/src/future.rs @@ -0,0 +1,172 @@ +// Copyright (c) Microsoft Corporation. + +//! Future combinators for cooperative cancellation. +//! +//! The [`CancellationFutureExt`] trait adds a +//! [`with_cancellation`](CancellationFutureExt::with_cancellation) combinator +//! to any [`Future`], pairing it with a [`CancellationToken`] so that each +//! poll checks for cancellation before and after driving the inner future. +//! +//! ``` +//! # async fn example() { +//! use oxidizer_sync::{CancellationTokenSource, CancellationFutureExt}; +//! +//! let source = CancellationTokenSource::new(); +//! let token = source.token(); +//! +//! let result = async { 42 }.with_cancellation(token).await; +//! assert_eq!(result.unwrap(), 42); +//! # } +//! ``` + +use crate::CancellationToken; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Error returned when a [`WithCancellation`] future detects that its +/// associated [`CancellationToken`] has been canceled. +#[ohno::error] +#[display("operation was canceled")] +pub struct Canceled {} + +/// Extension trait that adds cancellation support to any [`Future`]. +pub trait CancellationFutureExt: Future + Sized { + /// Wraps this future so that each poll checks the given [`CancellationToken`]: + /// + /// - If the token is canceled (before *or* after polling the inner + /// future), the combined future resolves to Err([Canceled]). + /// - Otherwise the inner future's output is forwarded as `Ok(output)`. + /// + /// # Note on wake semantics + /// + /// Cancellation is checked cooperatively: the combinator inspects the token + /// each time the inner future is polled. If the inner future is pending + /// and nothing else wakes the task, cancellation will not be noticed until + /// the next poll. This mirrors the cooperative model of C#'s + /// `CancellationToken.ThrowIfCancellationRequested()`. + /// + /// # Examples + /// + /// Successful completion: + /// + /// ``` + /// # async fn example() { + /// use oxidizer_sync::{CancellationTokenSource, CancellationFutureExt}; + /// + /// let source = CancellationTokenSource::new(); + /// let result = async { "hello" } + /// .with_cancellation(source.token()) + /// .await; + /// assert_eq!(result.unwrap(), "hello"); + /// # } + /// ``` + /// + /// Cancelled before first poll: + /// + /// ``` + /// # async fn example() { + /// use oxidizer_sync::{CancellationTokenSource, CancellationFutureExt}; + /// + /// let source = CancellationTokenSource::new(); + /// source.cancel(); + /// + /// let result = async { unreachable!() } + /// .with_cancellation(source.token()) + /// .await; + /// assert!(result.unwrap_err().to_string().contains("canceled")); + /// # } + /// ``` + fn with_cancellation(self, token: CancellationToken) -> WithCancellation; +} + +impl CancellationFutureExt for F { + fn with_cancellation(self, token: CancellationToken) -> WithCancellation { + WithCancellation { + inner: self, + token, + } + } +} + +/// Future returned by +/// [`with_cancellation`](CancellationFutureExt::with_cancellation). +/// +/// See the trait method documentation for semantics. +#[derive(Debug)] +#[pin_project] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct WithCancellation { + #[pin] + inner: F, + token: CancellationToken, +} + +impl Future for WithCancellation { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + // Check cancellation before running the inner future so we can + // short-circuit without performing unnecessary work. + if this.token.is_cancelled() { + return Poll::Ready(Err(Canceled::new())); + } + + match this.inner.poll(cx) { + Poll::Ready(output) => Poll::Ready(Ok(output)), + Poll::Pending => { + // Check for cancellation again, now that we've spent time running the inner future. + if this.token.is_cancelled() { + Poll::Ready(Err(Canceled::new())) + } else { + Poll::Pending + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::CancellationTokenSource; + + #[tokio::test] + async fn completed_future_returns_ok() { + let source = CancellationTokenSource::new(); + let result = async { 42 }.with_cancellation(source.token()).await; + assert_eq!(result.unwrap(), 42); + } + + #[tokio::test] + async fn cancelled_future_returns_err() { + let source = CancellationTokenSource::new(); + source.cancel(); + + let result = async { unreachable!("should not poll inner future") }.with_cancellation(source.token()).await; + assert!(result.unwrap_err().to_string().contains("canceled")); + } + + #[tokio::test] + async fn cancellation_triggered_by_inner_future_leads_to_cancellation_error() { + struct CancelOnPoll(CancellationTokenSource); + impl Future for CancelOnPoll { + type Output = (); + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + self.0.cancel(); + Poll::Pending + } + } + + let source = CancellationTokenSource::new(); + let token = source.token(); + CancelOnPoll(source).with_cancellation(token).await.expect_err("should fail").to_string().contains("canceled"); + } + + #[tokio::test] + async fn already_cancelled_token() { + async { unreachable!() }.with_cancellation(CancellationToken::cancelled()).await.expect_err("should fail").to_string().contains("canceled"); + } +} diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs new file mode 100644 index 000000000..0a1a85cd5 --- /dev/null +++ b/crates/cancelable/src/token.rs @@ -0,0 +1,648 @@ +// Copyright (c) Microsoft Corporation. + +//! Cooperative cancellation via tokens. +//! +//! This module provides [`CancellationTokenSource`] and [`CancellationToken`], +//! modeled after the equivalent C# types. A source controls cancellation and +//! hands out lightweight, cloneable tokens for observers to check. +//! +//! # Linked sources +//! +//! A linked source cancels when *any* of its parent tokens are canceled, +//! enabling composition of multiple cancellation signals: +//! +//! ``` +//! # fn example() { +//! use oxidizer_sync::CancellationTokenSource; +//! +//! let parent_a = CancellationTokenSource::new(); +//! let parent_b = CancellationTokenSource::new(); +//! +//! let linked = CancellationTokenSource::linked(&[ +//! parent_a.token(), +//! parent_b.token(), +//! ]); +//! let token = linked.token(); +//! +//! assert!(!token.is_cancelled()); +//! parent_a.cancel(); +//! assert!(token.is_cancelled()); +//! # } +//! ``` +//! +//! # Subscribers +//! +//! Register callbacks that fire exactly once when cancellation occurs: +//! +//! ``` +//! # fn example() { +//! use oxidizer_sync::CancellationTokenSource; +//! +//! let source = CancellationTokenSource::new(); +//! source.subscribe(|| println!("Operation canceled")); +//! source.cancel(); +//! # } +//! ``` + +use std::any::type_name; +use std::fmt; +use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Weak, Mutex}; + +type Subscriber = Box; + +/// Shared state backing one or more [`CancellationToken`] handles. +struct Inner { + /// Cancellation signal + canceled: AtomicBool, + + /// Subscribers to notify on cancellation + /// + /// `Some(vec)` → not yet canceled; subscribers accumulate here. + /// `None` → already canceled; new subscribers fire immediately. + subscribers: Mutex>>, +} + +impl Inner { + fn new(canceled: bool) -> Self { + Self { + canceled: AtomicBool::new(canceled), + subscribers: Mutex::new(if canceled { None } else { Some(Vec::new()) }), + } + } + + /// Returns `true` if cancellation has been requested. + #[must_use] + fn is_cancelled(&self) -> bool { + self.canceled.load(Ordering::Acquire) + } + + /// Attempt to change the `canceled` signal from `false` to `true`. + /// + /// Returns `true` on success, signaling that the caller is responsible for notifying subscribers. + fn try_set_cancelled(&self) -> bool { + self.canceled + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + } + + /// Signal cancellation and notify subscribers + /// + /// # Panics + /// + /// Panics when the mutex protecting the subscriber list has been poisoned. + fn cancel_and_notify(&self) { + if self.try_set_cancelled() { + // Lock, take contents, and unlock + let subscribers = self.subscribers.lock().unwrap().take().unwrap_or_default(); + + // Notify from outside the lock + for f in subscribers { + f(); + } + } + } + + /// Subscribe to the cancellation notification + /// + /// If cancellation has already occurred, the callback fires immediately. + /// + /// # Panics + /// + /// Panics when the mutex protecting the subscriber list has been poisoned. + fn subscribe(&self, callback: Subscriber) { + { + let mut guard = self.subscribers.lock().unwrap(); + + // Subscribers is `Some(...)` which means they haven't notified them yet + // (not canceled, or mid-cancellation). Add to the list. + if let Some(list) = guard.as_mut() { + list.push(callback); + return; + } + + // Subscribers are `None`, meaning cancellation has already occurred + // and all subscribers have already been notified. + // + // Fall through to release the lock, then notify immediately. + } + + callback(); + } +} + +impl Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let subscribers = match self.subscribers.try_lock() { + Ok(lock) => { + match &*lock { + Some(s) => format!("Mutex(Some({} subscriber closure(s)))", s.len()), + None => String::from("Mutex(None)"), + } + }, + Err(_) => String::from("locked subscriber list"), + }; + + f.debug_struct(type_name::()) + .field("canceled", &self.canceled.load(Ordering::Acquire)) + .field("subscribers", &subscribers) + .finish() + } +} + +/// A lightweight, cloneable handle for observing whether cancellation has been +/// requested. +/// +/// Tokens are obtained from a [`CancellationTokenSource`] via +/// [`token()`](CancellationTokenSource::token) and can be passed throughout a +/// call graph so that any layer can cooperatively check for cancellation. +/// +/// # Examples +/// +/// ``` +/// # fn example() { +/// use oxidizer_sync::CancellationTokenSource; +/// +/// let source = CancellationTokenSource::new(); +/// let token = source.token(); +/// +/// assert!(!token.is_cancelled()); +/// source.cancel(); +/// assert!(token.is_cancelled()); +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct CancellationToken { + inner: Arc, +} + +impl CancellationToken { + #[must_use] + pub fn new() -> Self { + Self { + inner: Arc::new(Inner::new(false)), + } + } + + /// Returns a token that is already canceled. + /// + /// Useful for testing or for immediately signaling cancellation. + #[must_use] + pub fn cancelled() -> Self { + Self { + inner: Arc::new(Inner::new(true)), + } + } + + /// Returns `true` if cancellation has been requested. + #[must_use] + pub fn is_cancelled(&self) -> bool { + self.inner.is_cancelled() + } + + /// Get a weak reference to the shared state + fn weak_ref(&self) -> Weak { + Arc::downgrade(&self.inner) + } +} + +impl Default for CancellationToken { + fn default() -> Self { + Self::new() + } +} + +/// Controls cancellation for one or more [`CancellationToken`]s. +/// +/// Create a source, distribute tokens via [`token()`](Self::token), and call +/// [`cancel()`](Self::cancel) when the operation should stop. +/// +/// Dropping a `CancellationTokenSource` does **not** cancel its tokens. +/// Outstanding tokens simply remain in their current state. +/// +/// # Linked sources +/// +/// Use [`linked()`](Self::linked) to create a source that automatically +/// cancels when *any* of the supplied parent tokens are canceled: +/// +/// ``` +/// # fn example() { +/// use oxidizer_sync::CancellationTokenSource; +/// +/// let parent_a = CancellationTokenSource::new(); +/// let parent_b = CancellationTokenSource::new(); +/// +/// let linked = CancellationTokenSource::linked(&[ +/// parent_a.token(), +/// parent_b.token(), +/// ]); +/// let token = linked.token(); +/// +/// assert!(!token.is_cancelled()); +/// parent_a.cancel(); +/// assert!(token.is_cancelled()); +/// # } +/// ``` +/// +/// # Subscribers +/// +/// Register callbacks via [`subscribe()`](Self::subscribe) that fire once when +/// cancellation occurs. If the source is already cancelled, the callback fires +/// immediately. +/// +/// ``` +/// # fn example() { +/// # use std::sync::Arc; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use oxidizer_sync::CancellationTokenSource; +/// let source = CancellationTokenSource::new(); +/// let notified = Arc::new(AtomicBool::new(false)); +/// +/// let flag = Arc::clone(¬ified); +/// source.subscribe(move || { flag.store(true, Ordering::Relaxed); }); +/// +/// source.cancel(); +/// assert!(notified.load(Ordering::Relaxed)); +/// # } +/// ``` +#[derive(Debug, Default)] +pub struct CancellationTokenSource { + token: CancellationToken, +} + +impl CancellationTokenSource { + /// Creates a new, independent cancellation source. + #[must_use] + pub fn new() -> Self { + Self { + token: CancellationToken::new(), + } + } + + /// Creates a source linked to the parent tokens. + /// + /// The returned source's token reports [`is_cancelled()`] when: + /// - [`cancel()`](Self::cancel) is called directly on this source, **or** + /// - *any* parent token is canceled + /// + /// Linked sources work by registering a subscriber on each parent token. + /// When any parent is canceled, we get a notification (callback), and use it to self-cancel. + /// + /// [`is_cancelled()`]: CancellationToken::is_cancelled + pub fn linked(parents: &[CancellationToken]) -> Self { + let source = Self::new(); + + if !parents.is_empty() { + let weak = source.token.weak_ref(); + + for parent in parents { + let weak = Weak::clone(&weak); + parent + .inner + .subscribe(Box::new(move || { + if let Some(inner) = weak.upgrade() { + inner.cancel_and_notify(); + } + })); + } + } + + source + } + + /// Returns a [`CancellationToken`] associated with this source. + /// + /// All tokens from the same source share the same cancellation state. + #[must_use] + pub fn token(&self) -> CancellationToken { + self.token.clone() + } + + /// Requests cancellation. + /// + /// All tokens obtained from this source will report + /// [`is_cancelled() == true`](CancellationToken::is_cancelled) after this + /// call, and all registered subscribers will be notified. Calling `cancel` + /// more than once has no additional effect. + /// + /// Subscriber callbacks run synchronously on the calling thread. If any + /// callback panics, the panic propagates immediately, and remaining + /// callbacks will not run. + pub fn cancel(&self) { + self.token.inner.cancel_and_notify(); + } + + /// Returns `true` if cancellation has been requested on this source + /// or on any parent token (for linked sources). + #[must_use] + pub fn is_cancelled(&self) -> bool { + self.token.is_cancelled() + } + + /// Registers a callback to invoke when cancellation occurs. + /// + /// If this source is already canceled, the callback fires immediately + /// on the calling thread. + /// + /// # Examples + /// + /// ``` + /// # fn example() { + /// use std::sync::Arc; + /// use std::sync::atomic::{AtomicBool, Ordering}; + /// use oxidizer_sync::CancellationTokenSource; + /// + /// let source = CancellationTokenSource::new(); + /// let notified = Arc::new(AtomicBool::new(false)); + /// + /// let flag = Arc::clone(¬ified); + /// source.subscribe(move || { flag.store(true, Ordering::Relaxed); }); + /// + /// source.cancel(); + /// assert!(notified.load(Ordering::Relaxed)); + /// # } + /// ``` + pub fn subscribe(&self, callback: impl FnOnce() + Send + 'static) { + self.token.inner.subscribe(Box::new(callback)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::AtomicUsize; + + #[test] + fn new_source_is_not_cancelled() { + let source = CancellationTokenSource::new(); + assert!(!source.is_cancelled()); + assert!(!source.token().is_cancelled()); + } + + #[test] + fn cancel_propagates_to_token() { + let source = CancellationTokenSource::new(); + let token = source.token(); + + source.cancel(); + + assert!(token.is_cancelled()); + assert!(source.is_cancelled()); + } + + #[test] + fn cancel_is_idempotent() { + let source = CancellationTokenSource::new(); + source.cancel(); + source.cancel(); + assert!(source.is_cancelled()); + } + + #[test] + fn multiple_tokens_share_state() { + let source = CancellationTokenSource::new(); + let t1 = source.token(); + let t2 = source.token(); + + source.cancel(); + + assert!(t1.is_cancelled()); + assert!(t2.is_cancelled()); + } + + #[test] + fn cloned_token_shares_state() { + let source = CancellationTokenSource::new(); + let t1 = source.token(); + let t2 = t1.clone(); + + source.cancel(); + + assert!(t1.is_cancelled()); + assert!(t2.is_cancelled()); + } + + #[test] + fn cancelled_token_is_cancelled() { + let token = CancellationToken::cancelled(); + assert!(token.is_cancelled()); + } + + #[test] + fn default_token_is_not_cancelled() { + let token = CancellationToken::default(); + assert!(!token.is_cancelled()); + } + + #[test] + fn default_source_is_not_cancelled() { + let source = CancellationTokenSource::default(); + assert!(!source.is_cancelled()); + } + + #[test] + fn linked_cancels_when_parent_cancels() { + let parent = CancellationTokenSource::new(); + + let linked = CancellationTokenSource::linked(&[parent.token()]); + let linked_token = linked.token(); + + assert!(!linked_token.is_cancelled()); + parent.cancel(); + assert!(linked_token.is_cancelled()); + } + + #[test] + fn linked_cancels_when_any_parent_cancels() { + let p1 = CancellationTokenSource::new(); + let p2 = CancellationTokenSource::new(); + let linked = CancellationTokenSource::linked(&[p1.token(), p2.token()]); + + assert!(!linked.is_cancelled()); + p2.cancel(); + assert!(linked.is_cancelled()); + } + + #[test] + fn linked_cancels_directly() { + let parent = CancellationTokenSource::new(); + let linked = CancellationTokenSource::linked(&[parent.token()]); + + linked.cancel(); + + assert!(linked.is_cancelled()); + assert!(!parent.is_cancelled(), "cancelling child must not propagate to parent"); + } + + #[test] + fn linked_chain_propagates() { + let root = CancellationTokenSource::new(); + let mid = CancellationTokenSource::linked(&[root.token()]); + let leaf = CancellationTokenSource::linked(&[mid.token()]); + + assert!(!leaf.is_cancelled()); + root.cancel(); + assert!(leaf.is_cancelled()); + } + + #[test] + fn linked_from_already_cancelled_parent() { + let parent = CancellationTokenSource::new(); + parent.cancel(); + + let linked = CancellationTokenSource::linked(&[parent.token()]); + assert!(linked.is_cancelled()); + } + + #[test] + fn dropping_source_does_not_cancel_token() { + let token = { + let source = CancellationTokenSource::new(); + source.token() + }; + assert!(!token.is_cancelled()); + } + + #[test] + fn dropped_linked_source_does_not_notify_on_parent_cancel() { + let parent = CancellationTokenSource::new(); + let counter = Arc::new(AtomicUsize::new(0)); + + { + let linked = CancellationTokenSource::linked(&[parent.token()]); + let c = Arc::clone(&counter); + linked.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + // linked dropped here + } + + parent.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 0); + } + + #[test] + fn cancel_visible_across_threads() { + let source = CancellationTokenSource::new(); + let token = source.token(); + + let handle = std::thread::spawn(move || { + while !token.is_cancelled() { + std::hint::spin_loop(); + } + true + }); + + source.cancel(); + assert!(handle.join().unwrap()); + } + + #[test] + fn linked_cancellation_is_visible_across_threads() { + let parent = CancellationTokenSource::new(); + let linked = CancellationTokenSource::linked(&[parent.token()]); + let linked_token = linked.token(); + + let handle = std::thread::spawn(move || { + while !linked_token.is_cancelled() { + std::hint::spin_loop(); + } + true + }); + + parent.cancel(); + assert!(handle.join().unwrap()); + } + + #[test] + fn subscribers_are_notified_on_cancel() { + let source = CancellationTokenSource::new(); + let counter = Arc::new(AtomicUsize::new(0)); + + let c = Arc::clone(&counter); + source.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + source.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 1); + } + + #[test] + fn subscribers_are_notified_immediately_if_already_cancelled() { + let source = CancellationTokenSource::new(); + source.cancel(); + + let counter = Arc::new(AtomicUsize::new(0)); + let c = Arc::clone(&counter); + source.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + assert_eq!(counter.load(Ordering::Relaxed), 1); + } + + #[test] + fn all_subscribers_are_notified() { + let source = CancellationTokenSource::new(); + let counter = Arc::new(AtomicUsize::new(0)); + + for _ in 0..5 { + let c = Arc::clone(&counter); + source.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + } + + source.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 5); + } + + #[test] + fn subscribers_are_only_notified_once_on_double_cancel() { + let source = CancellationTokenSource::new(); + let counter = Arc::new(AtomicUsize::new(0)); + + let c = Arc::clone(&counter); + source.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + source.cancel(); + source.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 1); + } + + #[test] + fn linked_subscriber_is_notified_on_parent_cancel() { + let parent = CancellationTokenSource::new(); + let linked = CancellationTokenSource::linked(&[parent.token()]); + let counter = Arc::new(AtomicUsize::new(0)); + + let c = Arc::clone(&counter); + linked.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + parent.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 1); + } + + #[test] + fn chained_linked_subscribers_are_notified() { + let root = CancellationTokenSource::new(); + let mid = CancellationTokenSource::linked(&[root.token()]); + let leaf = CancellationTokenSource::linked(&[mid.token()]); + + let counter = Arc::new(AtomicUsize::new(0)); + let c = Arc::clone(&counter); + leaf.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + root.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 1); + } +} From 48aee1d5e8e9176e2f269f1df07ea32d783ca142 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Mon, 1 Jun 2026 18:08:34 -0700 Subject: [PATCH 02/10] v1 draft --- CHANGELOG.md | 1 + Cargo.lock | 9 ++ Cargo.toml | 1 + README.md | 1 + crates/cancelable/CHANGELOG.md | 1 + crates/cancelable/Cargo.toml | 31 ++++++ crates/cancelable/README.md | 82 +++++++++++++++ crates/cancelable/favicon.ico | 3 + crates/cancelable/logo.png | 3 + crates/cancelable/src/future.rs | 58 ++++++----- crates/cancelable/src/lib.rs | 73 ++++++++++++++ crates/cancelable/src/token.rs | 171 ++++++-------------------------- 12 files changed, 269 insertions(+), 165 deletions(-) create mode 100644 crates/cancelable/CHANGELOG.md create mode 100644 crates/cancelable/Cargo.toml create mode 100644 crates/cancelable/README.md create mode 100644 crates/cancelable/favicon.ico create mode 100644 crates/cancelable/logo.png create mode 100644 crates/cancelable/src/lib.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 57c3b358f..62aec0706 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Please see each crate's change log below: - [`cachet_memory`](./crates/cachet_memory/CHANGELOG.md) - [`cachet_service`](./crates/cachet_service/CHANGELOG.md) - [`cachet_tier`](./crates/cachet_tier/CHANGELOG.md) +- [`cancelable`](./crates/cancelable/CHANGELOG.md) - [`data_privacy`](./crates/data_privacy/CHANGELOG.md) - [`data_privacy_macros`](./crates/data_privacy_macros/CHANGELOG.md) - [`data_privacy_macros_impl`](./crates/data_privacy_macros_impl/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 1af6f95c5..f5e87b6ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -540,6 +540,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "cancelable" +version = "0.1.0" +dependencies = [ + "ohno 0.3.3", + "pin-project", + "tokio", +] + [[package]] name = "cast" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index a4ac27e1c..1ed385ba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ cachet = { path = "crates/cachet", default-features = false, version = "0.6.0" } cachet_memory = { path = "crates/cachet_memory", default-features = false, version = "0.3.0" } cachet_service = { path = "crates/cachet_service", default-features = false, version = "0.2.0" } cachet_tier = { path = "crates/cachet_tier", default-features = false, version = "0.2.0" } +cancelable = { path = "crates/cancelable", default-features = false, version = "0.1.0" } data_privacy = { path = "crates/data_privacy", default-features = false, version = "0.11.1" } data_privacy_macros = { path = "crates/data_privacy_macros", default-features = false, version = "0.9.1" } data_privacy_macros_impl = { path = "crates/data_privacy_macros_impl", default-features = false, version = "0.9.1" } diff --git a/README.md b/README.md index fa2b0765a..11bed2c8b 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ These are the primary crates built out of this repo: - [`cachet_memory`](./crates/cachet_memory/README.md) - In-memory cache tier backed by Moka for the cachet caching library. - [`cachet_service`](./crates/cachet_service/README.md) - Layered service integration for the cachet caching library. - [`cachet_tier`](./crates/cachet_tier/README.md) - Core cache tier trait and abstractions for building cache backends. +- [`cancelable`](./crates/cancelable/README.md) - Cooperative cancellation via tokens - [`data_privacy`](./crates/data_privacy/README.md) - Mechanisms to classify, manipulate, and redact sensitive data. - [`fetch_hyper`](./crates/fetch_hyper/README.md) - Hyper-based HTTP transport utilities for fetch. - [`fundle`](./crates/fundle/README.md) - Compile-time safe dependency injection for Rust. diff --git a/crates/cancelable/CHANGELOG.md b/crates/cancelable/CHANGELOG.md new file mode 100644 index 000000000..825c32f0d --- /dev/null +++ b/crates/cancelable/CHANGELOG.md @@ -0,0 +1 @@ +# Changelog diff --git a/crates/cancelable/Cargo.toml b/crates/cancelable/Cargo.toml new file mode 100644 index 000000000..e4cd3ac59 --- /dev/null +++ b/crates/cancelable/Cargo.toml @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "cancelable" +description = "Cooperative cancellation via tokens" +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "async", "futures"] +categories = ["asynchronous", "concurrency"] + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository = "https://github.com/microsoft/oxidizer/tree/main/crates/cancelable" + +[package.metadata.docs.rs] +all-features = true + +[dependencies] +ohno = { workspace = true } +pin-project = { workspace = true } + +[dev-dependencies] +ohno = { workspace = true, features = ["app-err"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } + +[lints] +workspace = true diff --git a/crates/cancelable/README.md b/crates/cancelable/README.md new file mode 100644 index 000000000..6a51e9ade --- /dev/null +++ b/crates/cancelable/README.md @@ -0,0 +1,82 @@ +
+ Cancelable Logo + +# Cancelable + +[![crate.io](https://img.shields.io/crates/v/cancelable.svg)](https://crates.io/crates/cancelable) +[![docs.rs](https://docs.rs/cancelable/badge.svg)](https://docs.rs/cancelable) +[![MSRV](https://img.shields.io/crates/msrv/cancelable)](https://crates.io/crates/cancelable) +[![CI](https://github.com/microsoft/oxidizer/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/microsoft/oxidizer/actions/workflows/main.yml) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../../LICENSE) +This crate was developed as part of the Oxidizer project + +
+ +Cooperative cancellation via tokens. + +This module provides [`CancellationTokenSource`][__link0] and [`CancellationToken`][__link1], +modeled after the equivalent C# types. A source controls cancellation and +hands out lightweight, cloneable tokens for observers to check. + +## Linked sources + +A linked source cancels when *any* of its parent tokens are canceled, +enabling composition of multiple cancellation signals: + +```rust +use cancelable::CancellationTokenSource; + +let first = CancellationTokenSource::new(); +let second = CancellationTokenSource::new(); + +let linked = CancellationTokenSource::linked(&[first.token(), second.token()]); +let token = linked.token(); + +assert!(!token.is_cancelled()); +second.cancel(); +assert!(token.is_cancelled()); +``` + +## Subscribers + +Register callbacks that fire exactly once when cancellation occurs: + +```rust +use cancelable::CancellationTokenSource; + +let source = CancellationTokenSource::new(); +source.subscribe(|| println!("Operation canceled")); +source.cancel(); +``` + +## Futures + +The [`WithCancellationExt`][__link2] trait adds a +[`with_cancellation`][__link3] combinator +to any [`Future`][__link4], pairing it with a [`CancellationToken`][__link5] so that each +poll checks for cancellation before and after driving the inner future. + +```rust +use cancelable::{CancellationTokenSource, WithCancellationExt}; + +let source = CancellationTokenSource::new(); +let token = source.token(); + +let result = async { 42 }.with_cancellation(token).await?; +assert_eq!(result, 42); +``` + + +
+ +This crate was developed as part of The Oxidizer Project. Browse this crate's source code. + + + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbwIR-DtpCTPsbyw0-tCMPjdAbd-QY8wrEiWQbEkCmgY5QMxFhZIGCamNhbmNlbGFibGVlMC4xLjA + [__link0]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationTokenSource + [__link1]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken + [__link2]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::WithCancellationExt + [__link3]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::WithCancellationExt::with_cancellation + [__link4]: https://doc.rust-lang.org/stable/std/future/trait.Future.html + [__link5]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken diff --git a/crates/cancelable/favicon.ico b/crates/cancelable/favicon.ico new file mode 100644 index 000000000..6cc6d824e --- /dev/null +++ b/crates/cancelable/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b7623e3f34c7d69a0392b3701aa62e534c1b52589e6f91826bb651952d86b6dc +size 15406 diff --git a/crates/cancelable/logo.png b/crates/cancelable/logo.png new file mode 100644 index 000000000..2f3e3885d --- /dev/null +++ b/crates/cancelable/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5dc59a03e875c186e6e77e0b7506e40238c15d7e86c2ad1db1282d17898fbb85 +size 53167 diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs index b328f761d..41f482690 100644 --- a/crates/cancelable/src/future.rs +++ b/crates/cancelable/src/future.rs @@ -1,29 +1,32 @@ // Copyright (c) Microsoft Corporation. -//! Future combinators for cooperative cancellation. +//! Future combinators for cooperative cancellation //! -//! The [`CancellationFutureExt`] trait adds a -//! [`with_cancellation`](CancellationFutureExt::with_cancellation) combinator +//! The [`WithCancellationExt`] trait adds a +//! [`with_cancellation`](WithCancellationExt::with_cancellation) combinator //! to any [`Future`], pairing it with a [`CancellationToken`] so that each //! poll checks for cancellation before and after driving the inner future. //! //! ``` -//! # async fn example() { -//! use oxidizer_sync::{CancellationTokenSource, CancellationFutureExt}; +//! # async fn example() -> Result<(), ohno::AppError> { +//! use cancelable::{CancellationTokenSource, WithCancellationExt}; //! //! let source = CancellationTokenSource::new(); //! let token = source.token(); //! -//! let result = async { 42 }.with_cancellation(token).await; -//! assert_eq!(result.unwrap(), 42); +//! let result = async { 42 }.with_cancellation(token).await?; +//! assert_eq!(result, 42); +//! # Ok(()) //! # } //! ``` -use crate::CancellationToken; -use pin_project::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; +use pin_project::pin_project; + +use crate::CancellationToken; + /// Error returned when a [`WithCancellation`] future detects that its /// associated [`CancellationToken`] has been canceled. #[ohno::error] @@ -31,7 +34,7 @@ use std::task::{Context, Poll}; pub struct Canceled {} /// Extension trait that adds cancellation support to any [`Future`]. -pub trait CancellationFutureExt: Future + Sized { +pub trait WithCancellationExt: Future + Sized { /// Wraps this future so that each poll checks the given [`CancellationToken`]: /// /// - If the token is canceled (before *or* after polling the inner @@ -52,12 +55,10 @@ pub trait CancellationFutureExt: Future + Sized { /// /// ``` /// # async fn example() { - /// use oxidizer_sync::{CancellationTokenSource, CancellationFutureExt}; + /// use cancelable::{CancellationTokenSource, WithCancellationExt}; /// /// let source = CancellationTokenSource::new(); - /// let result = async { "hello" } - /// .with_cancellation(source.token()) - /// .await; + /// let result = async { "hello" }.with_cancellation(source.token()).await; /// assert_eq!(result.unwrap(), "hello"); /// # } /// ``` @@ -66,7 +67,7 @@ pub trait CancellationFutureExt: Future + Sized { /// /// ``` /// # async fn example() { - /// use oxidizer_sync::{CancellationTokenSource, CancellationFutureExt}; + /// use cancelable::{CancellationTokenSource, WithCancellationExt}; /// /// let source = CancellationTokenSource::new(); /// source.cancel(); @@ -80,17 +81,14 @@ pub trait CancellationFutureExt: Future + Sized { fn with_cancellation(self, token: CancellationToken) -> WithCancellation; } -impl CancellationFutureExt for F { +impl WithCancellationExt for F { fn with_cancellation(self, token: CancellationToken) -> WithCancellation { - WithCancellation { - inner: self, - token, - } + WithCancellation { inner: self, token } } } /// Future returned by -/// [`with_cancellation`](CancellationFutureExt::with_cancellation). +/// [`with_cancellation`](WithCancellationExt::with_cancellation). /// /// See the trait method documentation for semantics. #[derive(Debug)] @@ -145,7 +143,9 @@ mod tests { let source = CancellationTokenSource::new(); source.cancel(); - let result = async { unreachable!("should not poll inner future") }.with_cancellation(source.token()).await; + let result = async { unreachable!("should not poll inner future") } + .with_cancellation(source.token()) + .await; assert!(result.unwrap_err().to_string().contains("canceled")); } @@ -162,11 +162,21 @@ mod tests { let source = CancellationTokenSource::new(); let token = source.token(); - CancelOnPoll(source).with_cancellation(token).await.expect_err("should fail").to_string().contains("canceled"); + CancelOnPoll(source) + .with_cancellation(token) + .await + .expect_err("should fail") + .to_string() + .contains("canceled"); } #[tokio::test] async fn already_cancelled_token() { - async { unreachable!() }.with_cancellation(CancellationToken::cancelled()).await.expect_err("should fail").to_string().contains("canceled"); + async { unreachable!() } + .with_cancellation(CancellationToken::cancelled()) + .await + .expect_err("should fail") + .to_string() + .contains("canceled"); } } diff --git a/crates/cancelable/src/lib.rs b/crates/cancelable/src/lib.rs new file mode 100644 index 000000000..786f7187a --- /dev/null +++ b/crates/cancelable/src/lib.rs @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/cancelable/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/cancelable/favicon.ico")] + +//! Cooperative cancellation via tokens. +//! +//! This module provides [`CancellationTokenSource`] and [`CancellationToken`], +//! modeled after the equivalent C# types. A source controls cancellation and +//! hands out lightweight, cloneable tokens for observers to check. +//! +//! # Linked sources +//! +//! A linked source cancels when *any* of its parent tokens are canceled, +//! enabling composition of multiple cancellation signals: +//! +//! ``` +//! # fn example() { +//! use cancelable::CancellationTokenSource; +//! +//! let first = CancellationTokenSource::new(); +//! let second = CancellationTokenSource::new(); +//! +//! let linked = CancellationTokenSource::linked(&[first.token(), second.token()]); +//! let token = linked.token(); +//! +//! assert!(!token.is_cancelled()); +//! second.cancel(); +//! assert!(token.is_cancelled()); +//! # } +//! ``` +//! +//! # Subscribers +//! +//! Register callbacks that fire exactly once when cancellation occurs: +//! +//! ``` +//! # fn example() { +//! use cancelable::CancellationTokenSource; +//! +//! let source = CancellationTokenSource::new(); +//! source.subscribe(|| println!("Operation canceled")); +//! source.cancel(); +//! # } +//! ``` +//! +//! # Futures +//! +//! The [`WithCancellationExt`] trait adds a +//! [`with_cancellation`](WithCancellationExt::with_cancellation) combinator +//! to any [`Future`], pairing it with a [`CancellationToken`] so that each +//! poll checks for cancellation before and after driving the inner future. +//! +//! ``` +//! # async fn example() -> Result<(), ohno::AppError> { +//! use cancelable::{CancellationTokenSource, WithCancellationExt}; +//! +//! let source = CancellationTokenSource::new(); +//! let token = source.token(); +//! +//! let result = async { 42 }.with_cancellation(token).await?; +//! assert_eq!(result, 42); +//! # Ok(()) +//! # } +//! ``` + +pub mod future; +pub use future::WithCancellationExt; + +mod token; +pub use token::{CancellationToken, CancellationTokenSource}; diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index 0a1a85cd5..00c5eeb14 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -1,54 +1,10 @@ // Copyright (c) Microsoft Corporation. -//! Cooperative cancellation via tokens. -//! -//! This module provides [`CancellationTokenSource`] and [`CancellationToken`], -//! modeled after the equivalent C# types. A source controls cancellation and -//! hands out lightweight, cloneable tokens for observers to check. -//! -//! # Linked sources -//! -//! A linked source cancels when *any* of its parent tokens are canceled, -//! enabling composition of multiple cancellation signals: -//! -//! ``` -//! # fn example() { -//! use oxidizer_sync::CancellationTokenSource; -//! -//! let parent_a = CancellationTokenSource::new(); -//! let parent_b = CancellationTokenSource::new(); -//! -//! let linked = CancellationTokenSource::linked(&[ -//! parent_a.token(), -//! parent_b.token(), -//! ]); -//! let token = linked.token(); -//! -//! assert!(!token.is_cancelled()); -//! parent_a.cancel(); -//! assert!(token.is_cancelled()); -//! # } -//! ``` -//! -//! # Subscribers -//! -//! Register callbacks that fire exactly once when cancellation occurs: -//! -//! ``` -//! # fn example() { -//! use oxidizer_sync::CancellationTokenSource; -//! -//! let source = CancellationTokenSource::new(); -//! source.subscribe(|| println!("Operation canceled")); -//! source.cancel(); -//! # } -//! ``` - use std::any::type_name; use std::fmt; use std::fmt::Debug; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Weak, Mutex}; +use std::sync::{Arc, Mutex, Weak}; type Subscriber = Box; @@ -88,14 +44,15 @@ impl Inner { } /// Signal cancellation and notify subscribers - /// - /// # Panics - /// - /// Panics when the mutex protecting the subscriber list has been poisoned. fn cancel_and_notify(&self) { if self.try_set_cancelled() { - // Lock, take contents, and unlock - let subscribers = self.subscribers.lock().unwrap().take().unwrap_or_default(); + let subscribers = match self.subscribers.lock() { + // Lock, take contents, and unlock + Ok(mut guard) => guard.take().unwrap_or_default(), + + // Lock has been poisoned, which means we can't read the subscriber list. + Err(_) => Vec::default(), + }; // Notify from outside the lock for f in subscribers { @@ -107,25 +64,23 @@ impl Inner { /// Subscribe to the cancellation notification /// /// If cancellation has already occurred, the callback fires immediately. - /// - /// # Panics - /// - /// Panics when the mutex protecting the subscriber list has been poisoned. fn subscribe(&self, callback: Subscriber) { - { - let mut guard = self.subscribers.lock().unwrap(); - - // Subscribers is `Some(...)` which means they haven't notified them yet - // (not canceled, or mid-cancellation). Add to the list. + if let Ok(mut guard) = self.subscribers.lock() { + // Subscribers is `Some(...)` which means we haven't notified them yet + // (e.g. not canceled, and not mid-cancellation). Add to the list. if let Some(list) = guard.as_mut() { list.push(callback); return; } - // Subscribers are `None`, meaning cancellation has already occurred + // Subscribers list was `None`, meaning cancellation has already occurred // and all subscribers have already been notified. // // Fall through to release the lock, then notify immediately. + } else { + // Lock has been poisoned, which means we can't add to the subscriber list. + // + // The token source is unusable. Send the notification immediately. } callback(); @@ -135,11 +90,9 @@ impl Inner { impl Debug for Inner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let subscribers = match self.subscribers.try_lock() { - Ok(lock) => { - match &*lock { - Some(s) => format!("Mutex(Some({} subscriber closure(s)))", s.len()), - None => String::from("Mutex(None)"), - } + Ok(lock) => match &*lock { + Some(s) => format!("Mutex(Some({} subscriber closure(s)))", s.len()), + None => String::from("Mutex(None)"), }, Err(_) => String::from("locked subscriber list"), }; @@ -162,7 +115,7 @@ impl Debug for Inner { /// /// ``` /// # fn example() { -/// use oxidizer_sync::CancellationTokenSource; +/// use cancelable::CancellationTokenSource; /// /// let source = CancellationTokenSource::new(); /// let token = source.token(); @@ -178,6 +131,7 @@ pub struct CancellationToken { } impl CancellationToken { + /// Create a new cancellation token #[must_use] pub fn new() -> Self { Self { @@ -213,59 +167,13 @@ impl Default for CancellationToken { } } -/// Controls cancellation for one or more [`CancellationToken`]s. +/// Controls cancellation for one or more [`CancellationToken`]s /// /// Create a source, distribute tokens via [`token()`](Self::token), and call /// [`cancel()`](Self::cancel) when the operation should stop. /// /// Dropping a `CancellationTokenSource` does **not** cancel its tokens. /// Outstanding tokens simply remain in their current state. -/// -/// # Linked sources -/// -/// Use [`linked()`](Self::linked) to create a source that automatically -/// cancels when *any* of the supplied parent tokens are canceled: -/// -/// ``` -/// # fn example() { -/// use oxidizer_sync::CancellationTokenSource; -/// -/// let parent_a = CancellationTokenSource::new(); -/// let parent_b = CancellationTokenSource::new(); -/// -/// let linked = CancellationTokenSource::linked(&[ -/// parent_a.token(), -/// parent_b.token(), -/// ]); -/// let token = linked.token(); -/// -/// assert!(!token.is_cancelled()); -/// parent_a.cancel(); -/// assert!(token.is_cancelled()); -/// # } -/// ``` -/// -/// # Subscribers -/// -/// Register callbacks via [`subscribe()`](Self::subscribe) that fire once when -/// cancellation occurs. If the source is already cancelled, the callback fires -/// immediately. -/// -/// ``` -/// # fn example() { -/// # use std::sync::Arc; -/// # use std::sync::atomic::{AtomicBool, Ordering}; -/// # use oxidizer_sync::CancellationTokenSource; -/// let source = CancellationTokenSource::new(); -/// let notified = Arc::new(AtomicBool::new(false)); -/// -/// let flag = Arc::clone(¬ified); -/// source.subscribe(move || { flag.store(true, Ordering::Relaxed); }); -/// -/// source.cancel(); -/// assert!(notified.load(Ordering::Relaxed)); -/// # } -/// ``` #[derive(Debug, Default)] pub struct CancellationTokenSource { token: CancellationToken, @@ -290,6 +198,7 @@ impl CancellationTokenSource { /// When any parent is canceled, we get a notification (callback), and use it to self-cancel. /// /// [`is_cancelled()`]: CancellationToken::is_cancelled + #[must_use] pub fn linked(parents: &[CancellationToken]) -> Self { let source = Self::new(); @@ -298,13 +207,11 @@ impl CancellationTokenSource { for parent in parents { let weak = Weak::clone(&weak); - parent - .inner - .subscribe(Box::new(move || { - if let Some(inner) = weak.upgrade() { - inner.cancel_and_notify(); - } - })); + parent.inner.subscribe(Box::new(move || { + if let Some(inner) = weak.upgrade() { + inner.cancel_and_notify(); + } + })); } } @@ -344,25 +251,6 @@ impl CancellationTokenSource { /// /// If this source is already canceled, the callback fires immediately /// on the calling thread. - /// - /// # Examples - /// - /// ``` - /// # fn example() { - /// use std::sync::Arc; - /// use std::sync::atomic::{AtomicBool, Ordering}; - /// use oxidizer_sync::CancellationTokenSource; - /// - /// let source = CancellationTokenSource::new(); - /// let notified = Arc::new(AtomicBool::new(false)); - /// - /// let flag = Arc::clone(¬ified); - /// source.subscribe(move || { flag.store(true, Ordering::Relaxed); }); - /// - /// source.cancel(); - /// assert!(notified.load(Ordering::Relaxed)); - /// # } - /// ``` pub fn subscribe(&self, callback: impl FnOnce() + Send + 'static) { self.token.inner.subscribe(Box::new(callback)); } @@ -370,9 +258,10 @@ impl CancellationTokenSource { #[cfg(test)] mod tests { - use super::*; use std::sync::atomic::AtomicUsize; + use super::*; + #[test] fn new_source_is_not_cancelled() { let source = CancellationTokenSource::new(); From d443a1bdc3a219ed471bf2d30389d6648136c388 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Mon, 1 Jun 2026 18:28:06 -0700 Subject: [PATCH 03/10] fixes --- crates/cancelable/Cargo.toml | 6 ++++++ crates/cancelable/README.md | 4 ++-- crates/cancelable/src/future.rs | 9 +++++---- crates/cancelable/src/lib.rs | 2 +- crates/cancelable/src/token.rs | 1 + 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/crates/cancelable/Cargo.toml b/crates/cancelable/Cargo.toml index e4cd3ac59..28037c1a9 100644 --- a/crates/cancelable/Cargo.toml +++ b/crates/cancelable/Cargo.toml @@ -16,6 +16,12 @@ license.workspace = true homepage.workspace = true repository = "https://github.com/microsoft/oxidizer/tree/main/crates/cancelable" +[package.metadata.cargo_check_external_types] +allowed_external_types = [ + "ohno::enrichable::Enrichable", + "ohno::error_ext::ErrorExt", +] + [package.metadata.docs.rs] all-features = true diff --git a/crates/cancelable/README.md b/crates/cancelable/README.md index 6a51e9ade..27cff4452 100644 --- a/crates/cancelable/README.md +++ b/crates/cancelable/README.md @@ -53,7 +53,7 @@ source.cancel(); ## Futures The [`WithCancellationExt`][__link2] trait adds a -[`with_cancellation`][__link3] combinator +[`with_cancellation`][__link3] method to any [`Future`][__link4], pairing it with a [`CancellationToken`][__link5] so that each poll checks for cancellation before and after driving the inner future. @@ -73,7 +73,7 @@ assert_eq!(result, 42); This crate was developed as part of The Oxidizer Project. Browse this crate's source code. - [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbwIR-DtpCTPsbyw0-tCMPjdAbd-QY8wrEiWQbEkCmgY5QMxFhZIGCamNhbmNlbGFibGVlMC4xLjA + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbciObxqSLWWsbfKiwAXNlniAb4klZvw7J_M0bBHKwCELmcj1hZIGCamNhbmNlbGFibGVlMC4xLjA [__link0]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationTokenSource [__link1]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken [__link2]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::WithCancellationExt diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs index 41f482690..f34458c10 100644 --- a/crates/cancelable/src/future.rs +++ b/crates/cancelable/src/future.rs @@ -1,9 +1,10 @@ // Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. -//! Future combinators for cooperative cancellation +//! Future extension for cooperative cancellation //! //! The [`WithCancellationExt`] trait adds a -//! [`with_cancellation`](WithCancellationExt::with_cancellation) combinator +//! [`with_cancellation`](WithCancellationExt::with_cancellation) method //! to any [`Future`], pairing it with a [`CancellationToken`] so that each //! poll checks for cancellation before and after driving the inner future. //! @@ -43,10 +44,10 @@ pub trait WithCancellationExt: Future + Sized { /// /// # Note on wake semantics /// - /// Cancellation is checked cooperatively: the combinator inspects the token + /// Cancellation is checked cooperatively: the extension inspects the token /// each time the inner future is polled. If the inner future is pending /// and nothing else wakes the task, cancellation will not be noticed until - /// the next poll. This mirrors the cooperative model of C#'s + /// the next poll. This mirrors the cooperative model of the `C#` method /// `CancellationToken.ThrowIfCancellationRequested()`. /// /// # Examples diff --git a/crates/cancelable/src/lib.rs b/crates/cancelable/src/lib.rs index 786f7187a..abaa4acda 100644 --- a/crates/cancelable/src/lib.rs +++ b/crates/cancelable/src/lib.rs @@ -49,7 +49,7 @@ //! # Futures //! //! The [`WithCancellationExt`] trait adds a -//! [`with_cancellation`](WithCancellationExt::with_cancellation) combinator +//! [`with_cancellation`](WithCancellationExt::with_cancellation) method //! to any [`Future`], pairing it with a [`CancellationToken`] so that each //! poll checks for cancellation before and after driving the inner future. //! diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index 00c5eeb14..47bd901fb 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -1,4 +1,5 @@ // Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. use std::any::type_name; use std::fmt; From 5167a0a3788baea1c8100df6c97b98d3a283b472 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Tue, 2 Jun 2026 13:22:34 -0700 Subject: [PATCH 04/10] PR feedback and fixes 1. Disable miri for tokio tests (not supported) 2. Mutation testing fixes. Tests were hanging, but now time out and fail after 5 secs, giving them plenty of time to stabilize (e.g. not flaky). 3. Rename future extension to CancelableExt with method cancelable() and state struct Cancelable. 4. Document atomic ordering decisions 5. Propagate poisoned lock errors as panics 6. Avoid boxing when using the subscriber list to capture linked token relationships. 7. Unregister linked child token sources from their parents on Drop. 8. Update doc comments --- Cargo.lock | 2 + crates/cancelable/Cargo.toml | 2 + crates/cancelable/README.md | 15 +- crates/cancelable/src/future.rs | 55 +++-- crates/cancelable/src/lib.rs | 11 +- crates/cancelable/src/token.rs | 378 ++++++++++++++++++++++++++++---- 6 files changed, 382 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5e87b6ca..343c54a2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -544,8 +544,10 @@ dependencies = [ name = "cancelable" version = "0.1.0" dependencies = [ + "mutants", "ohno 0.3.3", "pin-project", + "tick", "tokio", ] diff --git a/crates/cancelable/Cargo.toml b/crates/cancelable/Cargo.toml index 28037c1a9..50918238e 100644 --- a/crates/cancelable/Cargo.toml +++ b/crates/cancelable/Cargo.toml @@ -30,7 +30,9 @@ ohno = { workspace = true } pin-project = { workspace = true } [dev-dependencies] +mutants = { workspace = true } ohno = { workspace = true, features = ["app-err"] } +tick = { workspace = true, features = ["tokio"] } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } [lints] diff --git a/crates/cancelable/README.md b/crates/cancelable/README.md index 27cff4452..ba0402c40 100644 --- a/crates/cancelable/README.md +++ b/crates/cancelable/README.md @@ -19,7 +19,7 @@ This module provides [`CancellationTokenSource`][__link0] and [`CancellationToke modeled after the equivalent C# types. A source controls cancellation and hands out lightweight, cloneable tokens for observers to check. -## Linked sources +## Linked Sources A linked source cancels when *any* of its parent tokens are canceled, enabling composition of multiple cancellation signals: @@ -52,18 +52,17 @@ source.cancel(); ## Futures -The [`WithCancellationExt`][__link2] trait adds a -[`with_cancellation`][__link3] method +The [`CancelableExt`][__link2] trait adds a [`cancelable`][__link3] method to any [`Future`][__link4], pairing it with a [`CancellationToken`][__link5] so that each poll checks for cancellation before and after driving the inner future. ```rust -use cancelable::{CancellationTokenSource, WithCancellationExt}; +use cancelable::{CancelableExt, CancellationTokenSource}; let source = CancellationTokenSource::new(); let token = source.token(); -let result = async { 42 }.with_cancellation(token).await?; +let result = async { 42 }.cancelable(token).await?; assert_eq!(result, 42); ``` @@ -73,10 +72,10 @@ assert_eq!(result, 42); This crate was developed as part of The Oxidizer Project. Browse this crate's source code. - [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbciObxqSLWWsbfKiwAXNlniAb4klZvw7J_M0bBHKwCELmcj1hZIGCamNhbmNlbGFibGVlMC4xLjA + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbE3Iea_zSpkIbvcbCI0vEEEEb7KqsBtUtyHsbFhKo1iYbGRphZIGCamNhbmNlbGFibGVlMC4xLjA [__link0]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationTokenSource [__link1]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken - [__link2]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::WithCancellationExt - [__link3]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::WithCancellationExt::with_cancellation + [__link2]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::CancelableExt + [__link3]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::CancelableExt::cancelable [__link4]: https://doc.rust-lang.org/stable/std/future/trait.Future.html [__link5]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs index f34458c10..0a5966750 100644 --- a/crates/cancelable/src/future.rs +++ b/crates/cancelable/src/future.rs @@ -3,19 +3,19 @@ //! Future extension for cooperative cancellation //! -//! The [`WithCancellationExt`] trait adds a -//! [`with_cancellation`](WithCancellationExt::with_cancellation) method +//! The [`CancelableExt`] trait adds a +//! [`cancelable`](CancelableExt::cancelable) method //! to any [`Future`], pairing it with a [`CancellationToken`] so that each //! poll checks for cancellation before and after driving the inner future. //! //! ``` //! # async fn example() -> Result<(), ohno::AppError> { -//! use cancelable::{CancellationTokenSource, WithCancellationExt}; +//! use cancelable::{CancelableExt, CancellationTokenSource}; //! //! let source = CancellationTokenSource::new(); //! let token = source.token(); //! -//! let result = async { 42 }.with_cancellation(token).await?; +//! let result = async { 42 }.cancelable(token).await?; //! assert_eq!(result, 42); //! # Ok(()) //! # } @@ -28,14 +28,13 @@ use pin_project::pin_project; use crate::CancellationToken; -/// Error returned when a [`WithCancellation`] future detects that its -/// associated [`CancellationToken`] has been canceled. +/// Error returned when a future is canceled #[ohno::error] #[display("operation was canceled")] pub struct Canceled {} /// Extension trait that adds cancellation support to any [`Future`]. -pub trait WithCancellationExt: Future + Sized { +pub trait CancelableExt: Future + Sized { /// Wraps this future so that each poll checks the given [`CancellationToken`]: /// /// - If the token is canceled (before *or* after polling the inner @@ -56,10 +55,10 @@ pub trait WithCancellationExt: Future + Sized { /// /// ``` /// # async fn example() { - /// use cancelable::{CancellationTokenSource, WithCancellationExt}; + /// use cancelable::{CancelableExt, CancellationTokenSource}; /// /// let source = CancellationTokenSource::new(); - /// let result = async { "hello" }.with_cancellation(source.token()).await; + /// let result = async { "hello" }.cancelable(source.token()).await; /// assert_eq!(result.unwrap(), "hello"); /// # } /// ``` @@ -68,40 +67,38 @@ pub trait WithCancellationExt: Future + Sized { /// /// ``` /// # async fn example() { - /// use cancelable::{CancellationTokenSource, WithCancellationExt}; + /// use cancelable::{CancelableExt, CancellationTokenSource}; /// /// let source = CancellationTokenSource::new(); /// source.cancel(); /// - /// let result = async { unreachable!() } - /// .with_cancellation(source.token()) - /// .await; + /// let result = async { unreachable!() }.cancelable(source.token()).await; /// assert!(result.unwrap_err().to_string().contains("canceled")); /// # } /// ``` - fn with_cancellation(self, token: CancellationToken) -> WithCancellation; + fn cancelable(self, token: CancellationToken) -> Cancelable; } -impl WithCancellationExt for F { - fn with_cancellation(self, token: CancellationToken) -> WithCancellation { - WithCancellation { inner: self, token } +impl CancelableExt for F { + fn cancelable(self, token: CancellationToken) -> Cancelable { + Cancelable { inner: self, token } } } /// Future returned by -/// [`with_cancellation`](WithCancellationExt::with_cancellation). +/// [`cancelable`](CancelableExt::cancelable). /// /// See the trait method documentation for semantics. #[derive(Debug)] #[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct WithCancellation { +pub struct Cancelable { #[pin] inner: F, token: CancellationToken, } -impl Future for WithCancellation { +impl Future for Cancelable { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -129,27 +126,32 @@ impl Future for WithCancellation { #[cfg(test)] mod tests { + use tick::{Clock, FutureExt}; + use super::*; use crate::CancellationTokenSource; + #[cfg_attr(miri, ignore)] #[tokio::test] async fn completed_future_returns_ok() { let source = CancellationTokenSource::new(); - let result = async { 42 }.with_cancellation(source.token()).await; + let result = async { 42 }.cancelable(source.token()).await; assert_eq!(result.unwrap(), 42); } + #[cfg_attr(miri, ignore)] #[tokio::test] async fn cancelled_future_returns_err() { let source = CancellationTokenSource::new(); source.cancel(); let result = async { unreachable!("should not poll inner future") } - .with_cancellation(source.token()) + .cancelable(source.token()) .await; assert!(result.unwrap_err().to_string().contains("canceled")); } + #[cfg_attr(miri, ignore)] #[tokio::test] async fn cancellation_triggered_by_inner_future_leads_to_cancellation_error() { struct CancelOnPoll(CancellationTokenSource); @@ -161,20 +163,25 @@ mod tests { } } + let clock = Clock::new_tokio(); let source = CancellationTokenSource::new(); let token = source.token(); CancelOnPoll(source) - .with_cancellation(token) + .timeout(&clock, std::time::Duration::from_secs(5)) + .cancelable(token) .await .expect_err("should fail") .to_string() .contains("canceled"); } + #[cfg_attr(miri, ignore)] #[tokio::test] async fn already_cancelled_token() { + let clock = Clock::new_tokio(); async { unreachable!() } - .with_cancellation(CancellationToken::cancelled()) + .timeout(&clock, std::time::Duration::from_secs(5)) + .cancelable(CancellationToken::cancelled()) .await .expect_err("should fail") .to_string() diff --git a/crates/cancelable/src/lib.rs b/crates/cancelable/src/lib.rs index abaa4acda..db8aaea4c 100644 --- a/crates/cancelable/src/lib.rs +++ b/crates/cancelable/src/lib.rs @@ -11,7 +11,7 @@ //! modeled after the equivalent C# types. A source controls cancellation and //! hands out lightweight, cloneable tokens for observers to check. //! -//! # Linked sources +//! # Linked Sources //! //! A linked source cancels when *any* of its parent tokens are canceled, //! enabling composition of multiple cancellation signals: @@ -48,26 +48,25 @@ //! //! # Futures //! -//! The [`WithCancellationExt`] trait adds a -//! [`with_cancellation`](WithCancellationExt::with_cancellation) method +//! The [`CancelableExt`] trait adds a [`cancelable`](CancelableExt::cancelable) method //! to any [`Future`], pairing it with a [`CancellationToken`] so that each //! poll checks for cancellation before and after driving the inner future. //! //! ``` //! # async fn example() -> Result<(), ohno::AppError> { -//! use cancelable::{CancellationTokenSource, WithCancellationExt}; +//! use cancelable::{CancelableExt, CancellationTokenSource}; //! //! let source = CancellationTokenSource::new(); //! let token = source.token(); //! -//! let result = async { 42 }.with_cancellation(token).await?; +//! let result = async { 42 }.cancelable(token).await?; //! assert_eq!(result, 42); //! # Ok(()) //! # } //! ``` pub mod future; -pub use future::WithCancellationExt; +pub use future::CancelableExt; mod token; pub use token::{CancellationToken, CancellationTokenSource}; diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index 47bd901fb..0a4d0bec7 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -7,7 +7,33 @@ use std::fmt::Debug; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Weak}; -type Subscriber = Box; +enum Subscriber { + /// An external callback for arbitrary subscriber logic. + External(Box), + /// A weak reference to a linked child's shared state, avoiding a heap + /// allocation for the common parent/child propagation path. + Linked(Weak), +} + +impl Subscriber { + fn notify(self) { + match self { + Self::External(f) => f(), + Self::Linked(weak) => { + if let Some(inner) = weak.upgrade() { + inner.cancel_and_notify(); + } + } + } + } + + fn matches_linked(&self, target: &Weak) -> bool { + match self { + Self::External(_) => false, + Self::Linked(inner) => inner.ptr_eq(target), + } + } +} /// Shared state backing one or more [`CancellationToken`] handles. struct Inner { @@ -32,6 +58,9 @@ impl Inner { /// Returns `true` if cancellation has been requested. #[must_use] fn is_cancelled(&self) -> bool { + // Use acquire/release ordering to ensure cancelable reads occur before + // draining the subscriber list. May not be strictly necessary since + // the subscriber list is protected by a lock instead of atomic. self.canceled.load(Ordering::Acquire) } @@ -39,52 +68,88 @@ impl Inner { /// /// Returns `true` on success, signaling that the caller is responsible for notifying subscribers. fn try_set_cancelled(&self) -> bool { + // Use acquire/release ordering to ensure cancelable reads/updates occur + // before draining the subscriber list. May not be strictly necessary since + // the subscriber list is protected by a lock instead of atomic. self.canceled .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_ok() } /// Signal cancellation and notify subscribers + /// + /// # Panics + /// + /// Panics when the lock protecting the subscriber list is poisoned. This + /// happens when another thread, which had been holding the lock, panicked. fn cancel_and_notify(&self) { - if self.try_set_cancelled() { - let subscribers = match self.subscribers.lock() { - // Lock, take contents, and unlock - Ok(mut guard) => guard.take().unwrap_or_default(), - - // Lock has been poisoned, which means we can't read the subscriber list. - Err(_) => Vec::default(), - }; - - // Notify from outside the lock - for f in subscribers { - f(); - } + if !self.try_set_cancelled() { + // Already canceled by someone else. They will notify. + return; + } + + // Lock, take subscribers, and unlock + let subscribers = self + .subscribers + .lock() + .expect("subscriber lock is poisoned") + .take() + .unwrap_or_default(); + + // Notify from outside the lock + for subscriber in subscribers { + subscriber.notify(); } } /// Subscribe to the cancellation notification /// /// If cancellation has already occurred, the callback fires immediately. + /// + /// # Panics + /// + /// Panics when the lock protecting the subscriber list is poisoned. This + /// happens when another thread, which had been holding the lock, panicked. fn subscribe(&self, callback: Subscriber) { - if let Ok(mut guard) = self.subscribers.lock() { - // Subscribers is `Some(...)` which means we haven't notified them yet - // (e.g. not canceled, and not mid-cancellation). Add to the list. - if let Some(list) = guard.as_mut() { - list.push(callback); - return; - } + let mut guard = self.subscribers.lock().expect("subscriber lock is poisoned"); - // Subscribers list was `None`, meaning cancellation has already occurred - // and all subscribers have already been notified. - // - // Fall through to release the lock, then notify immediately. - } else { - // Lock has been poisoned, which means we can't add to the subscriber list. - // - // The token source is unusable. Send the notification immediately. + // Subscribers is `Some(...)` which means we haven't notified them yet + // (e.g. not canceled, and not mid-cancellation). Add to the list. + if let Some(list) = guard.as_mut() { + list.push(callback); + return; } - callback(); + // Subscribers list was `None`, meaning cancellation has already occurred + // and all subscribers have already been notified. + // + // Fall through to release the lock, then notify immediately. + + callback.notify(); + } + + /// Remove the linked child token from the list of subscribers. + /// + /// This is a no-op if cancellation has already occurred (the list is `None`). + /// + /// # Panics + /// + /// Panics when the lock protecting the subscriber list is poisoned. This + /// happens when another thread, which had been holding the lock, panicked. + #[cfg_attr(test, mutants::skip)] // Mutation breaks list iteration, causing tests to run forever. + fn unsubscribe_linked_child(&self, child: &Weak) { + let mut guard = self.subscribers.lock().expect("subscriber lock is poisoned"); + + if let Some(list) = guard.as_mut() { + let mut i = 0; + while i < list.len() { + if list[i].matches_linked(child) { + list.swap_remove(i); + } else { + i += 1; + } + } + } } } @@ -105,8 +170,7 @@ impl Debug for Inner { } } -/// A lightweight, cloneable handle for observing whether cancellation has been -/// requested. +/// A lightweight, cloneable handle for observing a cancellation signal. /// /// Tokens are obtained from a [`CancellationTokenSource`] via /// [`token()`](CancellationTokenSource::token) and can be passed throughout a @@ -168,16 +232,27 @@ impl Default for CancellationToken { } } -/// Controls cancellation for one or more [`CancellationToken`]s +/// Controls cancellation, providing a shared [`CancellationToken`]. /// /// Create a source, distribute tokens via [`token()`](Self::token), and call -/// [`cancel()`](Self::cancel) when the operation should stop. +/// [`cancel()`](Self::cancel) to signal when the operation should stop. /// -/// Dropping a `CancellationTokenSource` does **not** cancel its tokens. +/// # Drop Behavior +/// +/// Dropping a [`CancellationTokenSource`] does **not** cancel its tokens. /// Outstanding tokens simply remain in their current state. +/// +/// # Linked Parents +/// +/// When a source is [`linked()`](CancellationTokenSource::linked) to a set of +/// parents, it registers to receive notifications from each parent. When the +/// source is later dropped, it unregistered from each of the parents. This +/// ensures that long-lived parents only track and notify active children. #[derive(Debug, Default)] pub struct CancellationTokenSource { token: CancellationToken, + /// Parents to which this source is linked + parent_refs: Vec>, } impl CancellationTokenSource { @@ -186,6 +261,7 @@ impl CancellationTokenSource { pub fn new() -> Self { Self { token: CancellationToken::new(), + parent_refs: Vec::new(), } } @@ -198,21 +274,21 @@ impl CancellationTokenSource { /// Linked sources work by registering a subscriber on each parent token. /// When any parent is canceled, we get a notification (callback), and use it to self-cancel. /// + /// On drop, this source unregisters itself from each parent. + /// /// [`is_cancelled()`]: CancellationToken::is_cancelled #[must_use] pub fn linked(parents: &[CancellationToken]) -> Self { - let source = Self::new(); + let source = Self { + token: CancellationToken::new(), + parent_refs: parents.iter().map(CancellationToken::weak_ref).collect(), + }; if !parents.is_empty() { let weak = source.token.weak_ref(); for parent in parents { - let weak = Weak::clone(&weak); - parent.inner.subscribe(Box::new(move || { - if let Some(inner) = weak.upgrade() { - inner.cancel_and_notify(); - } - })); + parent.inner.subscribe(Subscriber::Linked(Weak::clone(&weak))); } } @@ -252,8 +328,25 @@ impl CancellationTokenSource { /// /// If this source is already canceled, the callback fires immediately /// on the calling thread. + /// + /// This callback cannot be unregistered. pub fn subscribe(&self, callback: impl FnOnce() + Send + 'static) { - self.token.inner.subscribe(Box::new(callback)); + self.token.inner.subscribe(Subscriber::External(Box::new(callback))); + } +} + +impl Drop for CancellationTokenSource { + fn drop(&mut self) { + if self.parent_refs.is_empty() { + return; + } + + let weak = self.token.weak_ref(); + for parent_ref in &self.parent_refs { + if let Some(inner) = parent_ref.upgrade() { + inner.unsubscribe_linked_child(&weak); + } + } } } @@ -419,7 +512,9 @@ mod tests { let token = source.token(); let handle = std::thread::spawn(move || { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); while !token.is_cancelled() { + assert!(std::time::Instant::now() < deadline, "spin loop timed out after 5 seconds"); std::hint::spin_loop(); } true @@ -436,7 +531,9 @@ mod tests { let linked_token = linked.token(); let handle = std::thread::spawn(move || { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); while !linked_token.is_cancelled() { + assert!(std::time::Instant::now() < deadline, "spin loop timed out after 5 seconds"); std::hint::spin_loop(); } true @@ -535,4 +632,199 @@ mod tests { root.cancel(); assert_eq!(counter.load(Ordering::Relaxed), 1); } + + /// Returns the number of subscribers currently registered, or `None` if + /// cancellation has already drained the list. + fn subscriber_count(inner: &Inner) -> Option { + inner + .subscribers + .lock() + .expect("subscriber lock is poisoned") + .as_ref() + .map(Vec::len) + } + + #[test] + fn unsubscribe_linked_child_removes_matching_entry() { + let parent = Inner::new(false); + let child = Arc::new(Inner::new(false)); + let weak = Arc::downgrade(&child); + + parent.subscribe(Subscriber::Linked(Weak::clone(&weak))); + assert_eq!(subscriber_count(&parent), Some(1)); + + parent.unsubscribe_linked_child(&weak); + assert_eq!(subscriber_count(&parent), Some(0)); + } + + #[test] + fn unsubscribe_linked_child_removes_all_matching_entries() { + let parent = Inner::new(false); + let child = Arc::new(Inner::new(false)); + let weak = Arc::downgrade(&child); + + parent.subscribe(Subscriber::Linked(Weak::clone(&weak))); + parent.subscribe(Subscriber::Linked(Weak::clone(&weak))); + assert_eq!(subscriber_count(&parent), Some(2)); + + parent.unsubscribe_linked_child(&weak); + assert_eq!(subscriber_count(&parent), Some(0)); + } + + #[test] + fn unsubscribe_linked_child_leaves_other_linked_subscribers() { + let parent = Inner::new(false); + let child = Arc::new(Inner::new(false)); + let child_other = Arc::new(Inner::new(false)); + let weak = Arc::downgrade(&child); + let weak_other = Arc::downgrade(&child_other); + + parent.subscribe(Subscriber::Linked(Weak::clone(&weak))); + parent.subscribe(Subscriber::Linked(Weak::clone(&weak_other))); + assert_eq!(subscriber_count(&parent), Some(2)); + + parent.unsubscribe_linked_child(&weak); + assert_eq!(subscriber_count(&parent), Some(1)); + + // Cancelling the parent should still propagate to child_other + parent.cancel_and_notify(); + assert!(child_other.is_cancelled()); + assert!(!child.is_cancelled()); + } + + #[test] + fn unsubscribe_linked_child_leaves_external_subscribers() { + let parent = Inner::new(false); + let child = Arc::new(Inner::new(false)); + let weak = Arc::downgrade(&child); + let counter = Arc::new(AtomicUsize::new(0)); + + let c = Arc::clone(&counter); + parent.subscribe(Subscriber::External(Box::new(move || { + c.fetch_add(1, Ordering::Relaxed); + }))); + parent.subscribe(Subscriber::Linked(Weak::clone(&weak))); + assert_eq!(subscriber_count(&parent), Some(2)); + + parent.unsubscribe_linked_child(&weak); + assert_eq!(subscriber_count(&parent), Some(1)); + + parent.cancel_and_notify(); + assert_eq!(counter.load(Ordering::Relaxed), 1, "external subscriber should still fire"); + assert!(!child.is_cancelled(), "unsubscribed child should not be cancelled"); + } + + #[test] + fn unsubscribe_linked_child_is_noop_when_already_cancelled() { + let parent = Inner::new(false); + let child = Arc::new(Inner::new(false)); + let weak = Arc::downgrade(&child); + + parent.subscribe(Subscriber::Linked(Weak::clone(&weak))); + parent.cancel_and_notify(); + assert_eq!(subscriber_count(&parent), None); + + // Should not panic or have any effect + parent.unsubscribe_linked_child(&weak); + assert_eq!(subscriber_count(&parent), None); + } + + #[test] + fn unsubscribe_linked_child_is_noop_when_no_match() { + let parent = Inner::new(false); + let child_a = Arc::new(Inner::new(false)); + let child_b = Arc::new(Inner::new(false)); + let weak_a = Arc::downgrade(&child_a); + let weak_b = Arc::downgrade(&child_b); + + parent.subscribe(Subscriber::Linked(Weak::clone(&weak_a))); + assert_eq!(subscriber_count(&parent), Some(1)); + + parent.unsubscribe_linked_child(&weak_b); + assert_eq!(subscriber_count(&parent), Some(1)); + } + + #[test] + fn drop_linked_source_unregisters_from_parent() { + let parent = CancellationTokenSource::new(); + + { + let _linked = CancellationTokenSource::linked(&[parent.token()]); + assert_eq!(subscriber_count(&parent.token.inner), Some(1)); + // _linked dropped here + } + + assert_eq!(subscriber_count(&parent.token.inner), Some(0)); + } + + #[test] + fn drop_linked_source_unregisters_from_all_parents() { + let p1 = CancellationTokenSource::new(); + let p2 = CancellationTokenSource::new(); + + { + let _linked = CancellationTokenSource::linked(&[p1.token(), p2.token()]); + assert_eq!(subscriber_count(&p1.token.inner), Some(1)); + assert_eq!(subscriber_count(&p2.token.inner), Some(1)); + } + + assert_eq!(subscriber_count(&p1.token.inner), Some(0)); + assert_eq!(subscriber_count(&p2.token.inner), Some(0)); + } + + #[test] + fn drop_linked_source_leaves_sibling_subscriptions() { + let parent = CancellationTokenSource::new(); + let sibling = CancellationTokenSource::linked(&[parent.token()]); + + { + let _linked = CancellationTokenSource::linked(&[parent.token()]); + assert_eq!(subscriber_count(&parent.token.inner), Some(2)); + } + + assert_eq!(subscriber_count(&parent.token.inner), Some(1)); + + // Sibling should still receive cancellation + parent.cancel(); + assert!(sibling.is_cancelled()); + } + + #[test] + fn drop_linked_source_leaves_external_subscriptions() { + let parent = CancellationTokenSource::new(); + let counter = Arc::new(AtomicUsize::new(0)); + + let c = Arc::clone(&counter); + parent.subscribe(move || { + c.fetch_add(1, Ordering::Relaxed); + }); + + { + let _linked = CancellationTokenSource::linked(&[parent.token()]); + assert_eq!(subscriber_count(&parent.token.inner), Some(2)); + } + + assert_eq!(subscriber_count(&parent.token.inner), Some(1)); + + parent.cancel(); + assert_eq!(counter.load(Ordering::Relaxed), 1); + } + + #[test] + fn drop_independent_source_does_not_panic() { + let _source = CancellationTokenSource::new(); + // No parents — drop should be a no-op without panicking + } + + #[test] + fn drop_linked_source_after_parent_cancelled_does_not_panic() { + let parent = CancellationTokenSource::new(); + let linked = CancellationTokenSource::linked(&[parent.token()]); + + parent.cancel(); + assert!(linked.is_cancelled()); + + // Subscriber list is already None; drop should not panic + drop(linked); + } } From 5f5f5248b28601677fe95b65074be491a54c769d Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Tue, 2 Jun 2026 18:36:59 -0700 Subject: [PATCH 05/10] Copilot feedback, fix lockfile --- Cargo.lock | 2 +- crates/cancelable/src/future.rs | 12 ++++++------ crates/cancelable/src/token.rs | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54f3872de..7a2ba4ba2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,7 +545,7 @@ name = "cancelable" version = "0.1.0" dependencies = [ "mutants", - "ohno 0.3.3", + "ohno 0.3.4", "pin-project", "tick", "tokio", diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs index 0a5966750..a99e9ecd1 100644 --- a/crates/cancelable/src/future.rs +++ b/crates/cancelable/src/future.rs @@ -166,25 +166,25 @@ mod tests { let clock = Clock::new_tokio(); let source = CancellationTokenSource::new(); let token = source.token(); - CancelOnPoll(source) + let message = CancelOnPoll(source) .timeout(&clock, std::time::Duration::from_secs(5)) .cancelable(token) .await .expect_err("should fail") - .to_string() - .contains("canceled"); + .to_string(); + assert!(message.contains("canceled")); } #[cfg_attr(miri, ignore)] #[tokio::test] async fn already_cancelled_token() { let clock = Clock::new_tokio(); - async { unreachable!() } + let message = async { unreachable!() } .timeout(&clock, std::time::Duration::from_secs(5)) .cancelable(CancellationToken::cancelled()) .await .expect_err("should fail") - .to_string() - .contains("canceled"); + .to_string(); + assert!(message.contains("canceled")); } } diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index 0a4d0bec7..1342abafb 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -123,8 +123,8 @@ impl Inner { // Subscribers list was `None`, meaning cancellation has already occurred // and all subscribers have already been notified. // - // Fall through to release the lock, then notify immediately. - + // Release the lock, then notify immediately. + drop(guard); callback.notify(); } From ee8b26aca23978302e3fb076692c0c85d124bcb4 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:09:31 -0700 Subject: [PATCH 06/10] pr feedback, test coverage --- crates/cancelable/src/future.rs | 14 ++++ crates/cancelable/src/token.rs | 117 +++++++++++++++++++++----------- 2 files changed, 92 insertions(+), 39 deletions(-) diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs index a99e9ecd1..336079870 100644 --- a/crates/cancelable/src/future.rs +++ b/crates/cancelable/src/future.rs @@ -126,11 +126,25 @@ impl Future for Cancelable { #[cfg(test)] mod tests { + use std::time::Duration; + use tick::{Clock, FutureExt}; use super::*; use crate::CancellationTokenSource; + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn future_returns_ok() { + let clock = Clock::new_tokio(); + let source = CancellationTokenSource::new(); + clock + .delay(Duration::from_millis(100)) + .cancelable(source.token()) + .await + .expect("should succeed without being canceled"); + } + #[cfg_attr(miri, ignore)] #[tokio::test] async fn completed_future_returns_ok() { diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index 1342abafb..88e07638e 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -77,11 +77,6 @@ impl Inner { } /// Signal cancellation and notify subscribers - /// - /// # Panics - /// - /// Panics when the lock protecting the subscriber list is poisoned. This - /// happens when another thread, which had been holding the lock, panicked. fn cancel_and_notify(&self) { if !self.try_set_cancelled() { // Already canceled by someone else. They will notify. @@ -92,7 +87,7 @@ impl Inner { let subscribers = self .subscribers .lock() - .expect("subscriber lock is poisoned") + .expect("subscriber lock should not be poisoned because lock is never held over fallible or unsafe calls") .take() .unwrap_or_default(); @@ -105,13 +100,11 @@ impl Inner { /// Subscribe to the cancellation notification /// /// If cancellation has already occurred, the callback fires immediately. - /// - /// # Panics - /// - /// Panics when the lock protecting the subscriber list is poisoned. This - /// happens when another thread, which had been holding the lock, panicked. fn subscribe(&self, callback: Subscriber) { - let mut guard = self.subscribers.lock().expect("subscriber lock is poisoned"); + let mut guard = self + .subscribers + .lock() + .expect("subscriber lock should not be poisoned because lock is never held over fallible or unsafe calls"); // Subscribers is `Some(...)` which means we haven't notified them yet // (e.g. not canceled, and not mid-cancellation). Add to the list. @@ -353,6 +346,7 @@ impl Drop for CancellationTokenSource { #[cfg(test)] mod tests { use std::sync::atomic::AtomicUsize; + use std::thread::JoinHandle; use super::*; @@ -490,57 +484,53 @@ mod tests { #[test] fn dropped_linked_source_does_not_notify_on_parent_cancel() { let parent = CancellationTokenSource::new(); - let counter = Arc::new(AtomicUsize::new(0)); { let linked = CancellationTokenSource::linked(&[parent.token()]); - let c = Arc::clone(&counter); - linked.subscribe(move || { - c.fetch_add(1, Ordering::Relaxed); - }); + linked.subscribe(|| panic!("should not be called")); // linked dropped here } parent.cancel(); - assert_eq!(counter.load(Ordering::Relaxed), 0); } - #[test] - fn cancel_visible_across_threads() { - let source = CancellationTokenSource::new(); - let token = source.token(); + fn start_polling_thead_for_is_cancelled(token: CancellationToken) -> JoinHandle<()> { + let counter = Arc::new(AtomicUsize::new(0)); - let handle = std::thread::spawn(move || { - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); + let thread_counter = Arc::clone(&counter); + let thread_handle = std::thread::spawn(move || { while !token.is_cancelled() { - assert!(std::time::Instant::now() < deadline, "spin loop timed out after 5 seconds"); + thread_counter.fetch_add(1, Ordering::Relaxed); std::hint::spin_loop(); } - true }); + // wait for the thread to start running + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); + while counter.load(Ordering::Relaxed) == 0 { + assert!(std::time::Instant::now() < deadline, "thread did not start running"); + std::hint::spin_loop(); + } + + thread_handle + } + + #[test] + fn cancel_visible_across_threads() { + let source = CancellationTokenSource::new(); + let handle = start_polling_thead_for_is_cancelled(source.token()); source.cancel(); - assert!(handle.join().unwrap()); + handle.join().expect("thread should complete successfully"); } #[test] fn linked_cancellation_is_visible_across_threads() { let parent = CancellationTokenSource::new(); let linked = CancellationTokenSource::linked(&[parent.token()]); - let linked_token = linked.token(); - - let handle = std::thread::spawn(move || { - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); - while !linked_token.is_cancelled() { - assert!(std::time::Instant::now() < deadline, "spin loop timed out after 5 seconds"); - std::hint::spin_loop(); - } - true - }); - + let handle = start_polling_thead_for_is_cancelled(linked.token()); parent.cancel(); - assert!(handle.join().unwrap()); + handle.join().expect("thread should complete successfully"); } #[test] @@ -827,4 +817,53 @@ mod tests { // Subscriber list is already None; drop should not panic drop(linked); } + + #[test] + fn linked_with_no_parents_behaves_like_independent_source() { + let source = CancellationTokenSource::linked(&[]); + assert!(!source.is_cancelled()); + + source.cancel(); + assert!(source.is_cancelled()); + } + + #[test] + fn debug_inner_not_cancelled_no_subscribers() { + let source = CancellationTokenSource::new(); + let debug = format!("{source:?}"); + assert!(debug.contains("canceled: false"), "expected canceled: false, got: {debug}"); + assert!(debug.contains("0 subscriber closure(s)"), "expected 0 subscribers, got: {debug}"); + } + + #[test] + fn debug_inner_not_cancelled_with_subscribers() { + let source = CancellationTokenSource::new(); + source.subscribe(|| {}); + source.subscribe(|| {}); + let debug = format!("{source:?}"); + assert!(debug.contains("canceled: false"), "expected canceled: false, got: {debug}"); + assert!(debug.contains("2 subscriber closure(s)"), "expected 2 subscribers, got: {debug}"); + } + + #[test] + fn debug_inner_cancelled() { + let source = CancellationTokenSource::new(); + source.cancel(); + let debug = format!("{source:?}"); + assert!(debug.contains("canceled: true"), "expected canceled: true, got: {debug}"); + assert!(debug.contains("Mutex(None)"), "expected Mutex(None), got: {debug}"); + } + + #[test] + fn debug_inner_while_mutex_locked() { + let source = CancellationTokenSource::new(); + // Hold the subscriber lock from the current thread so try_lock fails + // during Debug formatting. + let _guard = source.token.inner.subscribers.lock().unwrap(); + let debug = format!("{source:?}"); + assert!( + debug.contains("locked subscriber list"), + "expected locked subscriber list, got: {debug}" + ); + } } From 932d4f576324ce270a67554fb84fba5503aa45ed Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:24:30 -0700 Subject: [PATCH 07/10] fix comment --- crates/cancelable/src/token.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index 88e07638e..e1f2a3520 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -239,7 +239,7 @@ impl Default for CancellationToken { /// /// When a source is [`linked()`](CancellationTokenSource::linked) to a set of /// parents, it registers to receive notifications from each parent. When the -/// source is later dropped, it unregistered from each of the parents. This +/// source is later dropped, it unregisters from each of the parents. This /// ensures that long-lived parents only track and notify active children. #[derive(Debug, Default)] pub struct CancellationTokenSource { From c14166f814d0beab0850af6c59332ab7aa812c18 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Wed, 3 Jun 2026 16:30:11 -0700 Subject: [PATCH 08/10] fix test --- crates/cancelable/src/token.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index e1f2a3520..a7bdd739a 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -495,21 +495,22 @@ mod tests { parent.cancel(); } - fn start_polling_thead_for_is_cancelled(token: CancellationToken) -> JoinHandle<()> { + fn start_cancellation_polling_thead(token: CancellationToken) -> JoinHandle<()> { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); let counter = Arc::new(AtomicUsize::new(0)); let thread_counter = Arc::clone(&counter); let thread_handle = std::thread::spawn(move || { while !token.is_cancelled() { thread_counter.fetch_add(1, Ordering::Relaxed); + assert!(std::time::Instant::now() < deadline, "thread did not finish in time"); std::hint::spin_loop(); } }); // wait for the thread to start running - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); while counter.load(Ordering::Relaxed) == 0 { - assert!(std::time::Instant::now() < deadline, "thread did not start running"); + assert!(std::time::Instant::now() < deadline, "thread did not start in time"); std::hint::spin_loop(); } @@ -519,7 +520,7 @@ mod tests { #[test] fn cancel_visible_across_threads() { let source = CancellationTokenSource::new(); - let handle = start_polling_thead_for_is_cancelled(source.token()); + let handle = start_cancellation_polling_thead(source.token()); source.cancel(); handle.join().expect("thread should complete successfully"); } @@ -528,7 +529,7 @@ mod tests { fn linked_cancellation_is_visible_across_threads() { let parent = CancellationTokenSource::new(); let linked = CancellationTokenSource::linked(&[parent.token()]); - let handle = start_polling_thead_for_is_cancelled(linked.token()); + let handle = start_cancellation_polling_thead(linked.token()); parent.cancel(); handle.join().expect("thread should complete successfully"); } From a5a5230e603b47b6fca34f3c02ff12f97a9992fc Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Wed, 3 Jun 2026 16:41:56 -0700 Subject: [PATCH 09/10] spelling --- crates/cancelable/src/token.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs index a7bdd739a..8fd1961ee 100644 --- a/crates/cancelable/src/token.rs +++ b/crates/cancelable/src/token.rs @@ -495,7 +495,7 @@ mod tests { parent.cancel(); } - fn start_cancellation_polling_thead(token: CancellationToken) -> JoinHandle<()> { + fn start_cancellation_polling_thread(token: CancellationToken) -> JoinHandle<()> { let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); let counter = Arc::new(AtomicUsize::new(0)); @@ -520,7 +520,7 @@ mod tests { #[test] fn cancel_visible_across_threads() { let source = CancellationTokenSource::new(); - let handle = start_cancellation_polling_thead(source.token()); + let handle = start_cancellation_polling_thread(source.token()); source.cancel(); handle.join().expect("thread should complete successfully"); } @@ -529,7 +529,7 @@ mod tests { fn linked_cancellation_is_visible_across_threads() { let parent = CancellationTokenSource::new(); let linked = CancellationTokenSource::linked(&[parent.token()]); - let handle = start_cancellation_polling_thead(linked.token()); + let handle = start_cancellation_polling_thread(linked.token()); parent.cancel(); handle.join().expect("thread should complete successfully"); } From ff067d44db3ee71b4a85b45bb56986c10cbf7772 Mon Sep 17 00:00:00 2001 From: Adam Foxman <5109471+afoxman@users.noreply.github.com> Date: Fri, 5 Jun 2026 10:29:02 -0700 Subject: [PATCH 10/10] lockfile --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 8890659e8..695be4bd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -573,7 +573,7 @@ name = "cancelable" version = "0.1.0" dependencies = [ "mutants", - "ohno 0.3.4", + "ohno 0.3.5", "pin-project", "tick", "tokio",