diff --git a/spellcheck.dic b/spellcheck.dic index f368d2d7214..4b6dbd910f8 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -274,6 +274,7 @@ unparks Unparks unreceived unsafety +unsets Unsets unsynchronized untrusted diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 996f0f2d9b4..9bf73b74fbf 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -284,9 +284,11 @@ where } pub(super) fn drop_join_handle_slow(self) { - // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in // case the task concurrently completed. - if self.state().unset_join_interested().is_err() { + let transition = self.state().transition_to_join_handle_dropped(); + + if transition.drop_output { // It is our responsibility to drop the output. This is critical as // the task output may not be `Send` and as such must remain with // the scheduler or `JoinHandle`. i.e. if the output remains in the @@ -301,6 +303,23 @@ where })); } + if transition.drop_waker { + // If the JOIN_WAKER flag is unset at this point, the task is either + // already terminal or not complete so the `JoinHandle` is responsible + // for dropping the waker. + // Safety: + // If the JOIN_WAKER bit is not set the join handle has exclusive + // access to the waker as per rule 2 in task/mod.rs. + // This can only be the case at this point in two scenarios: + // 1. The task completed and the runtime unset `JOIN_WAKER` flag + // after accessing the waker during task completion. So the + // `JoinHandle` is the only one to access the join waker here. + // 2. The task is not completed so the `JoinHandle` was able to unset + // `JOIN_WAKER` bit itself to get mutable access to the waker. + // The runtime will not access the waker when this flag is unset. + unsafe { self.trailer().set_waker(None) }; + } + // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); } @@ -311,7 +330,6 @@ where fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. - let snapshot = self.state().transition_to_complete(); // We catch panics here in case dropping the future or waking the @@ -320,13 +338,28 @@ where if !snapshot.is_join_interested() { // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the - // output. + // output. The join waker was already dropped by the + // `JoinHandle` before. self.core().drop_future_or_output(); } else if snapshot.is_join_waker_set() { // Notify the waker. Reading the waker field is safe per rule 4 // in task/mod.rs, since the JOIN_WAKER bit is set and the call // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); + + // Inform the `JoinHandle` that we are done waking the waker by + // unsetting the `JOIN_WAKER` bit. If the `JoinHandle` has + // already been dropped and `JOIN_INTEREST` is unset, then we must + // drop the waker ourselves. + if !self + .state() + .unset_waker_after_complete() + .is_join_interested() + { + // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so + // we have exclusive access to the waker. + unsafe { self.trailer().set_waker(None) }; + } } })); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 33f54003d38..15c5a8f4afe 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -94,16 +94,30 @@ //! `JoinHandle` needs to (i) successfully set `JOIN_WAKER` to zero if it is //! not already zero to gain exclusive access to the waker field per rule //! 2, (ii) write a waker, and (iii) successfully set `JOIN_WAKER` to one. +//! If the `JoinHandle` unsets `JOIN_WAKER` in the process of being dropped +//! to clear the waker field, only steps (i) and (ii) are relevant. //! //! 6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e. -//! the task hasn't yet completed). +//! the task hasn't yet completed). The runtime can change `JOIN_WAKER` only +//! if COMPLETE is one. +//! +//! 7. If `JOIN_INTEREST` is zero and COMPLETE is one, then the runtime has +//! exclusive (mutable) access to the waker field. This might happen if the +//! `JoinHandle` gets dropped right after the task completes and the runtime +//! sets the `COMPLETE` bit. In this case the runtime needs the mutable access +//! to the waker field to drop it. //! //! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a //! race. If step (i) fails, then the attempt to write a waker is aborted. If //! step (iii) fails because COMPLETE is set to one by another thread after //! step (i), then the waker field is cleared. Once COMPLETE is one (i.e. //! task has completed), the `JoinHandle` will not modify `JOIN_WAKER`. After the -//! runtime sets COMPLETE to one, it invokes the waker if there is one. +//! runtime sets COMPLETE to one, it invokes the waker if there is one so in this +//! case when a task completes the `JOIN_WAKER` bit implicates to the runtime +//! whether it should invoke the waker or not. After the runtime is done with +//! using the waker during task completion, it unsets the `JOIN_WAKER` bit to give +//! the `JoinHandle` exclusive access again so that it is able to drop the waker +//! at a later point. //! //! All other fields are immutable and can be accessed immutably without //! synchronization by anyone. diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 0fc7bb0329b..037c1c90c61 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -89,6 +89,12 @@ pub(crate) enum TransitionToNotifiedByRef { Submit, } +#[must_use] +pub(super) struct TransitionToJoinHandleDrop { + pub(super) drop_waker: bool, + pub(super) drop_output: bool, +} + /// All transitions are performed via RMW operations. This establishes an /// unambiguous modification order. impl State { @@ -371,22 +377,45 @@ impl State { .map_err(|_| ()) } - /// Tries to unset the `JOIN_INTEREST` flag. - /// - /// Returns `Ok` if the operation happens before the task transitions to a - /// completed state, `Err` otherwise. - pub(super) fn unset_join_interested(&self) -> UpdateResult { - self.fetch_update(|curr| { - assert!(curr.is_join_interested()); + /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER` + /// flag is also unset. + /// The returned `TransitionToJoinHandleDrop` indicates whether the `JoinHandle` should drop + /// the output of the future or the join waker after the transition. + pub(super) fn transition_to_join_handle_dropped(&self) -> TransitionToJoinHandleDrop { + self.fetch_update_action(|mut snapshot| { + assert!(snapshot.is_join_interested()); - if curr.is_complete() { - return None; + let mut transition = TransitionToJoinHandleDrop { + drop_waker: false, + drop_output: false, + }; + + snapshot.unset_join_interested(); + + if !snapshot.is_complete() { + // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the + // `JoinHandle` exclusive access to the waker following rule 6 in task/mod.rs. + // The `JoinHandle` will drop the waker if it has exclusive access + // to drop it. + snapshot.unset_join_waker(); + } else { + // If `COMPLETE` is set the task is completed so the `JoinHandle` is responsible + // for dropping the output. + transition.drop_output = true; } - let mut next = curr; - next.unset_join_interested(); + if !snapshot.is_join_waker_set() { + // If the `JOIN_WAKER` bit is unset and the `JOIN_HANDLE` has exclusive access to + // the join waker and should drop it following this transition. + // This might happen in two situations: + // 1. The task is not completed and we just unset the `JOIN_WAKer` above in this + // function. + // 2. The task is completed. In that case the `JOIN_WAKER` bit was already unset + // by the runtime during completion. + transition.drop_waker = true; + } - Some(next) + (transition, Some(snapshot)) }) } @@ -430,6 +459,16 @@ impl State { }) } + /// Unsets the `JOIN_WAKER` bit unconditionally after task completion. + /// + /// This operation requires the task to be completed. + pub(super) fn unset_waker_after_complete(&self) -> Snapshot { + let prev = Snapshot(self.val.fetch_and(!JOIN_WAKER, AcqRel)); + assert!(prev.is_complete()); + assert!(prev.is_join_waker_set()); + Snapshot(prev.0 & !JOIN_WAKER) + } + pub(super) fn ref_inc(&self) { use std::process; use std::sync::atomic::Ordering::Relaxed; diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index ddd14b7fb3f..e2706e65c65 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -10,6 +10,7 @@ mod yield_now; /// In order to speed up the C use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; +use crate::sync::mpsc::channel; use crate::{spawn, task}; use tokio_test::assert_ok; @@ -459,3 +460,32 @@ impl Future for Track { }) } } + +#[test] +fn drop_tasks_with_reference_cycle() { + loom::model(|| { + let pool = mk_pool(2); + + pool.block_on(async move { + let (tx, mut rx) = channel(1); + + let (a_closer, mut wait_for_close_a) = channel::<()>(1); + let (b_closer, mut wait_for_close_b) = channel::<()>(1); + + let a = spawn(async move { + let b = rx.recv().await.unwrap(); + + futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await; + }); + + let b = spawn(async move { + let _ = a.await; + let _ = b_closer.send(()).await; + }); + + tx.send(b).await.unwrap(); + + futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await; + }); + }); +}