Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio: distinguish LocalSet::enter() with being polled #6016

Merged
merged 18 commits into from
Oct 15, 2023
Merged
53 changes: 36 additions & 17 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,12 @@ pin_project! {

tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
ctx: RcCell::new(),
entered: Cell::new(false),
} });

struct LocalData {
ctx: RcCell<Context>,
entered: Cell<bool>,
}

cfg_rt! {
Expand Down Expand Up @@ -360,12 +362,16 @@ const MAX_TASKS_PER_TICK: usize = 61;
const REMOTE_FIRST_INTERVAL: u8 = 31;

/// Context guard for LocalSet
pub struct LocalEnterGuard(Option<Rc<Context>>);
pub struct LocalEnterGuard {
ctx: Option<Rc<Context>>,
entered: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment here describing how entered is used and what it represents? thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added! Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if this is overly nitpicky, but I think this comment could maybe be improved a bit --- it would be nice if it explained why we need to differentiate between enter and polling the LocalSet, and what behavior is controlled based on that. I don't think that's currelty all that clear from reading the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry! I've corrected it!

}

impl Drop for LocalEnterGuard {
fn drop(&mut self) {
CURRENT.with(|LocalData { ctx, .. }| {
ctx.set(self.0.take());
CURRENT.with(|LocalData { ctx, entered }| {
ctx.set(self.ctx.take());
entered.set(self.entered);
})
}
}
Expand Down Expand Up @@ -408,9 +414,10 @@ impl LocalSet {
///
/// [`spawn_local`]: fn@crate::task::spawn_local
pub fn enter(&self) -> LocalEnterGuard {
CURRENT.with(|LocalData { ctx, .. }| {
let old = ctx.replace(Some(self.context.clone()));
LocalEnterGuard(old)
CURRENT.with(|LocalData { ctx, entered }| {
let ctx = ctx.replace(Some(self.context.clone()));
let entered = entered.replace(true);
LocalEnterGuard { ctx, entered }
})
}

Expand Down Expand Up @@ -667,21 +674,27 @@ impl LocalSet {
}

fn with<T>(&self, f: impl FnOnce() -> T) -> T {
CURRENT.with(|LocalData { ctx, .. }| {
CURRENT.with(|LocalData { ctx, entered, .. }| {
struct Reset<'a> {
ctx_ref: &'a RcCell<Context>,
val: Option<Rc<Context>>,
entered_ref: &'a Cell<bool>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, not a big deal: both ctx_ref and entered_ref reference fields of the LocalData. we could make this struct a word smaller by changing these to a single &'a LocalData reference, and just using that to access both ctx and entered.

Copy link
Contributor Author

@inq inq Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good idea. I've changed it!

ctx_val: Option<Rc<Context>>,
entered_val: bool,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.set(self.val.take());
self.ctx_ref.set(self.ctx_val.take());
self.entered_ref.set(self.entered_val)
}
}
let old = ctx.replace(Some(self.context.clone()));
let ctx_old = ctx.replace(Some(self.context.clone()));
let entered_old = entered.replace(false);

let _reset = Reset {
ctx_ref: ctx,
val: old,
entered_ref: entered,
ctx_val: ctx_old,
entered_val: entered_old,
};

f()
Expand All @@ -693,21 +706,27 @@ impl LocalSet {
fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
let mut f = Some(f);

let res = CURRENT.try_with(|LocalData { ctx, .. }| {
let res = CURRENT.try_with(|LocalData { ctx, entered, .. }| {
struct Reset<'a> {
ctx_ref: &'a RcCell<Context>,
val: Option<Rc<Context>>,
entered_ref: &'a Cell<bool>,
ctx_val: Option<Rc<Context>>,
entered_val: bool,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.replace(self.val.take());
self.ctx_ref.set(self.ctx_val.take());
self.entered_ref.set(self.entered_val)
}
}
let old = ctx.replace(Some(self.context.clone()));
let ctx_old = ctx.replace(Some(self.context.clone()));
let entered_old = entered.replace(false);

let _reset = Reset {
ctx_ref: ctx,
val: old,
entered_ref: entered,
ctx_val: ctx_old,
entered_val: entered_old,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take it or leave it: the Reset code got substantially more complex in this change, and it's used in two different places. now that it's no longer fairly trivial, what do you think about moving this out of the with and with_if_possible functions, so that we can use one implementation in both places. Something like:

impl LocalData {
    fn enter(&self, context: Rc<Context>) -> Reset<'_> {
        let ctx_val = self.ctx.replace(Some(context));
        let entered_val = self.entered.replace(false);

        Reset {
            ctx_ref: self.ctx,
            entered_ref: self.entered,
            ctx_val: ctx_old,
            entered_val: entered_old,
        }
    }
}

struct Reset<'a> {
    ctx_ref: &'a RcCell<Context>,
    entered_ref: &'a Cell<bool>,
    ctx_val: Option<Rc<Context>>,
    entered_val: bool,
}

impl<'a> Drop for Reset<'a> {
    fn drop(&mut self) {
        self.ctx_ref.set(self.ctx_val.take());
        self.entered_ref.set(self.entered_val)
    }
}

and then changing with and with_if_possible to just call

let _reset = local_data.enter(self.context.clone());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I've moved out, and renamed it to more verbose name.

};

(f.take().unwrap())()
Expand Down Expand Up @@ -967,7 +986,7 @@ impl Shared {
fn schedule(&self, task: task::Notified<Arc<Self>>) {
CURRENT.with(|localdata| {
match localdata.ctx.get() {
Some(cx) if cx.shared.ptr_eq(self) => unsafe {
Some(cx) if cx.shared.ptr_eq(self) && !localdata.entered.get() => unsafe {
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
// Safety: if the current `LocalSet` context points to this
// `LocalSet`, then we are on the thread that owns it.
cx.shared.local_state.task_push_back(task);
Expand Down
19 changes: 19 additions & 0 deletions tokio/tests/task_local_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,25 @@ async fn spawn_wakes_localset() {
}
}

#[tokio::test]
async fn sleep_with_local_enter_guard() {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
let local = LocalSet::new();
let _guard = local.enter();

let (tx, rx) = oneshot::channel();

local
.run_until(async move {
tokio::task::spawn_local(async move {
time::sleep(Duration::ZERO).await;

tx.send(()).expect("failed to send");
});
assert_eq!(rx.await, Ok(()));
})
.await;
}

#[test]
fn store_local_set_in_thread_local_with_runtime() {
use tokio::runtime::Runtime;
Expand Down