Skip to content

Commit

Permalink
core: Simplify use_current_thread clean-up.
Browse files Browse the repository at this point in the history
Do it from Registry::terminate() instead of exposing a new API.
  • Loading branch information
emilio committed Aug 23, 2023
1 parent 9561b27 commit 37f6dc5
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 116 deletions.
11 changes: 4 additions & 7 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ pub use self::scope::{in_place_scope, scope, Scope};
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::{
clean_up_use_current_thread, current_thread_has_pending_tasks, current_thread_index,
yield_local, yield_now, ThreadPool, Yield,
current_thread_has_pending_tasks, current_thread_index, yield_local, yield_now, ThreadPool,
Yield,
};

use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
Expand Down Expand Up @@ -548,11 +548,8 @@ impl<S> ThreadPoolBuilder<S> {
/// # Cleaning up a local thread-pool
///
/// In order to properly clean-up the worker thread state, for local thread-pools you should
/// call [`clean_up_use_current_thread()`] from the same thread that built the thread-pool.
/// See that function's documentation for more details.
///
/// This call is not required, but without it the registry will leak even if the pool is
/// otherwise terminated.
/// drop the thread pool from the same thread it was created on. Otherwise the registry will
/// leak (even if the pool is otherwise terminated).
pub fn use_current_thread(mut self) -> Self {
self.use_current_thread = true;
self
Expand Down
22 changes: 15 additions & 7 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,9 +610,21 @@ impl Registry {
/// dropped. The worker threads will gradually terminate, once any
/// extant work is completed.
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) != 1 {
return;
}
for (i, thread_info) in self.thread_infos.iter().enumerate() {
unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
if self.used_creator_thread {
if let Some(t) = self.current_thread() {
if t.index == 0 {
unsafe {
wait_until_out_of_work(t);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
}
assert!(WorkerThread::current().is_null());
}
}
}
}
Expand All @@ -621,10 +633,6 @@ impl Registry {
pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
self.sleep.notify_worker_latch_is_set(target_worker_index);
}

pub(super) fn used_creator_thread(&self) -> bool {
self.used_creator_thread
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand Down
33 changes: 0 additions & 33 deletions rayon-core/src/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,39 +461,6 @@ pub fn yield_local() -> Option<Yield> {
}
}

/// Waits for termination of the thread-pool (if pending), and cleans up resources allocated by
/// [`ThreadPoolBuilder::use_current_thread()`]. Should only be called from the thread that built
/// the thread-pool, and only when [`ThreadPoolBuilder::use_current_thread()`] is used.
///
/// Calling this function from a thread pool job will block indefinitely.
///
/// Calling this function before before the thread-pool has been dropped will cause the thread to
/// not return control flow to the caller until that happens (stealing work as necessary).
///
/// # Panics
///
/// If the calling thread is no the creator thread of a thread-pool, or not part of that
/// thread-pool, via [`ThreadPoolBuilder::use_current_thread()`].
pub fn clean_up_use_current_thread() {
unsafe {
let thread = WorkerThread::current()
.as_ref()
.expect("Should be called from a worker thread");
assert!(
thread.registry().used_creator_thread(),
"Should only be used to clean up the pool creator constructor thread"
);
assert_eq!(
thread.index(),
0,
"Should be called from the thread that created the pool"
);
crate::registry::wait_until_out_of_work(thread);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
}
assert!(WorkerThread::current().is_null());
}

/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
Expand Down
69 changes: 0 additions & 69 deletions rayon-core/tests/use_current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ fn use_current_thread_basic() {
.wait_while(pair.0.lock().unwrap(), |ran| !*ran)
.unwrap();
std::mem::drop(pool); // Drop the pool.
rayon_core::clean_up_use_current_thread(); // Clean-up
assert_eq!(rayon_core::current_thread_index(), None);

// Wait until all threads have actually exited. This is not really needed, other than to
Expand All @@ -47,71 +46,3 @@ fn use_current_thread_basic() {
let _ = handle.join();
}
}

#[test]
fn bogus_use_current_thread_cleanup_wrong_thread() {
assert_eq!(rayon_core::current_thread_index(), None);
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let pair2 = Arc::clone(&pair);
let pool = ThreadPoolBuilder::new()
.num_threads(2)
.use_current_thread()
.panic_handler(move |e| {
let &(ref panic_message, ref condvar) = &*pair2;
*panic_message.lock().unwrap() = Some(e.downcast::<String>().unwrap());
condvar.notify_one();
})
.build()
.unwrap();
assert_eq!(rayon_core::current_thread_index(), Some(0));

pool.spawn(move || {
rayon_core::clean_up_use_current_thread();
// Should never get here.
});

std::mem::drop(pool); // Drop the pool.
let panic_message = pair
.1
.wait_while(pair.0.lock().unwrap(), |panic_message| {
panic_message.is_none()
})
.unwrap();
let panic_message = panic_message.as_ref().unwrap();
assert!(
panic_message.contains("Should be called from the thread that created the pool"),
"{panic_message}"
);
}

#[test]
#[should_panic]
fn bogus_use_current_thread_cleanup_no_thread() {
assert_eq!(rayon_core::current_thread_index(), None);
rayon_core::clean_up_use_current_thread();
}

#[test]
#[ignore = "Hard to test things that should just infinitely block"]
fn bogus_use_current_thread_cleanup_reentrant() {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.use_current_thread()
.build()
.unwrap(),
);
assert_eq!(rayon_core::current_thread_index(), Some(0));
let pool_clone = Arc::clone(&pool);
let handle = std::thread::spawn(move || {
pool_clone.spawn(move || {
// This should not execute until we clean_up_use_current_thread, so it'll get called
// reentrantly. This shouldn't result in UB or a use-after-free (should just infinitely
// block).
rayon_core::clean_up_use_current_thread();
});
});
handle.join().unwrap();
std::mem::drop(pool);
rayon_core::clean_up_use_current_thread();
}

0 comments on commit 37f6dc5

Please sign in to comment.