diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 996f0f2d9b4..bf74d48dada 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -284,9 +284,11 @@ where } pub(super) fn drop_join_handle_slow(self) { - // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in // case the task concurrently completed. - if self.state().unset_join_interested().is_err() { + let snapshot = self.state().transition_to_join_handle_dropped(); + + if snapshot.is_complete() { // It is our responsibility to drop the output. This is critical as // the task output may not be `Send` and as such must remain with // the scheduler or `JoinHandle`. i.e. if the output remains in the @@ -301,6 +303,25 @@ where })); } + if !snapshot.is_join_waker_set() { + // If the JOIN_WAKER flag is unset at this point, the task is either + // already terminal or not complete so the `JoinHandle` is responsible + // for dropping the waker. + // Safety: + // If the JOIN_WAKER bit is not set the join handle has exclusive + // access to the waker as per rule 2 in task/mod.rs. + // This can only be the case at this point in two scenarios: + // 1. The task completed and the runtime unset `JOIN_WAKER` flag + // after accessing the waker during task completion. So the + // `JoinHandle` is the only one to access the join waker here. + // 2. The task is not completed so the `JoinHandle` was able to unset + // `JOIN_WAKER` bit itself to get mutable access to the waker. + // The runtime will not access the waker when this flag is unset. + unsafe { + self.trailer().set_waker(None); + } + } + // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); } @@ -311,7 +332,6 @@ where fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. - let snapshot = self.state().transition_to_complete(); // We catch panics here in case dropping the future or waking the @@ -320,13 +340,33 @@ where if !snapshot.is_join_interested() { // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the - // output. + // output. The join waker was already dropped by the + // `JoinHandle` before. self.core().drop_future_or_output(); } else if snapshot.is_join_waker_set() { // Notify the waker. Reading the waker field is safe per rule 4 // in task/mod.rs, since the JOIN_WAKER bit is set and the call // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); + + // If JOIN_INTEREST is still set at this point the `JoinHandle` + // was not dropped since setting COMPLETE so we unset JOIN_WAKER + // to give the responsibility of dropping the join waker back to + // the `JoinHandle`. `JoinHandle` is able to drop the waker when + // itself gets dropped. + if self.state().unset_waker_if_join_interested().is_err() { + // Unsetting JOIN_WAKER flag will fail if JOIN_INTERESTED is + // not set to indicate that the runtime has the responsibility + // to drop the join waker here as per rule 7 in task/mod.rs. + // Safety: + // If JOIN_INTEREST got unset since setting COMPLETE we are + // the only ones to have access to the join waker and need + // to drop it here because the `JoinHandle` of the task + // already got dropped. + unsafe { + self.trailer().set_waker(None); + } + } } })); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 33f54003d38..3de0e8d7d33 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -94,10 +94,15 @@ //! `JoinHandle` needs to (i) successfully set `JOIN_WAKER` to zero if it is //! not already zero to gain exclusive access to the waker field per rule //! 2, (ii) write a waker, and (iii) successfully set `JOIN_WAKER` to one. +//! If the `JoinHandle` unsets `JOIN_WAKER` in the process of being dropped +//! to clear the waker field, only steps (i) and (ii) are relevant. //! //! 6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e. //! the task hasn't yet completed). //! +//! 7. If JOIN_INTEREST is zero and COMPLETE is one, then the runtime has +//! exclusive (mutable) access to the waker field. +//! //! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a //! race. If step (i) fails, then the attempt to write a waker is aborted. If //! step (iii) fails because COMPLETE is set to one by another thread after diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 0fc7bb0329b..35db3e8e453 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -371,22 +371,21 @@ impl State { .map_err(|_| ()) } - /// Tries to unset the `JOIN_INTEREST` flag. - /// - /// Returns `Ok` if the operation happens before the task transitions to a - /// completed state, `Err` otherwise. - pub(super) fn unset_join_interested(&self) -> UpdateResult { - self.fetch_update(|curr| { - assert!(curr.is_join_interested()); + /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER` + /// flag is also unset. + pub(super) fn transition_to_join_handle_dropped(&self) -> Snapshot { + self.fetch_update_action(|mut snapshot| { + assert!(snapshot.is_join_interested()); - if curr.is_complete() { - return None; - } + snapshot.unset_join_interested(); - let mut next = curr; - next.unset_join_interested(); + if !snapshot.is_complete() { + // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the + // `JoinHandle` exclusive access to the waker to drop it. + snapshot.unset_join_waker(); + } - Some(next) + (snapshot, Some(snapshot)) }) } @@ -430,6 +429,26 @@ impl State { }) } + /// Unsets the `JOIN_WAKER` bit only if the `JOIN_INTEREST` is still set. + /// + /// Returns `Ok` has been unset, `Err` otherwise. This operation requires + /// the task to be completed. + pub(super) fn unset_waker_if_join_interested(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_complete()); + assert!(curr.is_join_waker_set()); + + if !curr.is_join_interested() { + return None; + } + + let mut next = curr; + next.unset_join_waker(); + + Some(next) + }) + } + pub(super) fn ref_inc(&self) { use std::process; use std::sync::atomic::Ordering::Relaxed; diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index ddd14b7fb3f..e2706e65c65 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -10,6 +10,7 @@ mod yield_now; /// In order to speed up the C use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; +use crate::sync::mpsc::channel; use crate::{spawn, task}; use tokio_test::assert_ok; @@ -459,3 +460,32 @@ impl Future for Track { }) } } + +#[test] +fn drop_tasks_with_reference_cycle() { + loom::model(|| { + let pool = mk_pool(2); + + pool.block_on(async move { + let (tx, mut rx) = channel(1); + + let (a_closer, mut wait_for_close_a) = channel::<()>(1); + let (b_closer, mut wait_for_close_b) = channel::<()>(1); + + let a = spawn(async move { + let b = rx.recv().await.unwrap(); + + futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await; + }); + + let b = spawn(async move { + let _ = a.await; + let _ = b_closer.send(()).await; + }); + + tx.send(b).await.unwrap(); + + futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await; + }); + }); +}