From 0b70019d24b461ffc31450265f40ae75781dea53 Mon Sep 17 00:00:00 2001
From: Jake Goulding
Date: Wed, 13 Nov 2024 16:54:11 -0500
Subject: [PATCH] Request idling containers exit when the container limit is reached

We let WebSocket connections keep {stable,beta,nightly} containers
running after the execution finishes. These idle containers still count
against the enforced container limits even though they aren't doing
useful work.

This change lets the owner of those idle containers know that someone
else wants their slot, so it can make the containers exit earlier than
they otherwise would have.

This is best shown by setting the container limit very low (e.g. 2) and
then opening one more browser tab than that (e.g. 3). Before this
change, the last tab would have to wait for the idle timeout (60
seconds by default) before it got a chance to run its code.

= Known issues

There's a race that can cause containers to spuriously exit, but that
should only occur when we are near the container cap anyway. More
details are in the code comments.

= Future thoughts / limitations

One WebSocket connection could have *both* an active container and an
idle one (e.g. stable and nightly). This case is not handled, but it
may also be comparatively rare.

The non-WebSocket endpoints can request that other containers exit, but
they won't ever decrement the exit request counter themselves. This
means that if you hit the server many times at these endpoints, then
future WebSocket connections will exit earlier than they need to.
Hopefully this is also comparatively rare.
---
 compiler/base/orchestrator/src/coordinator.rs |  7 +++
 .../orchestrator/src/coordinator/limits.rs    | 53 ++++++++++++++++++-
 ui/src/server_axum/websocket.rs               | 29 ++++++++--
 3 files changed, 83 insertions(+), 6 deletions(-)

diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs
index 038e0b29..02b5c6ce 100644
--- a/compiler/base/orchestrator/src/coordinator.rs
+++ b/compiler/base/orchestrator/src/coordinator.rs
@@ -852,6 +852,9 @@ type ResourceResult = std::result::Result;
 pub trait ResourceLimits: Send + Sync + fmt::Debug + 'static {
     /// Block until resources for a container are available.
     fn next_container(&self) -> BoxFuture<'static, ResourceResult>>;
+
+    /// Block until someone requests that you return an in-use container.
+    fn container_requested(&self) -> BoxFuture<'static, ()>;
 }
 
 /// Represents one allowed Docker container (or equivalent).
@@ -884,6 +887,10 @@ impl CoordinatorFactory {
 
         Coordinator::new(limits, backend)
     }
+
+    pub async fn container_requested(&self) {
+        self.limits.container_requested().await
+    }
 }
 
 #[derive(Debug)]
diff --git a/compiler/base/orchestrator/src/coordinator/limits.rs b/compiler/base/orchestrator/src/coordinator/limits.rs
index 0e1f741f..6a76c226 100644
--- a/compiler/base/orchestrator/src/coordinator/limits.rs
+++ b/compiler/base/orchestrator/src/coordinator/limits.rs
@@ -94,6 +94,7 @@ pub struct Global {
     lifecycle: L,
     container_semaphore: Arc<Semaphore>,
     process_semaphore: Arc<Semaphore>,
+    container_request_semaphore: Arc<Semaphore>,
     start: u64,
     id: AtomicU64,
 }
@@ -136,6 +137,7 @@ where
     pub fn with_lifecycle(container_limit: usize, process_limit: usize, lifecycle: L) -> Self {
         let container_semaphore = Arc::new(Semaphore::new(container_limit));
         let process_semaphore = Arc::new(Semaphore::new(process_limit));
+        let container_request_semaphore = Arc::new(Semaphore::new(0));
 
         let now = std::time::SystemTime::now();
         let start = now
@@ -149,6 +151,7 @@ where
             lifecycle,
             container_semaphore,
             process_semaphore,
+            container_request_semaphore,
             start,
             id,
         }
@@ -163,13 +166,44 @@ where
         let lifecycle = self.lifecycle.clone();
         let container_semaphore = self.container_semaphore.clone();
         let process_semaphore = self.process_semaphore.clone();
+        let container_request_semaphore = self.container_request_semaphore.clone();
         let start = self.start;
         let id = self.id.fetch_add(1, Ordering::SeqCst);
 
         async move {
             let guard = ContainerAcquireGuard::start(&lifecycle);
 
-            let container_permit = container_semaphore.acquire_owned().await;
+            // Attempt to acquire the container semaphore. If we don't
+            // immediately get it, notify the container request
+            // semaphore. Any idle-but-not-yet-exited connections
+            // should watch that semaphore to see if they should give
+            // up their container to allow someone else in.
+            //
+            // There *is* a race here: a container might naturally
+            // exit after we attempt to acquire the first time. In
+            // that case, we'd spuriously notify the request semaphore
+            // and a container might exit earlier than it needed
+            // to. However, this should be a transient issue and only
+            // occur when we are already at the upper bounds of our
+            // limits. In those cases, freeing an extra container or
+            // two shouldn't be the worst thing.
+            let container_permit = {
+                let fallback = {
+                    let container_semaphore = container_semaphore.clone();
+                    async {
+                        container_request_semaphore.add_permits(1);
+                        container_semaphore.acquire_owned().await
+                    }
+                };
+
+                tokio::select! {
+                    biased;
+
+                    permit = container_semaphore.acquire_owned() => permit,
+                    permit = fallback => permit,
+                }
+            };
+
             let container_permit = guard.complete(container_permit)?;
 
             let token = TrackContainer {
@@ -183,6 +217,23 @@ where
         }
         .boxed()
     }
+
+    fn container_requested(&self) -> BoxFuture<'static, ()> {
+        let container_request_semaphore = self.container_request_semaphore.clone();
+
+        async move {
+            let permit = container_request_semaphore
+                .acquire()
+                .await
+                .expect("The semaphore is never closed");
+
+            // We're now dealing with the request to return a
+            // container, so we discard the permit to prevent anyone
+            // else from trying to handle it.
+            permit.forget();
+        }
+        .boxed()
+    }
 }
 
 impl fmt::Display for TrackContainer
diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs
index 1895c8ac..4da6940e 100644
--- a/ui/src/server_axum/websocket.rs
+++ b/ui/src/server_axum/websocket.rs
@@ -16,6 +16,7 @@ use std::{
     collections::BTreeMap,
     convert::TryFrom,
     mem,
+    ops::ControlFlow,
     pin::pin,
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -444,13 +445,16 @@ async fn handle_core(
             },
 
             _ = &mut idle_timeout, if manager.is_empty() => {
-                let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
+                }
+            },
 
-                let Err(error) = idled else { continue };
+            _ = factory.container_requested(), if manager.is_empty() => {
+                info!("Container requested to idle");
 
-                if tx.send(Err((error, None))).await.is_err() {
-                    // We can't send a response
-                    break;
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
                 }
             },
 
@@ -506,6 +510,21 @@ fn response_to_message(response: MessageResponse) -> Message {
     Message::Text(resp)
 }
 
+async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> ControlFlow<()> {
+    let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+
+    let Err(error) = idled else {
+        return ControlFlow::Continue(());
+    };
+
+    if tx.send(Err((error, None))).await.is_err() {
+        // We can't send a response
+        return ControlFlow::Break(());
+    }
+
+    ControlFlow::Continue(())
+}
+
 type ActiveExecutionInfo = (CancellationToken, Option>);
 
 async fn handle_msg(
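
= Sketch: the semaphore handoff in isolation

The following is a minimal, self-contained sketch of the pattern the patch
implements, written directly against tokio primitives rather than the
playground's own types. The method names `next_container` and
`container_requested` mirror the patch; everything else (the `Limits` struct,
the demo `main`, the 60-second stand-in timeout, the printed messages) is
illustrative only. It assumes tokio 1.x with the "full" feature set.

use std::{sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

#[derive(Clone)]
struct Limits {
    containers: Arc<Semaphore>,
    // Starts at zero permits: a permit is added whenever a caller is stuck
    // waiting for a container, and an idle holder consumes it as a signal
    // to give its container back early.
    exit_requests: Arc<Semaphore>,
}

impl Limits {
    fn new(container_limit: usize) -> Self {
        Self {
            containers: Arc::new(Semaphore::new(container_limit)),
            exit_requests: Arc::new(Semaphore::new(0)),
        }
    }

    // Acquire a container slot. If one isn't immediately available, notify
    // the request semaphore so an idle holder knows someone wants its slot,
    // then keep waiting.
    async fn next_container(&self) -> OwnedSemaphorePermit {
        let fast = Arc::clone(&self.containers);
        let slow = Arc::clone(&self.containers);
        let exit_requests = Arc::clone(&self.exit_requests);

        tokio::select! {
            biased;

            permit = fast.acquire_owned() => permit.expect("semaphore closed"),
            permit = async move {
                exit_requests.add_permits(1);
                slow.acquire_owned().await
            } => permit.expect("semaphore closed"),
        }
    }

    // Resolve once someone has asked for a container to be returned. The
    // permit is forgotten so only one idle holder reacts to each request.
    async fn container_requested(&self) {
        let permit = self
            .exit_requests
            .acquire()
            .await
            .expect("semaphore closed");
        permit.forget();
    }
}

#[tokio::main]
async fn main() {
    let limits = Limits::new(1);

    // An idle holder: it owns the only slot and would normally keep it for
    // a long idle timeout, but releases early when asked.
    let idle_holder = {
        let limits = limits.clone();
        tokio::spawn(async move {
            let permit = limits.next_container().await;
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(60)) => {}
                _ = limits.container_requested() => {
                    println!("idle holder: releasing container early");
                }
            }
            drop(permit);
        })
    };

    // A second caller arrives while the slot is taken. Its fallback branch
    // adds a permit to the request semaphore, the idle holder notices and
    // exits, and the slot becomes free.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _permit = limits.next_container().await;
    println!("second caller: got a container slot");

    idle_holder.await.unwrap();
}

The `biased;` ordering tries the cheap path first, so the request semaphore is
only signalled when the caller would actually have blocked, which is also
where the spurious-exit race described in the patch comes from.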
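
= Sketch: sharing idle handling with ControlFlow

The websocket change factors the duplicated idle handling into one helper that
returns `std::ops::ControlFlow`, letting the helper decide whether the caller's
event loop should stop. Here is a small sketch of that shape only;
`idle_all_containers` and `report_error` are hypothetical stand-ins, not the
playground's API.

use std::ops::ControlFlow;

// Hypothetical stand-ins for the real manager and response channel.
async fn idle_all_containers() -> Result<(), String> {
    Ok(())
}

async fn report_error(_error: String) -> Result<(), ()> {
    Ok(())
}

// Idling succeeded, or the error was reported: keep looping. The error could
// not even be reported: tell the caller to stop.
async fn handle_idle() -> ControlFlow<()> {
    let idled = idle_all_containers().await;

    let Err(error) = idled else {
        return ControlFlow::Continue(());
    };

    if report_error(error).await.is_err() {
        return ControlFlow::Break(());
    }

    ControlFlow::Continue(())
}

#[tokio::main]
async fn main() {
    // In the patch, both the idle-timeout arm and the container-requested arm
    // of a tokio::select! call this helper; the break decision lives in one place.
    for _ in 0..3 {
        if handle_idle().await.is_break() {
            break;
        }
    }
}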