Skip to content

Commit

Permalink
task: add AbortOnDropHandle type
Browse files Browse the repository at this point in the history
wraps a `JoinHandle`, aborting the task on drop
Refs: #6224
Fixes: #6160
  • Loading branch information
barafael authored and Rafael Bachmann committed Aug 19, 2024
1 parent 365269a commit 7e47811
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
63 changes: 63 additions & 0 deletions tokio-util/src/task/abort_on_drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! An [`AbortOnDropHandle`] is like a [`JoinHandle`], except that it
//! will abort the task as soon as it is dropped.
use tokio::task::{AbortHandle, JoinError, JoinHandle};

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// A wrapper around a [`tokio::task::JoinHandle`],
/// which [aborts] the task when it is dropped.
///
/// [aborts]: tokio::task::JoinHandle::abort
#[must_use = "Dropping the handle aborts the task immediately"]
#[derive(Debug)]
pub struct AbortOnDropHandle<T>(JoinHandle<T>);

impl<T> Drop for AbortOnDropHandle<T> {
fn drop(&mut self) {
self.0.abort()
}
}

impl<T> AbortOnDropHandle<T> {
/// Create an [`AbortOnDropHandle`] from a [`JoinHandle`].
pub fn new(handle: JoinHandle<T>) -> Self {
Self(handle)
}

/// Abort the task associated with this handle,
/// equivalent of [`tokio::task::JoinHandle::abort`].
pub fn abort(&self) {
self.0.abort()
}

/// Checks if the task associated with this handle is finished,
/// equivalent of [`tokio::task::JoinHandle::is_finished`].
pub fn is_finished(&self) -> bool {
self.0.is_finished()
}

/// Returns a new [`AbortHandle`] that can be used to remotely abort this task,
/// equivalent of [`tokio::task::JoinHandle::abort_handle`].
pub fn abort_handle(&self) -> AbortHandle {
self.0.abort_handle()
}
}

impl<T> Future for AbortOnDropHandle<T> {
type Output = Result<T, JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}

impl<T> AsRef<JoinHandle<T>> for AbortOnDropHandle<T> {
fn as_ref(&self) -> &JoinHandle<T> {
&self.0
}
}
2 changes: 2 additions & 0 deletions tokio-util/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ pub use join_map::{JoinMap, JoinMapKeys};

pub mod task_tracker;
pub use task_tracker::TaskTracker;

pub mod abort_on_drop;
27 changes: 27 additions & 0 deletions tokio-util/tests/abort_on_drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use tokio::sync::oneshot;
use tokio_util::task::abort_on_drop::AbortOnDropHandle;

#[tokio::test]
async fn aborts_task_on_drop() {
let (mut tx, rx) = oneshot::channel::<bool>();
let handle = tokio::spawn(async move {
let _ = rx.await;
});
let handle = AbortOnDropHandle::new(handle);
drop(handle);
tx.closed().await;
assert!(tx.is_closed());
}

#[tokio::test]
async fn aborts_task_directly() {
let (mut tx, rx) = oneshot::channel::<bool>();
let handle = tokio::spawn(async move {
let _ = rx.await;
});
let handle = AbortOnDropHandle::new(handle);
handle.abort();
tx.closed().await;
assert!(tx.is_closed());
assert!(handle.is_finished());
}

0 comments on commit 7e47811

Please sign in to comment.