diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6631cdff..b43cbfbd1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ Please see each crate's change log below:
- [`cachet_memory`](./crates/cachet_memory/CHANGELOG.md)
- [`cachet_service`](./crates/cachet_service/CHANGELOG.md)
- [`cachet_tier`](./crates/cachet_tier/CHANGELOG.md)
+- [`cancelable`](./crates/cancelable/CHANGELOG.md)
- [`data_privacy`](./crates/data_privacy/CHANGELOG.md)
- [`data_privacy_macros`](./crates/data_privacy_macros/CHANGELOG.md)
- [`data_privacy_macros_impl`](./crates/data_privacy_macros_impl/CHANGELOG.md)
diff --git a/Cargo.lock b/Cargo.lock
index c1258435f..695be4bd4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -568,6 +568,17 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "cancelable"
+version = "0.1.0"
+dependencies = [
+ "mutants",
+ "ohno 0.3.5",
+ "pin-project",
+ "tick",
+ "tokio",
+]
+
[[package]]
name = "cast"
version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index 7300a8cd3..68862830b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ cachet = { path = "crates/cachet", default-features = false, version = "0.6.4" }
cachet_memory = { path = "crates/cachet_memory", default-features = false, version = "0.3.3" }
cachet_service = { path = "crates/cachet_service", default-features = false, version = "0.2.3" }
cachet_tier = { path = "crates/cachet_tier", default-features = false, version = "0.2.2" }
+cancelable = { path = "crates/cancelable", default-features = false, version = "0.1.0" }
data_privacy = { path = "crates/data_privacy", default-features = false, version = "0.12.0" }
data_privacy_core = { path = "crates/data_privacy_core", default-features = false, version = "0.1.0" }
data_privacy_macros = { path = "crates/data_privacy_macros", default-features = false, version = "0.10.0" }
diff --git a/README.md b/README.md
index c6ea0d5d9..51c0d11b8 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@ These are the primary crates built out of this repo:
- [`cachet_memory`](./crates/cachet_memory/README.md) - In-memory cache tier backed by Moka for the cachet caching library.
- [`cachet_service`](./crates/cachet_service/README.md) - Layered service integration for the cachet caching library.
- [`cachet_tier`](./crates/cachet_tier/README.md) - Core cache tier trait and abstractions for building cache backends.
+- [`cancelable`](./crates/cancelable/README.md) - Cooperative cancellation via tokens
- [`data_privacy`](./crates/data_privacy/README.md) - Mechanisms to classify, manipulate, and redact sensitive data.
- [`fetch`](./crates/fetch/README.md) - "Universal, composable and resilient HTTP client."
- [`fetch_hyper`](./crates/fetch_hyper/README.md) - Hyper-based HTTP transport utilities for fetch.
diff --git a/crates/cancelable/CHANGELOG.md b/crates/cancelable/CHANGELOG.md
new file mode 100644
index 000000000..825c32f0d
--- /dev/null
+++ b/crates/cancelable/CHANGELOG.md
@@ -0,0 +1 @@
+# Changelog
diff --git a/crates/cancelable/Cargo.toml b/crates/cancelable/Cargo.toml
new file mode 100644
index 000000000..50918238e
--- /dev/null
+++ b/crates/cancelable/Cargo.toml
@@ -0,0 +1,39 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+[package]
+name = "cancelable"
+description = "Cooperative cancellation via tokens"
+version = "0.1.0"
+readme = "README.md"
+keywords = ["oxidizer", "async", "futures"]
+categories = ["asynchronous", "concurrency"]
+
+edition.workspace = true
+rust-version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository = "https://github.com/microsoft/oxidizer/tree/main/crates/cancelable"
+
+[package.metadata.cargo_check_external_types]
+allowed_external_types = [
+ "ohno::enrichable::Enrichable",
+ "ohno::error_ext::ErrorExt",
+]
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies]
+ohno = { workspace = true }
+pin-project = { workspace = true }
+
+[dev-dependencies]
+mutants = { workspace = true }
+ohno = { workspace = true, features = ["app-err"] }
+tick = { workspace = true, features = ["tokio"] }
+tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+
+[lints]
+workspace = true
diff --git a/crates/cancelable/README.md b/crates/cancelable/README.md
new file mode 100644
index 000000000..ba0402c40
--- /dev/null
+++ b/crates/cancelable/README.md
@@ -0,0 +1,81 @@
+
+

+
+# Cancelable
+
+[](https://crates.io/crates/cancelable)
+[](https://docs.rs/cancelable)
+[](https://crates.io/crates/cancelable)
+[](https://github.com/microsoft/oxidizer/actions/workflows/main.yml)
+[](https://codecov.io/gh/microsoft/oxidizer)
+[](../../LICENSE)
+

+
+
+
+Cooperative cancellation via tokens.
+
+This module provides [`CancellationTokenSource`][__link0] and [`CancellationToken`][__link1],
+modeled after the equivalent C# types. A source controls cancellation and
+hands out lightweight, cloneable tokens for observers to check.
+
+## Linked Sources
+
+A linked source cancels when *any* of its parent tokens are canceled,
+enabling composition of multiple cancellation signals:
+
+```rust
+use cancelable::CancellationTokenSource;
+
+let first = CancellationTokenSource::new();
+let second = CancellationTokenSource::new();
+
+let linked = CancellationTokenSource::linked(&[first.token(), second.token()]);
+let token = linked.token();
+
+assert!(!token.is_cancelled());
+second.cancel();
+assert!(token.is_cancelled());
+```
+
+## Subscribers
+
+Register callbacks that fire exactly once when cancellation occurs:
+
+```rust
+use cancelable::CancellationTokenSource;
+
+let source = CancellationTokenSource::new();
+source.subscribe(|| println!("Operation canceled"));
+source.cancel();
+```
+
+## Futures
+
+The [`CancelableExt`][__link2] trait adds a [`cancelable`][__link3] method
+to any [`Future`][__link4], pairing it with a [`CancellationToken`][__link5] so that each
+poll checks for cancellation before and after driving the inner future.
+
+```rust
+use cancelable::{CancelableExt, CancellationTokenSource};
+
+let source = CancellationTokenSource::new();
+let token = source.token();
+
+let result = async { 42 }.cancelable(token).await?;
+assert_eq!(result, 42);
+```
+
+
+
+
+This crate was developed as part of The Oxidizer Project. Browse this crate's source code.
+
+
+ [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbE3Iea_zSpkIbvcbCI0vEEEEb7KqsBtUtyHsbFhKo1iYbGRphZIGCamNhbmNlbGFibGVlMC4xLjA
+ [__link0]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationTokenSource
+ [__link1]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken
+ [__link2]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::CancelableExt
+ [__link3]: https://docs.rs/cancelable/0.1.0/cancelable/?search=future::CancelableExt::cancelable
+ [__link4]: https://doc.rust-lang.org/stable/std/future/trait.Future.html
+ [__link5]: https://docs.rs/cancelable/0.1.0/cancelable/?search=CancellationToken
diff --git a/crates/cancelable/favicon.ico b/crates/cancelable/favicon.ico
new file mode 100644
index 000000000..6cc6d824e
--- /dev/null
+++ b/crates/cancelable/favicon.ico
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:b7623e3f34c7d69a0392b3701aa62e534c1b52589e6f91826bb651952d86b6dc
+size 15406
diff --git a/crates/cancelable/logo.png b/crates/cancelable/logo.png
new file mode 100644
index 000000000..2f3e3885d
--- /dev/null
+++ b/crates/cancelable/logo.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:5dc59a03e875c186e6e77e0b7506e40238c15d7e86c2ad1db1282d17898fbb85
+size 53167
diff --git a/crates/cancelable/src/future.rs b/crates/cancelable/src/future.rs
new file mode 100644
index 000000000..336079870
--- /dev/null
+++ b/crates/cancelable/src/future.rs
@@ -0,0 +1,204 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Future extension for cooperative cancellation
+//!
+//! The [`CancelableExt`] trait adds a
+//! [`cancelable`](CancelableExt::cancelable) method
+//! to any [`Future`], pairing it with a [`CancellationToken`] so that each
+//! poll checks for cancellation before and after driving the inner future.
+//!
+//! ```
+//! # async fn example() -> Result<(), ohno::AppError> {
+//! use cancelable::{CancelableExt, CancellationTokenSource};
+//!
+//! let source = CancellationTokenSource::new();
+//! let token = source.token();
+//!
+//! let result = async { 42 }.cancelable(token).await?;
+//! assert_eq!(result, 42);
+//! # Ok(())
+//! # }
+//! ```
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use pin_project::pin_project;
+
+use crate::CancellationToken;
+
+/// Error returned when a future is canceled
+#[ohno::error]
+#[display("operation was canceled")]
+pub struct Canceled {}
+
+/// Extension trait that adds cancellation support to any [`Future`].
+pub trait CancelableExt: Future + Sized {
+ /// Wraps this future so that each poll checks the given [`CancellationToken`]:
+ ///
+ /// - If the token is canceled (before *or* after polling the inner
+ /// future), the combined future resolves to Err([Canceled]).
+ /// - Otherwise the inner future's output is forwarded as `Ok(output)`.
+ ///
+ /// # Note on wake semantics
+ ///
+ /// Cancellation is checked cooperatively: the extension inspects the token
+ /// each time the inner future is polled. If the inner future is pending
+ /// and nothing else wakes the task, cancellation will not be noticed until
+ /// the next poll. This mirrors the cooperative model of the `C#` method
+ /// `CancellationToken.ThrowIfCancellationRequested()`.
+ ///
+ /// # Examples
+ ///
+ /// Successful completion:
+ ///
+ /// ```
+ /// # async fn example() {
+ /// use cancelable::{CancelableExt, CancellationTokenSource};
+ ///
+ /// let source = CancellationTokenSource::new();
+ /// let result = async { "hello" }.cancelable(source.token()).await;
+ /// assert_eq!(result.unwrap(), "hello");
+ /// # }
+ /// ```
+ ///
+ /// Cancelled before first poll:
+ ///
+ /// ```
+ /// # async fn example() {
+ /// use cancelable::{CancelableExt, CancellationTokenSource};
+ ///
+ /// let source = CancellationTokenSource::new();
+ /// source.cancel();
+ ///
+ /// let result = async { unreachable!() }.cancelable(source.token()).await;
+ /// assert!(result.unwrap_err().to_string().contains("canceled"));
+ /// # }
+ /// ```
+ fn cancelable(self, token: CancellationToken) -> Cancelable;
+}
+
+impl CancelableExt for F {
+ fn cancelable(self, token: CancellationToken) -> Cancelable {
+ Cancelable { inner: self, token }
+ }
+}
+
+/// Future returned by
+/// [`cancelable`](CancelableExt::cancelable).
+///
+/// See the trait method documentation for semantics.
+#[derive(Debug)]
+#[pin_project]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Cancelable {
+ #[pin]
+ inner: F,
+ token: CancellationToken,
+}
+
+impl Future for Cancelable {
+ type Output = Result;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ let this = self.project();
+
+ // Check cancellation before running the inner future so we can
+ // short-circuit without performing unnecessary work.
+ if this.token.is_cancelled() {
+ return Poll::Ready(Err(Canceled::new()));
+ }
+
+ match this.inner.poll(cx) {
+ Poll::Ready(output) => Poll::Ready(Ok(output)),
+ Poll::Pending => {
+ // Check for cancellation again, now that we've spent time running the inner future.
+ if this.token.is_cancelled() {
+ Poll::Ready(Err(Canceled::new()))
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use tick::{Clock, FutureExt};
+
+ use super::*;
+ use crate::CancellationTokenSource;
+
+ #[cfg_attr(miri, ignore)]
+ #[tokio::test]
+ async fn future_returns_ok() {
+ let clock = Clock::new_tokio();
+ let source = CancellationTokenSource::new();
+ clock
+ .delay(Duration::from_millis(100))
+ .cancelable(source.token())
+ .await
+ .expect("should succeed without being canceled");
+ }
+
+ #[cfg_attr(miri, ignore)]
+ #[tokio::test]
+ async fn completed_future_returns_ok() {
+ let source = CancellationTokenSource::new();
+ let result = async { 42 }.cancelable(source.token()).await;
+ assert_eq!(result.unwrap(), 42);
+ }
+
+ #[cfg_attr(miri, ignore)]
+ #[tokio::test]
+ async fn cancelled_future_returns_err() {
+ let source = CancellationTokenSource::new();
+ source.cancel();
+
+ let result = async { unreachable!("should not poll inner future") }
+ .cancelable(source.token())
+ .await;
+ assert!(result.unwrap_err().to_string().contains("canceled"));
+ }
+
+ #[cfg_attr(miri, ignore)]
+ #[tokio::test]
+ async fn cancellation_triggered_by_inner_future_leads_to_cancellation_error() {
+ struct CancelOnPoll(CancellationTokenSource);
+ impl Future for CancelOnPoll {
+ type Output = ();
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ self.0.cancel();
+ Poll::Pending
+ }
+ }
+
+ let clock = Clock::new_tokio();
+ let source = CancellationTokenSource::new();
+ let token = source.token();
+ let message = CancelOnPoll(source)
+ .timeout(&clock, std::time::Duration::from_secs(5))
+ .cancelable(token)
+ .await
+ .expect_err("should fail")
+ .to_string();
+ assert!(message.contains("canceled"));
+ }
+
+ #[cfg_attr(miri, ignore)]
+ #[tokio::test]
+ async fn already_cancelled_token() {
+ let clock = Clock::new_tokio();
+ let message = async { unreachable!() }
+ .timeout(&clock, std::time::Duration::from_secs(5))
+ .cancelable(CancellationToken::cancelled())
+ .await
+ .expect_err("should fail")
+ .to_string();
+ assert!(message.contains("canceled"));
+ }
+}
diff --git a/crates/cancelable/src/lib.rs b/crates/cancelable/src/lib.rs
new file mode 100644
index 000000000..db8aaea4c
--- /dev/null
+++ b/crates/cancelable/src/lib.rs
@@ -0,0 +1,72 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+#![cfg_attr(docsrs, feature(doc_cfg))]
+#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/cancelable/logo.png")]
+#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/cancelable/favicon.ico")]
+
+//! Cooperative cancellation via tokens.
+//!
+//! This module provides [`CancellationTokenSource`] and [`CancellationToken`],
+//! modeled after the equivalent C# types. A source controls cancellation and
+//! hands out lightweight, cloneable tokens for observers to check.
+//!
+//! # Linked Sources
+//!
+//! A linked source cancels when *any* of its parent tokens are canceled,
+//! enabling composition of multiple cancellation signals:
+//!
+//! ```
+//! # fn example() {
+//! use cancelable::CancellationTokenSource;
+//!
+//! let first = CancellationTokenSource::new();
+//! let second = CancellationTokenSource::new();
+//!
+//! let linked = CancellationTokenSource::linked(&[first.token(), second.token()]);
+//! let token = linked.token();
+//!
+//! assert!(!token.is_cancelled());
+//! second.cancel();
+//! assert!(token.is_cancelled());
+//! # }
+//! ```
+//!
+//! # Subscribers
+//!
+//! Register callbacks that fire exactly once when cancellation occurs:
+//!
+//! ```
+//! # fn example() {
+//! use cancelable::CancellationTokenSource;
+//!
+//! let source = CancellationTokenSource::new();
+//! source.subscribe(|| println!("Operation canceled"));
+//! source.cancel();
+//! # }
+//! ```
+//!
+//! # Futures
+//!
+//! The [`CancelableExt`] trait adds a [`cancelable`](CancelableExt::cancelable) method
+//! to any [`Future`], pairing it with a [`CancellationToken`] so that each
+//! poll checks for cancellation before and after driving the inner future.
+//!
+//! ```
+//! # async fn example() -> Result<(), ohno::AppError> {
+//! use cancelable::{CancelableExt, CancellationTokenSource};
+//!
+//! let source = CancellationTokenSource::new();
+//! let token = source.token();
+//!
+//! let result = async { 42 }.cancelable(token).await?;
+//! assert_eq!(result, 42);
+//! # Ok(())
+//! # }
+//! ```
+
+pub mod future;
+pub use future::CancelableExt;
+
+mod token;
+pub use token::{CancellationToken, CancellationTokenSource};
diff --git a/crates/cancelable/src/token.rs b/crates/cancelable/src/token.rs
new file mode 100644
index 000000000..8fd1961ee
--- /dev/null
+++ b/crates/cancelable/src/token.rs
@@ -0,0 +1,870 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use std::any::type_name;
+use std::fmt;
+use std::fmt::Debug;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex, Weak};
+
+enum Subscriber {
+ /// An external callback for arbitrary subscriber logic.
+ External(Box),
+ /// A weak reference to a linked child's shared state, avoiding a heap
+ /// allocation for the common parent/child propagation path.
+ Linked(Weak),
+}
+
+impl Subscriber {
+ fn notify(self) {
+ match self {
+ Self::External(f) => f(),
+ Self::Linked(weak) => {
+ if let Some(inner) = weak.upgrade() {
+ inner.cancel_and_notify();
+ }
+ }
+ }
+ }
+
+ fn matches_linked(&self, target: &Weak) -> bool {
+ match self {
+ Self::External(_) => false,
+ Self::Linked(inner) => inner.ptr_eq(target),
+ }
+ }
+}
+
+/// Shared state backing one or more [`CancellationToken`] handles.
+struct Inner {
+ /// Cancellation signal
+ canceled: AtomicBool,
+
+ /// Subscribers to notify on cancellation
+ ///
+ /// `Some(vec)` → not yet canceled; subscribers accumulate here.
+ /// `None` → already canceled; new subscribers fire immediately.
+ subscribers: Mutex