Skip to content

Commit

Permalink
Drop the waker of a task eagerly when the task completes and there is no
Browse files Browse the repository at this point in the history
join interest
  • Loading branch information
tglane committed Jun 13, 2024
1 parent 53ea44b commit be84fde
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 2 deletions.
16 changes: 16 additions & 0 deletions tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,27 @@ where
// this task. It is our responsibility to drop the
// output.
self.core().drop_future_or_output();

// Eagerly drop the associated waker when the `JoinHandle` is not interested in the
// output of this task anymore. Without this the subsequent call to `dealloc` will
// not happen because the waker would still hold a reference to the tasks
// internals.
// Safety: Only the `JoinHandle` may set the `waker` field. When
// `JOIN_INTEREST` is **not** set, nothing else will touch the field.
unsafe { self.trailer().set_waker(None) };
} else if snapshot.is_join_waker_set() {
// Notify the waker. Reading the waker field is safe per rule 4
// in task/mod.rs, since the JOIN_WAKER bit is set and the call
// to transition_to_complete() above set the COMPLETE bit.
self.trailer().wake_join();

// Eagerly drop the associated waker when the `JoinHandle` is not interested in the
// output of this task anymore. Without this the subsequent call to `dealloc` will
// not happen because the waker would still hold a reference to the tasks
// internals.
// Safety: Only the `JoinHandle` may set the `waker` field. When
// `JOIN_INTEREST` is **not** set, nothing else will touch the field.
unsafe { self.trailer().set_waker(None) };
}
}));

Expand Down
18 changes: 18 additions & 0 deletions tokio/src/runtime/tests/loom_current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ fn assert_no_unnecessary_polls() {
});
}

#[test]
fn new_test() {
loom::model(|| {
let rt = Builder::new_current_thread().build().unwrap();

let jh = rt.spawn(async {});

let bg = std::thread::spawn(move || {
jh.poll();
});

rt.block_on(async {});

rt.shutdown();
bg.join();
});
}

struct BlockedFuture {
rx: Receiver<()>,
num_polls: Arc<AtomicUsize>,
Expand Down
194 changes: 192 additions & 2 deletions tokio/src/sync/tests/notify.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::sync::Notify;
use std::future::Future;
use crate::sync::{self, Notify};
use std::future::{Future, IntoFuture};
use std::sync::Arc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

Expand Down Expand Up @@ -117,3 +117,193 @@ fn watch_test() {
let _ = rx.changed().await;
});
}

struct AssertDropHandle {
is_dropped: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl AssertDropHandle {
#[track_caller]
fn assert_dropped(&self) {
assert!(self.is_dropped.load(std::sync::atomic::Ordering::SeqCst));
}

#[track_caller]
fn assert_not_dropped(&self) {
assert!(!self.is_dropped.load(std::sync::atomic::Ordering::SeqCst));
}
}

struct AssertDrop {
is_dropped: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl AssertDrop {
fn new() -> (Self, AssertDropHandle) {
let shared = Arc::new(std::sync::atomic::AtomicBool::new(false));
(
AssertDrop {
is_dropped: shared.clone(),
},
AssertDropHandle {
is_dropped: shared.clone(),
},
)
}
}
impl Drop for AssertDrop {
fn drop(&mut self) {
self.is_dropped
.store(true, std::sync::atomic::Ordering::SeqCst);
}
}

#[test]
fn keke() {
use crate as tokio;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::sync::mpsc::channel;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async {
let (tx, mut rx) = channel::<JoinHandle<AssertDrop>>(1);

let (a_tx, mut a_rx) = channel::<AssertDropHandle>(1);
let a = tokio::spawn(async move {
let (a_ad, a_handle) = AssertDrop::new();
a_tx.send(a_handle).await.unwrap();

let b = rx.recv().await.unwrap();
// let _ = b.await;

if timeout(Duration::from_secs(3), b).await.is_err() {
// Dropping handle b
println!("First task timeout elapsed");
} else {
// b completed
println!("First task handle awaited complete");
}
println!("a finished, dropping b handle");

a_ad
});

let (b_tx, mut b_rx) = channel::<AssertDropHandle>(1);
let b = tokio::spawn(async move {
let (b_ad, b_handle) = AssertDrop::new();
b_tx.send(b_handle).await.unwrap();

// tokio::time::sleep(Duration::from_secs(5)).await;
let _ = a.await;

println!("b finished, dropping a handle");
b_ad
});

tx.send(b).await.unwrap();
let b_handle = b_rx.recv().await.unwrap();
let a_handle = a_rx.recv().await.unwrap();

sleep(Duration::from_secs(5)).await;

a_handle.assert_dropped();
b_handle.assert_dropped();
});

// assert!(false);
}

use futures::FutureExt;
use std::pin::Pin;
use std::task::Poll;

struct MyFuture<T: Future + 'static> {
fut: std::pin::Pin<Box<T>>,
}

impl<T> Future for MyFuture<T>
where
T: Future + 'static,
T::Output: 'static,
{
type Output = T::Output;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.poll_unpin(cx)
}
}

impl<T: Future + 'static> Drop for MyFuture<T> {
fn drop(&mut self) {
println!("Dropping MyFuture");
}
}

#[test]
fn kaka() {
use crate as tokio;
use std::time::Duration;
use tokio::sync::mpsc::channel;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async {
let (tx, mut rx) = channel::<JoinHandle<AssertDrop>>(1);

let (a_tx, mut a_rx) = channel::<AssertDropHandle>(1);
let a = tokio::spawn(MyFuture {
fut: Box::pin(async move {
let (a_ad, a_handle) = AssertDrop::new();
a_tx.send(a_handle).await.unwrap();

let b = rx.recv().await.unwrap();
// let _ = b.await;

if timeout(Duration::from_secs(3), b).await.is_err() {
// Dropping handle b
println!("First task timeout elapsed");
} else {
// b completed
println!("First task handle awaited complete");
}
println!("a finished, dropping b handle");

a_ad
}),
});

let (b_tx, mut b_rx) = channel::<AssertDropHandle>(1);
let b = tokio::spawn(MyFuture {
fut: Box::pin(async move {
let (b_ad, b_handle) = AssertDrop::new();
b_tx.send(b_handle).await.unwrap();

// tokio::time::sleep(Duration::from_secs(5)).await;
let _ = a.await;

println!("b finished, dropping a handle");
b_ad
}),
});

tx.send(b).await.unwrap();
let b_handle = b_rx.recv().await.unwrap();
let a_handle = a_rx.recv().await.unwrap();

sleep(Duration::from_secs(5)).await;

a_handle.assert_dropped();
b_handle.assert_dropped();
});

// assert!(false);
}

0 comments on commit be84fde

Please sign in to comment.