-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tokio: distinguish LocalSet::enter() with being polled #6016
Changes from 4 commits
96bc15d
b887953
cdc5ca7
8f86628
777efa0
d2f542b
c0d2634
00f5b88
3cfc9f4
ff24960
95fa5ca
5715a4c
54657b9
0616cbf
676318e
869332f
2af739b
f61d1ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -280,10 +280,12 @@ pin_project! { | |
|
||
tokio_thread_local!(static CURRENT: LocalData = const { LocalData { | ||
ctx: RcCell::new(), | ||
entered: Cell::new(false), | ||
} }); | ||
|
||
struct LocalData { | ||
ctx: RcCell<Context>, | ||
entered: Cell<bool>, | ||
} | ||
|
||
cfg_rt! { | ||
|
@@ -360,12 +362,16 @@ const MAX_TASKS_PER_TICK: usize = 61; | |
const REMOTE_FIRST_INTERVAL: u8 = 31; | ||
|
||
/// Context guard for LocalSet | ||
pub struct LocalEnterGuard(Option<Rc<Context>>); | ||
pub struct LocalEnterGuard { | ||
ctx: Option<Rc<Context>>, | ||
entered: bool, | ||
} | ||
|
||
impl Drop for LocalEnterGuard { | ||
fn drop(&mut self) { | ||
CURRENT.with(|LocalData { ctx, .. }| { | ||
ctx.set(self.0.take()); | ||
CURRENT.with(|LocalData { ctx, entered }| { | ||
ctx.set(self.ctx.take()); | ||
entered.set(self.entered); | ||
}) | ||
} | ||
} | ||
|
@@ -408,9 +414,10 @@ impl LocalSet { | |
/// | ||
/// [`spawn_local`]: fn@crate::task::spawn_local | ||
pub fn enter(&self) -> LocalEnterGuard { | ||
CURRENT.with(|LocalData { ctx, .. }| { | ||
let old = ctx.replace(Some(self.context.clone())); | ||
LocalEnterGuard(old) | ||
CURRENT.with(|LocalData { ctx, entered }| { | ||
let ctx = ctx.replace(Some(self.context.clone())); | ||
let entered = entered.replace(true); | ||
LocalEnterGuard { ctx, entered } | ||
}) | ||
} | ||
|
||
|
@@ -667,21 +674,27 @@ impl LocalSet { | |
} | ||
|
||
fn with<T>(&self, f: impl FnOnce() -> T) -> T { | ||
CURRENT.with(|LocalData { ctx, .. }| { | ||
CURRENT.with(|LocalData { ctx, entered, .. }| { | ||
struct Reset<'a> { | ||
ctx_ref: &'a RcCell<Context>, | ||
val: Option<Rc<Context>>, | ||
entered_ref: &'a Cell<bool>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, not a big deal: both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a good idea. I've changed it! |
||
ctx_val: Option<Rc<Context>>, | ||
entered_val: bool, | ||
} | ||
impl<'a> Drop for Reset<'a> { | ||
fn drop(&mut self) { | ||
self.ctx_ref.set(self.val.take()); | ||
self.ctx_ref.set(self.ctx_val.take()); | ||
self.entered_ref.set(self.entered_val) | ||
} | ||
} | ||
let old = ctx.replace(Some(self.context.clone())); | ||
let ctx_old = ctx.replace(Some(self.context.clone())); | ||
let entered_old = entered.replace(false); | ||
|
||
let _reset = Reset { | ||
ctx_ref: ctx, | ||
val: old, | ||
entered_ref: entered, | ||
ctx_val: ctx_old, | ||
entered_val: entered_old, | ||
}; | ||
|
||
f() | ||
|
@@ -693,21 +706,27 @@ impl LocalSet { | |
fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T { | ||
let mut f = Some(f); | ||
|
||
let res = CURRENT.try_with(|LocalData { ctx, .. }| { | ||
let res = CURRENT.try_with(|LocalData { ctx, entered, .. }| { | ||
struct Reset<'a> { | ||
ctx_ref: &'a RcCell<Context>, | ||
val: Option<Rc<Context>>, | ||
entered_ref: &'a Cell<bool>, | ||
ctx_val: Option<Rc<Context>>, | ||
entered_val: bool, | ||
} | ||
impl<'a> Drop for Reset<'a> { | ||
fn drop(&mut self) { | ||
self.ctx_ref.replace(self.val.take()); | ||
self.ctx_ref.set(self.ctx_val.take()); | ||
self.entered_ref.set(self.entered_val) | ||
} | ||
} | ||
let old = ctx.replace(Some(self.context.clone())); | ||
let ctx_old = ctx.replace(Some(self.context.clone())); | ||
let entered_old = entered.replace(false); | ||
|
||
let _reset = Reset { | ||
ctx_ref: ctx, | ||
val: old, | ||
entered_ref: entered, | ||
ctx_val: ctx_old, | ||
entered_val: entered_old, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. take it or leave it: the impl LocalData {
fn enter(&self, context: Rc<Context>) -> Reset<'_> {
let ctx_val = self.ctx.replace(Some(context));
let entered_val = self.entered.replace(false);
Reset {
ctx_ref: self.ctx,
entered_ref: self.entered,
ctx_val: ctx_old,
entered_val: entered_old,
}
}
}
struct Reset<'a> {
ctx_ref: &'a RcCell<Context>,
entered_ref: &'a Cell<bool>,
ctx_val: Option<Rc<Context>>,
entered_val: bool,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.set(self.ctx_val.take());
self.entered_ref.set(self.entered_val)
}
} and then changing let _reset = local_data.enter(self.context.clone()); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! I've moved out, and renamed it to more verbose name. |
||
}; | ||
|
||
(f.take().unwrap())() | ||
|
@@ -967,7 +986,7 @@ impl Shared { | |
fn schedule(&self, task: task::Notified<Arc<Self>>) { | ||
CURRENT.with(|localdata| { | ||
match localdata.ctx.get() { | ||
Some(cx) if cx.shared.ptr_eq(self) => unsafe { | ||
Some(cx) if cx.shared.ptr_eq(self) && !localdata.entered.get() => unsafe { | ||
Darksonn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Safety: if the current `LocalSet` context points to this | ||
// `LocalSet`, then we are on the thread that owns it. | ||
cx.shared.local_state.task_push_back(task); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a comment here describing how
entered
is used and what it represents? thanks!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added! Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry if this is overly nitpicky, but I think this comment could maybe be improved a bit --- it would be nice if it explained why we need to differentiate between
enter
and polling theLocalSet
, and what behavior is controlled based on that. I don't think that's currelty all that clear from reading the code.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, sorry! I've corrected it!