Port bounded_thread_pool.rs to use Tokio's mpsc (#30522)
This PR also greatly simplifies the `select!` and `FuturesUnordered` logic to just use an `mpsc` channel.
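
To make the new shape of the scheduler concrete, here is a minimal, self-contained sketch of the same completion-channel pattern (hypothetical example code, not part of this diff): idle worker indexes live on a LIFO stack, a finished worker reports its index over one bounded `tokio::sync::mpsc` channel, and the dispatcher drains that channel with `try_recv` before falling back to an awaited `recv`. It assumes `tokio = { version = "1", features = ["full"] }` and simulates workers with `tokio::spawn` rather than dedicated threads.

```rust
use tokio::sync::mpsc;

const MAX_WORKERS: usize = 4;

#[tokio::main]
async fn main() {
    // Completion channel: a worker sends its own index back when it finishes.
    // Capacity MAX_WORKERS means each worker can always buffer its one
    // pending completion message, so `try_send` below never hits a full channel.
    let (done_tx, mut done_rx) = mpsc::channel::<usize>(MAX_WORKERS);
    let mut available: Vec<usize> = Vec::new(); // LIFO stack of idle workers
    let mut spawned = 0;

    for job in 0..16u32 {
        // Drain completions without blocking, so the most recently finished
        // worker lands on top of the stack and is reused first.
        while let Ok(idx) = done_rx.try_recv() {
            available.push(idx);
        }
        // Reserve a worker before taking the next job: if every worker is
        // busy and the pool is at its limit, block here until one frees up.
        let worker = match available.pop() {
            Some(idx) => idx,
            None if spawned < MAX_WORKERS => {
                spawned += 1;
                spawned - 1
            },
            None => done_rx.recv().await.expect("all senders dropped"),
        };
        let done_tx = done_tx.clone();
        tokio::spawn(async move {
            // Stand-in for the real per-worker thread executing a request.
            println!("worker {worker} handling job {job}");
            let _ = done_tx.try_send(worker);
        });
    }

    // Sketch-only cleanup: wait for in-flight jobs to report completion.
    drop(done_tx);
    while done_rx.recv().await.is_some() {}
}
```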

GitOrigin-RevId: 4f9925d602024b3eb9b995898e4e651e818bf216
sujayakar authored and Convex, Inc. committed Oct 10, 2024
1 parent 78129b8 commit 7642b5a
Showing 1 changed file with 38 additions and 45 deletions.
83 changes: 38 additions & 45 deletions crates/common/src/bounded_thread_pool.rs
@@ -1,20 +1,17 @@
 use std::sync::Arc;
 
 use futures::{
-    channel::mpsc,
-    future::{
-        self,
-        BoxFuture,
-    },
+    future::BoxFuture,
     pin_mut,
-    select_biased,
-    stream::FuturesUnordered,
     Future,
     FutureExt,
     StreamExt,
 };
 use parking_lot::Mutex;
-use tokio::sync::oneshot;
+use tokio::sync::{
+    mpsc,
+    oneshot,
+};
 
 use crate::{
     codel_queue::{
@@ -121,7 +118,7 @@ impl<RT: Runtime> BoundedThreadPool<RT> {
 struct Scheduler<RT: Runtime> {
     rt: RT,
     // Vec of channels for sending work to individual workers.
-    worker_senders: Vec<mpsc::Sender<(Request, oneshot::Sender<usize>, usize)>>,
+    worker_senders: Vec<mpsc::Sender<(Request, mpsc::Sender<usize>, usize)>>,
 
     // Stack of indexes into worker_senders, including exactly the workers
     // that are not running any request.
@@ -136,20 +133,6 @@ struct Scheduler<RT: Runtime> {
 }
 
 impl<RT: Runtime> Scheduler<RT> {
-    async fn get_available_worker(&mut self) -> usize {
-        match self.available_workers.pop() {
-            Some(value) => value,
-            None => {
-                // No available worker, create a new one if under the limit
-                if self.worker_senders.len() < self.config.max_exec_threads {
-                    return self.create_worker();
-                }
-                // otherwise block indefinitely.
-                future::pending().await
-            },
-        }
-    }
-
     fn create_worker(&mut self) -> usize {
         let worker_index = self.worker_senders.len();
         let (work_sender, work_receiver) = mpsc::channel(1);
@@ -163,36 +146,48 @@ impl<RT: Runtime> Scheduler<RT> {
     }
 
     async fn service_requests(
-        mut work_receiver: mpsc::Receiver<(Request, oneshot::Sender<usize>, usize)>,
+        mut work_receiver: mpsc::Receiver<(Request, mpsc::Sender<usize>, usize)>,
     ) {
         // Wait for the next job from our sender.
-        while let Some((request, done_sender, worker_index)) = work_receiver.next().await {
+        while let Some((request, done_sender, worker_index)) = work_receiver.recv().await {
             // Run one job
             request.execute().await;
             // Then tell our sender that we're ready for another job
-            let _ = done_sender.send(worker_index);
+            let _ = done_sender.try_send(worker_index);
         }
     }
 
     async fn dispatch(mut self, mut receiver: CoDelQueueReceiver<RT, Request>) {
-        let mut in_progress_workers = FuturesUnordered::new();
-
+        let (in_progress_tx, mut in_progress_rx) = mpsc::channel(self.config.max_exec_threads);
         loop {
-            let next_worker = loop {
-                select_biased! {
-                    completed_worker = in_progress_workers.select_next_some() => {
-                        let Ok(completed_worker) = completed_worker else {
-                            tracing::warn!("Worker shut down. Shutting down {} scheduler.", self.config.name);
-                            return;
-                        };
-                        self.available_workers.push(completed_worker);
-                    },
-                    next_worker = self.get_available_worker().fuse() => {
-                        break next_worker;
-                    },
-                }
+            // Drain as many completions from the in_progress channel as we can
+            // before blocking. Since `self.available_workers` is LIFO, it's important
+            // we drain this channel so we reuse the most recently completed worker.
+            while let Ok(w) = in_progress_rx.try_recv() {
+                self.available_workers.push(w);
+            }
+            // Reserve ourselves an available worker before popping from the request queue:
+            // This lets the request queue back up and express backpressure if
+            // all of the workers are busy.
+            let next_worker = match self.available_workers.pop() {
+                Some(w) => w,
+                // No available worker, create a new one if under the limit.
+                None if self.worker_senders.len() < self.config.max_exec_threads => {
+                    self.create_worker()
+                },
+                // Otherwise, wait for an in-progress request to complete.
+                None => {
+                    let Some(w) = in_progress_rx.recv().await else {
+                        tracing::warn!(
+                            "Worker shut down. Shutting down {} scheduler.",
+                            self.config.name
+                        );
+                        return;
+                    };
+                    w
+                },
             };
-            // Otherwise wait for for more work and drive any in progress
+            // Wait for some work.
            let req = loop {
                 match receiver.next().await {
                     Some((req, None)) => break req,
@@ -201,9 +196,8 @@ impl<RT: Runtime> Scheduler<RT> {
                     None => return,
                 }
             };
-            let (done_sender, done_receiver) = oneshot::channel();
             if self.worker_senders[next_worker]
-                .try_send((req, done_sender, next_worker))
+                .try_send((req, in_progress_tx.clone(), next_worker))
                 .is_err()
             {
                 // Available worker should have an empty channel, so if we fail
@@ -214,7 +208,6 @@ impl<RT: Runtime> Scheduler<RT> {
                 );
                 return;
             }
-            in_progress_workers.push(done_receiver);
         }
     }
 }