8
8
//! This is built on top of [`pal_uring`] and [`pal_async`].
9
9
10
10
#![ warn( missing_docs) ]
11
- // UNSAFETY: needed for saving per-thread state.
12
- #![ expect( unsafe_code) ]
11
+ #![ forbid( unsafe_code) ]
13
12
14
13
use inspect:: Inspect ;
14
+ use loan_cell:: LoanCell ;
15
15
use pal:: unix:: affinity:: CpuSet ;
16
16
use pal_async:: fd:: FdReadyDriver ;
17
17
use pal_async:: task:: Runnable ;
@@ -30,7 +30,6 @@ use pal_uring::IoUringPool;
30
30
use pal_uring:: PoolClient ;
31
31
use pal_uring:: Timer ;
32
32
use parking_lot:: Mutex ;
33
- use std:: cell:: Cell ;
34
33
use std:: future:: poll_fn;
35
34
use std:: io;
36
35
use std:: marker:: PhantomData ;
@@ -215,15 +214,12 @@ impl ThreadpoolBuilder {
215
214
send. send ( Ok ( pool. client ( ) . clone ( ) ) ) . ok ( ) ;
216
215
217
216
// Store the current thread's driver so that spawned tasks can
218
- // find it via `Thread::current()`.
219
- CURRENT_THREADPOOL_CPU . with ( |current| {
220
- current. set ( std:: ptr:: from_ref ( & driver) ) ;
217
+ // find it via `Thread::current()`. Do this via a loan instead
218
+ // of storing it directly in TLS to avoid the overhead of
219
+ // registering a destructor.
220
+ CURRENT_THREAD_DRIVER . with ( |current| {
221
+ current. lend ( & driver, || pool. run ( ) ) ;
221
222
} ) ;
222
- pool. run ( ) ;
223
- CURRENT_THREADPOOL_CPU . with ( |current| {
224
- current. set ( std:: ptr:: null ( ) ) ;
225
- } ) ;
226
- drop ( driver) ;
227
223
} ) ?;
228
224
229
225
// Wait for the pool to be initialized.
@@ -360,33 +356,27 @@ impl Initiate for AffinitizedThreadpool {
360
356
/// The state for the thread pool thread for the currently running CPU.
361
357
#[ derive( Debug , Copy , Clone ) ]
362
358
pub struct Thread {
363
- driver : & ' static ThreadpoolDriver ,
364
359
_not_send_sync : PhantomData < * const ( ) > ,
365
360
}
366
361
367
362
impl Thread {
368
- /// Returns a new driver for the current CPU.
363
+ /// Returns an instance for the current CPU.
369
364
pub fn current ( ) -> Option < Self > {
370
- let inner = CURRENT_THREADPOOL_CPU . with ( |current| {
371
- let p = current. get ( ) ;
372
- // SAFETY: the `ThreadpoolDriver` is on the current thread's stack
373
- // and so is guaranteed to be valid. And since `Thread` is not
374
- // `Send` or `Sync`, this reference cannot be accessed after the
375
- // driver has been dropped, since any task that can construct a
376
- // `Thread` will have been completed by that time. So it's OK for
377
- // this reference to live as long as `Thread`.
378
- ( !p. is_null ( ) ) . then ( || unsafe { & * p } )
379
- } ) ?;
365
+ if !CURRENT_THREAD_DRIVER . with ( |current| current. is_lent ( ) ) {
366
+ return None ;
367
+ }
380
368
Some ( Self {
381
- driver : inner,
382
369
_not_send_sync : PhantomData ,
383
370
} )
384
371
}
385
372
386
- fn once ( & self ) -> & ThreadpoolDriverOnce {
387
- // Since we are on the thread, the thread is guaranteed to have been
388
- // initialized.
389
- self . driver . inner . once . get ( ) . unwrap ( )
373
+ /// Calls `f` with the driver for the current thread.
374
+ pub fn with_driver < R > ( & self , f : impl FnOnce ( & ThreadpoolDriver ) -> R ) -> R {
375
+ CURRENT_THREAD_DRIVER . with ( |current| current. borrow ( |driver| f ( driver. unwrap ( ) ) ) )
376
+ }
377
+
378
+ fn with_once < R > ( & self , f : impl FnOnce ( & ThreadpoolDriver , & ThreadpoolDriverOnce ) -> R ) -> R {
379
+ self . with_driver ( |driver| f ( driver, driver. inner . once . get ( ) . unwrap ( ) ) )
390
380
}
391
381
392
382
/// Sets the idle task to run. The task is returned by `f`, which receives
@@ -400,56 +390,52 @@ impl Thread {
400
390
F : ' static + Send + FnOnce ( IdleControl ) -> Fut ,
401
391
Fut : std:: future:: Future < Output = ( ) > ,
402
392
{
403
- self . once ( ) . client . set_idle_task ( f)
404
- }
405
-
406
- /// Returns the driver for the current thread.
407
- pub fn driver ( & self ) -> & ThreadpoolDriver {
408
- self . driver
393
+ self . with_once ( |_, once| once. client . set_idle_task ( f) )
409
394
}
410
395
411
396
/// Tries to set the affinity to this thread's intended CPU, if it has not
412
397
/// already been set. Returns `Ok(false)` if the intended CPU is still
413
398
/// offline.
414
399
pub fn try_set_affinity ( & self ) -> Result < bool , SetAffinityError > {
415
- let mut state = self . driver . inner . state . lock ( ) ;
416
- if matches ! ( state. affinity, AffinityState :: Set ) {
417
- return Ok ( true ) ;
418
- }
419
- if !is_cpu_online ( self . driver . inner . cpu ) . map_err ( SetAffinityError :: Online ) ? {
420
- return Ok ( false ) ;
421
- }
400
+ self . with_once ( |driver, once| {
401
+ let mut state = driver. inner . state . lock ( ) ;
402
+ if matches ! ( state. affinity, AffinityState :: Set ) {
403
+ return Ok ( true ) ;
404
+ }
405
+ if !is_cpu_online ( driver. inner . cpu ) . map_err ( SetAffinityError :: Online ) ? {
406
+ return Ok ( false ) ;
407
+ }
422
408
423
- let mut affinity = CpuSet :: new ( ) ;
424
- affinity. set ( self . driver . inner . cpu ) ;
425
-
426
- pal:: unix:: affinity:: set_current_thread_affinity ( & affinity)
427
- . map_err ( SetAffinityError :: Thread ) ?;
428
- self . once ( )
429
- . client
430
- . set_iowq_affinity ( & affinity )
431
- . map_err ( SetAffinityError :: Ring ) ? ;
432
-
433
- let old_affinity_state = std :: mem :: replace ( & mut state . affinity , AffinityState :: Set ) ;
434
- self . driver . inner . affinity_set . store ( true , Relaxed ) ;
435
- drop ( state ) ;
436
-
437
- match old_affinity_state {
438
- AffinityState :: Waiting ( wakers ) => {
439
- for waker in wakers {
440
- waker . wake ( ) ;
409
+ let mut affinity = CpuSet :: new ( ) ;
410
+ affinity. set ( driver. inner . cpu ) ;
411
+
412
+ pal:: unix:: affinity:: set_current_thread_affinity ( & affinity)
413
+ . map_err ( SetAffinityError :: Thread ) ?;
414
+ once. client
415
+ . set_iowq_affinity ( & affinity )
416
+ . map_err ( SetAffinityError :: Ring ) ? ;
417
+
418
+ let old_affinity_state = std :: mem :: replace ( & mut state . affinity , AffinityState :: Set ) ;
419
+ driver . inner . affinity_set . store ( true , Relaxed ) ;
420
+ drop ( state ) ;
421
+
422
+ match old_affinity_state {
423
+ AffinityState :: Waiting ( wakers ) => {
424
+ for waker in wakers {
425
+ waker . wake ( ) ;
426
+ }
441
427
}
428
+ AffinityState :: Set => unreachable ! ( ) ,
442
429
}
443
- AffinityState :: Set => unreachable ! ( ) ,
444
- }
445
- Ok ( true )
430
+ Ok ( true )
431
+ } )
446
432
}
447
433
448
434
/// Returns the that caused this thread to spawn.
449
435
///
450
436
/// Returns `None` if the thread was spawned to issue IO.
451
- pub fn first_task ( & self ) -> Option < & TaskInfo > {
452
- self . once ( ) . first_task . as_ref ( )
437
+ pub fn first_task ( & self ) -> Option < TaskInfo > {
438
+ self . with_once ( |_ , once| once . first_task . clone ( ) )
453
439
}
454
440
}
455
441
@@ -468,12 +454,12 @@ pub enum SetAffinityError {
468
454
}
469
455
470
456
thread_local ! {
471
- static CURRENT_THREADPOOL_CPU : Cell < * const ThreadpoolDriver > = const { Cell :: new( std :: ptr :: null ( ) ) } ;
457
+ static CURRENT_THREAD_DRIVER : LoanCell < ThreadpoolDriver > = const { LoanCell :: new( ) } ;
472
458
}
473
459
474
460
impl SpawnLocal for Thread {
475
461
fn scheduler_local ( & self , metadata : & TaskMetadata ) -> Arc < dyn Schedule > {
476
- self . driver . scheduler ( metadata) . clone ( )
462
+ self . with_driver ( | driver| driver . scheduler ( metadata) . clone ( ) )
477
463
}
478
464
}
479
465
@@ -506,7 +492,7 @@ struct ThreadpoolDriverOnce {
506
492
}
507
493
508
494
/// Information about a task that caused a thread to spawn.
509
- #[ derive( Debug , Inspect ) ]
495
+ #[ derive( Debug , Clone , Inspect ) ]
510
496
pub struct TaskInfo {
511
497
/// The name of the task.
512
498
pub name : Arc < str > ,
0 commit comments