diff --git a/crates/common/src/bounded_thread_pool.rs b/crates/common/src/bounded_thread_pool.rs
index f119e23e..6bf1645c 100644
--- a/crates/common/src/bounded_thread_pool.rs
+++ b/crates/common/src/bounded_thread_pool.rs
@@ -1,20 +1,17 @@
 use std::sync::Arc;
 
 use futures::{
-    channel::mpsc,
-    future::{
-        self,
-        BoxFuture,
-    },
+    future::BoxFuture,
     pin_mut,
-    select_biased,
-    stream::FuturesUnordered,
     Future,
     FutureExt,
     StreamExt,
 };
 use parking_lot::Mutex;
-use tokio::sync::oneshot;
+use tokio::sync::{
+    mpsc,
+    oneshot,
+};
 
 use crate::{
     codel_queue::{
@@ -121,7 +118,7 @@ impl BoundedThreadPool {
 struct Scheduler<RT: Runtime, Request: Send + 'static> {
     rt: RT,
     // Vec of channels for sending work to individual workers.
-    worker_senders: Vec<mpsc::Sender<(Request, oneshot::Sender<usize>, usize)>>,
+    worker_senders: Vec<mpsc::Sender<(Request, mpsc::Sender<usize>, usize)>>,
 
     // Stack of indexes into worker_senders, including exactly the workers
     // that are not running any request.
@@ -136,20 +133,6 @@ struct Scheduler {
 }
 
 impl<RT: Runtime, Request: Send + 'static> Scheduler<RT, Request> {
-    async fn get_available_worker(&mut self) -> usize {
-        match self.available_workers.pop() {
-            Some(value) => value,
-            None => {
-                // No available worker, create a new one if under the limit
-                if self.worker_senders.len() < self.config.max_exec_threads {
-                    return self.create_worker();
-                }
-                // otherwise block indefinitely.
-                future::pending().await
-            },
-        }
-    }
-
     fn create_worker(&mut self) -> usize {
         let worker_index = self.worker_senders.len();
         let (work_sender, work_receiver) = mpsc::channel(1);
@@ -163,36 +146,48 @@ impl Scheduler {
     }
 
     async fn service_requests(
-        mut work_receiver: mpsc::Receiver<(Request, oneshot::Sender<usize>, usize)>,
+        mut work_receiver: mpsc::Receiver<(Request, mpsc::Sender<usize>, usize)>,
     ) {
         // Wait for the next job from our sender.
-        while let Some((request, done_sender, worker_index)) = work_receiver.next().await {
+        while let Some((request, done_sender, worker_index)) = work_receiver.recv().await {
             // Run one job
             request.execute().await;
             // Then tell our sender that we're ready for another job
-            let _ = done_sender.send(worker_index);
+            let _ = done_sender.try_send(worker_index);
         }
     }
 
     async fn dispatch(mut self, mut receiver: CoDelQueueReceiver<RT, Request>) {
-        let mut in_progress_workers = FuturesUnordered::new();
-
+        let (in_progress_tx, mut in_progress_rx) = mpsc::channel(self.config.max_exec_threads);
         loop {
-            let next_worker = loop {
-                select_biased! {
-                    completed_worker = in_progress_workers.select_next_some() => {
-                        let Ok(completed_worker) = completed_worker else {
-                            tracing::warn!("Worker shut down. Shutting down {} scheduler.", self.config.name);
-                            return;
-                        };
-                        self.available_workers.push(completed_worker);
-                    },
-                    next_worker = self.get_available_worker().fuse() => {
-                        break next_worker;
-                    },
-                }
+            // Drain as many completed workers from the in_progress channel as we can
+            // before blocking. Since `self.available_workers` is LIFO, draining first
+            // means we reuse the most recently completed worker.
+            while let Ok(w) = in_progress_rx.try_recv() {
+                self.available_workers.push(w);
+            }
+            // Reserve ourselves an available worker before popping from the request queue:
+            // This lets the request queue back up and express backpressure if
+            // all of the workers are busy.
+            let next_worker = match self.available_workers.pop() {
+                Some(w) => w,
+                // No available worker, create a new one if under the limit
+                None if self.worker_senders.len() < self.config.max_exec_threads => {
+                    self.create_worker()
+                },
+                // Otherwise, wait for an in-progress request to complete.
+                None => {
+                    let Some(w) = in_progress_rx.recv().await else {
+                        tracing::warn!(
+                            "Worker shut down. Shutting down {} scheduler.",
+                            self.config.name
+                        );
+                        return;
+                    };
+                    w
+                },
             };
-            // Otherwise wait for for more work and drive any in progress
+            // Wait for some work.
            let req = loop {
                 match receiver.next().await {
                     Some((req, None)) => break req,
@@ -201,9 +196,8 @@ impl Scheduler {
                     None => return,
                 }
             };
-            let (done_sender, done_receiver) = oneshot::channel();
             if self.worker_senders[next_worker]
-                .try_send((req, done_sender, next_worker))
+                .try_send((req, in_progress_tx.clone(), next_worker))
                 .is_err()
             {
                 // Available worker should have an empty channel, so if we fail
@@ -214,7 +208,6 @@
                 );
                 return;
             }
-            in_progress_workers.push(done_receiver);
         }
     }
 }
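
For context, the change replaces the per-request `oneshot` completion channels driven through a `FuturesUnordered` with a single bounded `tokio::sync::mpsc` channel that each worker pushes its index into when it finishes a job. Below is a minimal, self-contained sketch of that pattern, assuming a plain tokio multi-thread runtime; the `Job`, `worker_loop`, and `dispatch` names are illustrative stand-ins, and the crate's `Runtime` abstraction, CoDel queue, and error logging are omitted.

```rust
// Sketch only: assumes tokio = { version = "1", features = ["full"] }.
use std::time::Duration;

use tokio::sync::mpsc;

/// Illustrative request type; the real pool is generic over `Request: Send + 'static`.
struct Job(u64);

impl Job {
    async fn execute(self) {
        // Stand-in for real work.
        tokio::time::sleep(Duration::from_millis(self.0)).await;
    }
}

/// Per-worker loop: run one job at a time, then report our index on the shared
/// completion channel so the dispatcher can mark us idle again.
async fn worker_loop(mut jobs: mpsc::Receiver<(Job, mpsc::Sender<usize>, usize)>) {
    while let Some((job, done_tx, index)) = jobs.recv().await {
        job.execute().await;
        // The completion channel is sized to the worker limit, so this never blocks.
        let _ = done_tx.try_send(index);
    }
}

/// Dispatcher: lazily spawn up to `max_workers` workers, reuse idle ones LIFO,
/// and only pop the next request once a worker has been reserved (backpressure).
async fn dispatch(mut requests: mpsc::Receiver<Job>, max_workers: usize) {
    let mut worker_senders: Vec<mpsc::Sender<(Job, mpsc::Sender<usize>, usize)>> = Vec::new();
    let mut available: Vec<usize> = Vec::new();
    let (done_tx, mut done_rx) = mpsc::channel::<usize>(max_workers);

    loop {
        // Drain completions first so the LIFO stack favors recently active workers.
        while let Ok(i) = done_rx.try_recv() {
            available.push(i);
        }
        let worker = match available.pop() {
            Some(i) => i,
            None if worker_senders.len() < max_workers => {
                // Under the limit: spawn a new worker with a single-slot work channel.
                let (tx, rx) = mpsc::channel(1);
                let index = worker_senders.len();
                worker_senders.push(tx);
                tokio::spawn(worker_loop(rx));
                index
            },
            // At the limit: block until some in-progress job finishes.
            None => match done_rx.recv().await {
                Some(i) => i,
                None => return, // cannot happen while we still hold `done_tx`
            },
        };
        // Only now pull the next request off the queue.
        let Some(job) = requests.recv().await else { return };
        if worker_senders[worker]
            .try_send((job, done_tx.clone(), worker))
            .is_err()
        {
            // An idle worker's single-slot channel should always have capacity.
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let (job_tx, job_rx) = mpsc::channel(16);
    let dispatcher = tokio::spawn(dispatch(job_rx, 4));
    for i in 0..8u64 {
        job_tx.send(Job(10 * i)).await.unwrap();
    }
    // Closing the request channel lets `dispatch` return once everything is handed out.
    drop(job_tx);
    dispatcher.await.unwrap();
}
```

Two details carry the design: sizing the completion channel to the worker limit means there can never be more outstanding completions than capacity, so the worker-side `try_send` cannot fail; and reserving a worker before popping the request queue preserves backpressure, since requests stay queued (where the CoDel queue can apply its policy) whenever every worker is busy.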