Skip to content

Commit f3ad6cf

Browse files
authored
task: fix missing wakeup when using LocalSet::enter (#6016)
1 parent f1e41a4 commit f3ad6cf

File tree

2 files changed

+93
-43
lines changed

2 files changed

+93
-43
lines changed

tokio/src/task/local.rs

Lines changed: 72 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,43 @@ pin_project! {
280280

281281
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
282282
ctx: RcCell::new(),
283+
wake_on_schedule: Cell::new(false),
283284
} });
284285

285286
struct LocalData {
286287
ctx: RcCell<Context>,
288+
wake_on_schedule: Cell<bool>,
289+
}
290+
291+
impl LocalData {
292+
/// Should be called except when we call `LocalSet::enter`.
293+
/// Especially when we poll a LocalSet.
294+
#[must_use = "dropping this guard will reset the entered state"]
295+
fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
296+
let ctx = self.ctx.replace(Some(ctx));
297+
let wake_on_schedule = self.wake_on_schedule.replace(false);
298+
LocalDataEnterGuard {
299+
local_data_ref: self,
300+
ctx,
301+
wake_on_schedule,
302+
}
303+
}
304+
}
305+
306+
/// A guard for `LocalData::enter()`
307+
struct LocalDataEnterGuard<'a> {
308+
local_data_ref: &'a LocalData,
309+
ctx: Option<Rc<Context>>,
310+
wake_on_schedule: bool,
311+
}
312+
313+
impl<'a> Drop for LocalDataEnterGuard<'a> {
314+
fn drop(&mut self) {
315+
self.local_data_ref.ctx.set(self.ctx.take());
316+
self.local_data_ref
317+
.wake_on_schedule
318+
.set(self.wake_on_schedule)
319+
}
287320
}
288321

289322
cfg_rt! {
@@ -360,13 +393,26 @@ const MAX_TASKS_PER_TICK: usize = 61;
360393
const REMOTE_FIRST_INTERVAL: u8 = 31;
361394

362395
/// Context guard for LocalSet
363-
pub struct LocalEnterGuard(Option<Rc<Context>>);
396+
pub struct LocalEnterGuard {
397+
ctx: Option<Rc<Context>>,
398+
399+
/// Distinguishes whether the context was entered or being polled.
400+
/// When we enter it, the value `wake_on_schedule` is set. In this case
401+
/// `spawn_local` refers the context, whereas it is not being polled now.
402+
wake_on_schedule: bool,
403+
}
364404

365405
impl Drop for LocalEnterGuard {
366406
fn drop(&mut self) {
367-
CURRENT.with(|LocalData { ctx, .. }| {
368-
ctx.set(self.0.take());
369-
})
407+
CURRENT.with(
408+
|LocalData {
409+
ctx,
410+
wake_on_schedule,
411+
}| {
412+
ctx.set(self.ctx.take());
413+
wake_on_schedule.set(self.wake_on_schedule);
414+
},
415+
)
370416
}
371417
}
372418

@@ -408,10 +454,20 @@ impl LocalSet {
408454
///
409455
/// [`spawn_local`]: fn@crate::task::spawn_local
410456
pub fn enter(&self) -> LocalEnterGuard {
411-
CURRENT.with(|LocalData { ctx, .. }| {
412-
let old = ctx.replace(Some(self.context.clone()));
413-
LocalEnterGuard(old)
414-
})
457+
CURRENT.with(
458+
|LocalData {
459+
ctx,
460+
wake_on_schedule,
461+
..
462+
}| {
463+
let ctx = ctx.replace(Some(self.context.clone()));
464+
let wake_on_schedule = wake_on_schedule.replace(true);
465+
LocalEnterGuard {
466+
ctx,
467+
wake_on_schedule,
468+
}
469+
},
470+
)
415471
}
416472

417473
/// Spawns a `!Send` task onto the local task set.
@@ -667,23 +723,8 @@ impl LocalSet {
667723
}
668724

669725
fn with<T>(&self, f: impl FnOnce() -> T) -> T {
670-
CURRENT.with(|LocalData { ctx, .. }| {
671-
struct Reset<'a> {
672-
ctx_ref: &'a RcCell<Context>,
673-
val: Option<Rc<Context>>,
674-
}
675-
impl<'a> Drop for Reset<'a> {
676-
fn drop(&mut self) {
677-
self.ctx_ref.set(self.val.take());
678-
}
679-
}
680-
let old = ctx.replace(Some(self.context.clone()));
681-
682-
let _reset = Reset {
683-
ctx_ref: ctx,
684-
val: old,
685-
};
686-
726+
CURRENT.with(|local_data| {
727+
let _guard = local_data.enter(self.context.clone());
687728
f()
688729
})
689730
}
@@ -693,23 +734,8 @@ impl LocalSet {
693734
fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
694735
let mut f = Some(f);
695736

696-
let res = CURRENT.try_with(|LocalData { ctx, .. }| {
697-
struct Reset<'a> {
698-
ctx_ref: &'a RcCell<Context>,
699-
val: Option<Rc<Context>>,
700-
}
701-
impl<'a> Drop for Reset<'a> {
702-
fn drop(&mut self) {
703-
self.ctx_ref.replace(self.val.take());
704-
}
705-
}
706-
let old = ctx.replace(Some(self.context.clone()));
707-
708-
let _reset = Reset {
709-
ctx_ref: ctx,
710-
val: old,
711-
};
712-
737+
let res = CURRENT.try_with(|local_data| {
738+
let _guard = local_data.enter(self.context.clone());
713739
(f.take().unwrap())()
714740
});
715741

@@ -967,7 +993,10 @@ impl Shared {
967993
fn schedule(&self, task: task::Notified<Arc<Self>>) {
968994
CURRENT.with(|localdata| {
969995
match localdata.ctx.get() {
970-
Some(cx) if cx.shared.ptr_eq(self) => unsafe {
996+
// If the current `LocalSet` is being polled, we don't need to wake it.
997+
// When we `enter` it, then the value `wake_on_schedule` is set to be true.
998+
// In this case it is not being polled, so we need to wake it.
999+
Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
9711000
// Safety: if the current `LocalSet` context points to this
9721001
// `LocalSet`, then we are on the thread that owns it.
9731002
cx.shared.local_state.task_push_back(task);

tokio/tests/task_local_set.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,27 @@ async fn spawn_wakes_localset() {
573573
}
574574
}
575575

576+
/// Checks that the task wakes up with `enter`.
577+
/// Reproduces <https://github.com/tokio-rs/tokio/issues/5020>.
578+
#[tokio::test]
579+
async fn sleep_with_local_enter_guard() {
580+
let local = LocalSet::new();
581+
let _guard = local.enter();
582+
583+
let (tx, rx) = oneshot::channel();
584+
585+
local
586+
.run_until(async move {
587+
tokio::task::spawn_local(async move {
588+
time::sleep(Duration::ZERO).await;
589+
590+
tx.send(()).expect("failed to send");
591+
});
592+
assert_eq!(rx.await, Ok(()));
593+
})
594+
.await;
595+
}
596+
576597
#[test]
577598
fn store_local_set_in_thread_local_with_runtime() {
578599
use tokio::runtime::Runtime;

0 commit comments

Comments
 (0)