Skip to content

Commit

Permalink
Log running count by client_id in Funrun (#25360)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 992bf2af7b08014cc2c7c4211152d80b5b54a6d2
  • Loading branch information
Preslav Le authored and Convex, Inc. committed May 3, 2024
1 parent 9c84ce9 commit cd5777d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
41 changes: 27 additions & 14 deletions crates/isolate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,8 @@ impl<RT: Runtime, W: IsolateWorker<RT>> IsolateScheduler<RT, W> {
completed_worker = in_progress_workers.select_next_some() => {
log_pool_running_count(
self.worker.config().name,
in_progress_workers.len()
in_progress_workers.len(),
"" // This is a single tenant scheduler used in the backend.
);
let Ok(completed_worker) = completed_worker else {
// Worker has shut down, so we should shut down too.
Expand Down Expand Up @@ -1049,7 +1050,13 @@ impl<RT: Runtime, W: IsolateWorker<RT>> IsolateScheduler<RT, W> {
return;
}
in_progress_workers.push(done_receiver);
log_pool_running_count(self.worker.config().name, in_progress_workers.len());
// This is a single tenant scheduler used in the backend.
let client_id = "";
log_pool_running_count(
self.worker.config().name,
in_progress_workers.len(),
client_id,
);
}
}
}
Expand Down Expand Up @@ -1115,22 +1122,30 @@ impl<RT: Runtime, W: IsolateWorker<RT>> SharedIsolateScheduler<RT, W> {
}

fn handle_completed_worker(&mut self, completed_worker: ActiveWorkerState) {
match self
let new_count = match self
.in_progress_count
.remove_entry(&completed_worker.client_id)
{
Some((client_id, count)) if count > 1 => {
self.in_progress_count.insert(client_id, count - 1);
count - 1
},
Some((_, 1)) => {
// Nothing to do; we've already removed the entry above.
0
},
_ => panic!(
"Inconsistent state in `in_progress_count` map; the count of active workers for \
client {} must be >= 1",
completed_worker.client_id
),
}
};
log_pool_running_count(
self.worker.config().name,
new_count,
&completed_worker.client_id,
);

self.available_workers
.entry(completed_worker.client_id)
.or_default()
Expand All @@ -1146,10 +1161,6 @@ impl<RT: Runtime, W: IsolateWorker<RT>> SharedIsolateScheduler<RT, W> {
loop {
select_biased! {
completed_worker = self.in_progress_workers.select_next_some() => {
log_pool_running_count(
self.worker.config().name,
self.in_progress_workers.len()
);
let Ok(completed_worker): Result<ActiveWorkerState, _> = completed_worker else {
tracing::warn!("Worker has shut down uncleanly. Shutting down {} scheduler.", self.worker.config().name);
return;
Expand All @@ -1171,10 +1182,16 @@ impl<RT: Runtime, W: IsolateWorker<RT>> SharedIsolateScheduler<RT, W> {
};
let (done_sender, done_receiver) = oneshot::channel();
self.in_progress_workers.push(done_receiver);
*self
let entry = self
.in_progress_count
.entry(request.client_id.clone())
.or_default() += 1;
.or_default();
*entry += 1;
log_pool_running_count(
self.worker.config().name,
*entry,
&request.client_id,
);
let client_id = request.client_id.clone();
if self.worker_senders[worker_id]
.try_send((
Expand All @@ -1195,10 +1212,6 @@ impl<RT: Runtime, W: IsolateWorker<RT>> SharedIsolateScheduler<RT, W> {
);
return;
}
log_pool_running_count(
self.worker.config().name,
self.in_progress_workers.len()
);
},
_ = report_stats => {
let heap_stats = self.aggregate_heap_stats();
Expand Down
10 changes: 7 additions & 3 deletions crates/isolate/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use metrics::{
register_convex_histogram,
CancelableTimer,
IntoLabel,
MetricLabel,
StaticMetricLabel,
StatusTimer,
Timer,
Expand Down Expand Up @@ -54,13 +55,16 @@ pub fn execute_timer(udf_type: &UdfType, npm_version: &Option<Version>) -> Statu
register_convex_gauge!(
ISOLATE_POOL_RUNNING_COUNT_INFO,
"How many isolate workers are currently running work",
&["pool_name"]
&["pool_name", "client_id"]
);
pub fn log_pool_running_count(name: &'static str, count: usize) {
pub fn log_pool_running_count(name: &'static str, count: usize, client_id: &str) {
log_gauge_with_labels(
&ISOLATE_POOL_RUNNING_COUNT_INFO,
count as f64,
vec![StaticMetricLabel::new("pool_name", name)],
vec![
StaticMetricLabel::new("pool_name", name),
MetricLabel::new("client_id", client_id),
],
);
}

Expand Down

0 comments on commit cd5777d

Please sign in to comment.