Request idling containers exit when the container limit is reached #1120

Merged 1 commit on Nov 14, 2024
7 changes: 7 additions & 0 deletions compiler/base/orchestrator/src/coordinator.rs
@@ -852,6 +852,9 @@ type ResourceResult<T, E = ResourceError> = std::result::Result<T, E>;
pub trait ResourceLimits: Send + Sync + fmt::Debug + 'static {
/// Block until resources for a container are available.
fn next_container(&self) -> BoxFuture<'static, ResourceResult<Box<dyn ContainerPermit>>>;

/// Block until someone requests that you return an in-use container.
fn container_requested(&self) -> BoxFuture<'static, ()>;
}

/// Represents one allowed Docker container (or equivalent).
@@ -884,6 +887,10 @@ impl CoordinatorFactory {

Coordinator::new(limits, backend)
}

pub async fn container_requested(&self) {
self.limits.container_requested().await
}
}

#[derive(Debug)]
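The new trait method's contract ("block until someone requests that you return an in-use container") also allows implementations that never make such a request. A sketch only, using a hypothetical NeverReclaim type that is not part of this PR; it assumes ResourceLimits, ResourceResult, and ContainerPermit from this file are in scope and uses the same futures helpers as the surrounding code.

use futures::{future::BoxFuture, FutureExt};

// Hypothetical limits policy that never asks callers to give a container back.
#[derive(Debug)]
struct NeverReclaim;

impl ResourceLimits for NeverReclaim {
    fn next_container(&self) -> BoxFuture<'static, ResourceResult<Box<dyn ContainerPermit>>> {
        // Elided: hand out container permits according to whatever policy applies.
        unimplemented!()
    }

    fn container_requested(&self) -> BoxFuture<'static, ()> {
        // No container is ever requested back, so this future never resolves.
        std::future::pending().boxed()
    }
}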
53 changes: 52 additions & 1 deletion compiler/base/orchestrator/src/coordinator/limits.rs
@@ -94,6 +94,7 @@ pub struct Global<L = NoOpLifecycle> {
lifecycle: L,
container_semaphore: Arc<Semaphore>,
process_semaphore: Arc<Semaphore>,
container_request_semaphore: Arc<Semaphore>,
start: u64,
id: AtomicU64,
}
@@ -136,6 +137,7 @@ where
pub fn with_lifecycle(container_limit: usize, process_limit: usize, lifecycle: L) -> Self {
let container_semaphore = Arc::new(Semaphore::new(container_limit));
let process_semaphore = Arc::new(Semaphore::new(process_limit));
let container_request_semaphore = Arc::new(Semaphore::new(0));

let now = std::time::SystemTime::now();
let start = now
@@ -149,6 +151,7 @@ where
lifecycle,
container_semaphore,
process_semaphore,
container_request_semaphore,
start,
id,
}
@@ -163,13 +166,44 @@ where
let lifecycle = self.lifecycle.clone();
let container_semaphore = self.container_semaphore.clone();
let process_semaphore = self.process_semaphore.clone();
let container_request_semaphore = self.container_request_semaphore.clone();
let start = self.start;
let id = self.id.fetch_add(1, Ordering::SeqCst);

async move {
let guard = ContainerAcquireGuard::start(&lifecycle);

let container_permit = container_semaphore.acquire_owned().await;
// Attempt to acquire the container semaphore. If we don't
// immediately get it, notify the container request
// semaphore. Any idle-but-not-yet-exited connections
// should watch that semaphore to see if they should give
// up their container to allow someone else in.
//
// There *is* a race here: a container might naturally
// exit after we attempt to acquire the first time. In
// that case, we'd spuriously notify the request semaphore
// and a container might exit earlier than it needed
// to. However, this should be a transient issue and only
// occur when we are already at the upper bounds of our
// limits. In those cases, freeing an extra container or
// two shouldn't be the worst thing.
let container_permit = {
let fallback = {
let container_semaphore = container_semaphore.clone();
async {
container_request_semaphore.add_permits(1);
container_semaphore.acquire_owned().await
}
};

tokio::select! {
biased;

permit = container_semaphore.acquire_owned() => permit,
permit = fallback => permit,
}
};

let container_permit = guard.complete(container_permit)?;

let token = TrackContainer {
@@ -183,6 +217,23 @@ where
}
.boxed()
}

fn container_requested(&self) -> BoxFuture<'static, ()> {
let container_request_semaphore = self.container_request_semaphore.clone();

async move {
let permit = container_request_semaphore
.acquire()
.await
.expect("The semaphore is never closed");

// We're now dealing with the request to return a
// container so we discard the permit to prevent anyone
// else from trying to handle it.
permit.forget();
}
.boxed()
}
}

impl<L> fmt::Display for TrackContainer<L>
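Taken together, the zero-permit container_request_semaphore acts as a signal: a caller that cannot immediately get a container adds a permit ("someone please exit"), and an idle connection consumes and forgets that permit before dropping its own container. The biased select tries the fast path first, so a request is only raised when the limit is actually exhausted. Below is a self-contained sketch of that pattern in plain Tokio; the names and setup are illustrative, not code from this PR.

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let containers = Arc::new(Semaphore::new(1)); // container limit of 1
    let requests = Arc::new(Semaphore::new(0)); // starts empty; each permit is a "please exit" request

    // An idle holder of the only container slot waits for a request to hand it
    // back (the role played by container_requested).
    let holder = {
        let requests = Arc::clone(&requests);
        let permit = Arc::clone(&containers).acquire_owned().await.unwrap();
        tokio::spawn(async move {
            let req = requests.acquire().await.unwrap();
            req.forget(); // consume the request so no one else also reacts to it
            drop(permit); // give the container slot back
        })
    };

    // A new caller (the role played by next_container): try the fast path
    // first; if it is not immediately ready, raise a request and keep waiting.
    let permit = tokio::select! {
        biased;

        permit = Arc::clone(&containers).acquire_owned() => permit,
        permit = async {
            requests.add_permits(1);
            Arc::clone(&containers).acquire_owned().await
        } => permit,
    };

    assert!(permit.is_ok());
    holder.await.unwrap();
}

The race described in the comment shows up here too: if the holder released its permit on its own right after the first failed poll, the extra request permit would linger and one more idle connection than strictly necessary might exit, which the PR accepts as harmless.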
29 changes: 24 additions & 5 deletions ui/src/server_axum/websocket.rs
@@ -16,6 +16,7 @@ use std::{
collections::BTreeMap,
convert::TryFrom,
mem,
ops::ControlFlow,
pin::pin,
sync::{
atomic::{AtomicU64, Ordering},
@@ -444,13 +445,16 @@ async fn handle_core(
},

_ = &mut idle_timeout, if manager.is_empty() => {
-   let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+   if handle_idle(&mut manager, &tx).await.is_break() {
+       break
+   }
+},

-   let Err(error) = idled else { continue };
+_ = factory.container_requested(), if manager.is_empty() => {
+   info!("Container requested to idle");

-   if tx.send(Err((error, None))).await.is_err() {
-       // We can't send a response
-       break;
+   if handle_idle(&mut manager, &tx).await.is_break() {
+       break
    }
},

@@ -506,6 +510,21 @@ fn response_to_message(response: MessageResponse) -> Message {
Message::Text(resp)
}

async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> ControlFlow<()> {
let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);

let Err(error) = idled else {
return ControlFlow::Continue(());
};

if tx.send(Err((error, None))).await.is_err() {
// We can't send a response
return ControlFlow::Break(());
}

ControlFlow::Continue(())
}

type ActiveExecutionInfo = (CancellationToken, Option<mpsc::Sender<String>>);

async fn handle_msg(
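For context, a much-simplified, hypothetical version of the loop in handle_core: an external "please give back your container" request funnels into the same handle_idle-style helper, and ControlFlow::Break tears the connection down when the response channel is gone. Channel-based stand-ins are used here, not the playground's real types.

use std::ops::ControlFlow;
use tokio::sync::mpsc;

// Stand-in for the real handle_idle: report that we idled, and break if the
// response channel has gone away.
async fn handle_idle(tx: &mpsc::Sender<&'static str>) -> ControlFlow<()> {
    if tx.send("idled").await.is_err() {
        // We can't send a response
        return ControlFlow::Break(());
    }
    ControlFlow::Continue(())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(4);
    // Stand-in for factory.container_requested(): each message is one request.
    let (req_tx, mut req_rx) = mpsc::channel::<()>(1);

    let task = tokio::spawn(async move {
        let is_empty = true; // stand-in for manager.is_empty()
        loop {
            tokio::select! {
                Some(()) = req_rx.recv(), if is_empty => {
                    if handle_idle(&tx).await.is_break() { break }
                }
                else => break, // all request senders are gone
            }
        }
    });

    req_tx.send(()).await.unwrap();
    drop(req_tx); // no more requests; the loop's `else` arm exits
    assert_eq!(rx.recv().await, Some("idled"));
    task.await.unwrap();
}

In the real handler the guard is manager.is_empty(), so only connections with no active executions react to a container request.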