Skip to content

Commit

Permalink
Define SchedulerInner for fine-grained cleaning (#4133)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun authored Dec 17, 2024
1 parent eec244f commit e27e2be
Showing 1 changed file with 42 additions and 26 deletions.
68 changes: 42 additions & 26 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,10 @@ where

// This fn needs to return immediately due to being part of the blocking
// `::wait_for_termination()` call.
fn return_scheduler(&self, scheduler: S::Inner, should_trash: bool) {
fn return_scheduler(&self, scheduler: S::Inner) {
// Refer to the comment in is_aborted() as to the exact definition of the concept of
// _trashed_ and the interaction among different parts of unified scheduler.
let should_trash = scheduler.is_trashed();
if should_trash {
// Delay drop()-ing this trashed returned scheduler inner by stashing it in
// self.trashed_scheduler_inners, which is periodically drained by the `solScCleaner`
Expand Down Expand Up @@ -713,14 +716,6 @@ where
S: SpawnableScheduler<TH>,
TH: TaskHandler,
{
fn id(&self) -> SchedulerId {
self.thread_manager.scheduler_id
}

fn is_trashed(&self) -> bool {
self.is_aborted() || self.is_overgrown()
}

fn is_aborted(&self) -> bool {
// Schedulers can be regarded as being _trashed_ (thereby will be cleaned up later), if
// threads are joined. Remember that unified scheduler _doesn't normally join threads_ even
Expand All @@ -733,11 +728,12 @@ where
// Note that this detection is done internally every time scheduler operations are run
// (send_task() and end_session(); or schedule_execution() and wait_for_termination() in
// terms of InstalledScheduler). So, it's ensured that the detection is done at least once
// for any scheudler which is taken out of the pool.
// for any scheduler which is taken out of the pool.
//
// Thus, any transaction errors are always handled without loss of information and
// the aborted scheduler itself will always be handled as _trashed_ before returning the
// scheduler to the pool, considering is_trashed() is checked immediately before that.
// scheduler to the pool, considering is_aborted() is checked via is_trashed() immediately
// before that.
self.thread_manager.are_threads_joined()
}

Expand Down Expand Up @@ -1344,8 +1340,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
}

pub trait SchedulerInner {
fn id(&self) -> SchedulerId;
fn is_trashed(&self) -> bool;
}

pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
type Inner: Debug + Send + Sync;
type Inner: SchedulerInner + Debug + Send + Sync;

fn into_inner(self) -> (ResultWithTimings, Self::Inner);

Expand Down Expand Up @@ -1442,22 +1443,27 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
}
}

impl<S, TH> SchedulerInner for PooledSchedulerInner<S, TH>
where
S: SpawnableScheduler<TH>,
TH: TaskHandler,
{
fn id(&self) -> SchedulerId {
self.thread_manager.scheduler_id
}

fn is_trashed(&self) -> bool {
self.is_aborted() || self.is_overgrown()
}
}

impl<S, TH> UninstalledScheduler for PooledSchedulerInner<S, TH>
where
S: SpawnableScheduler<TH, Inner = Self>,
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
// Refer to the comment in is_trashed() as to the exact definition of the concept of
// _trashed_ and the interaction among different parts of unified scheduler.
let should_trash = self.is_trashed();
if should_trash {
info!("trashing scheduler (id: {})...", self.id());
}
self.thread_manager
.pool
.clone()
.return_scheduler(*self, should_trash);
self.thread_manager.pool.clone().return_scheduler(*self);
}
}

Expand Down Expand Up @@ -2117,10 +2123,10 @@ mod tests {

let (result_with_timings, scheduler1) = scheduler1.into_inner();
assert_matches!(result_with_timings, (Ok(()), _));
pool.return_scheduler(scheduler1, false);
pool.return_scheduler(scheduler1);
let (result_with_timings, scheduler2) = scheduler2.into_inner();
assert_matches!(result_with_timings, (Ok(()), _));
pool.return_scheduler(scheduler2, false);
pool.return_scheduler(scheduler2);

let scheduler3 = pool.do_take_scheduler(context.clone());
assert_eq!(scheduler_id2, scheduler3.id());
Expand Down Expand Up @@ -2163,7 +2169,7 @@ mod tests {

let scheduler = pool.do_take_scheduler(old_context.clone());
let scheduler_id = scheduler.id();
pool.return_scheduler(scheduler.into_inner().1, false);
pool.return_scheduler(scheduler.into_inner().1);

let scheduler = pool.take_scheduler(new_context.clone());
assert_eq!(scheduler_id, scheduler.id());
Expand Down Expand Up @@ -2762,11 +2768,21 @@ mod tests {
}
}

impl<const TRIGGER_RACE_CONDITION: bool> SchedulerInner for AsyncScheduler<TRIGGER_RACE_CONDITION> {
fn id(&self) -> SchedulerId {
42
}

fn is_trashed(&self) -> bool {
false
}
}

impl<const TRIGGER_RACE_CONDITION: bool> UninstalledScheduler
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
fn return_to_pool(self: Box<Self>) {
self.3.clone().return_scheduler(*self, false)
self.3.clone().return_scheduler(*self)
}
}

Expand Down

0 comments on commit e27e2be

Please sign in to comment.