
Commit 0f564e6

Merge pull request #1120 from rust-lang/sharing-is-caring
Request idling containers exit when the container limit is reached
2 parents 6330754 + 0b70019 commit 0f564e6

3 files changed: +83 -6 lines


compiler/base/orchestrator/src/coordinator.rs

Lines changed: 7 additions & 0 deletions
@@ -852,6 +852,9 @@ type ResourceResult&lt;T, E = ResourceError&gt; = std::result::Result&lt;T, E&gt;;
 pub trait ResourceLimits: Send + Sync + fmt::Debug + 'static {
     /// Block until resources for a container are available.
     fn next_container(&self) -> BoxFuture<'static, ResourceResult<Box<dyn ContainerPermit>>>;
+
+    /// Block until someone requests that you return an in-use container.
+    fn container_requested(&self) -> BoxFuture<'static, ()>;
 }

 /// Represents one allowed Docker container (or equivalent).
@@ -884,6 +887,10 @@ impl CoordinatorFactory {

         Coordinator::new(limits, backend)
     }
+
+    pub async fn container_requested(&self) {
+        self.limits.container_requested().await
+    }
 }

 #[derive(Debug)]
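
For orientation (not part of the diff): every `ResourceLimits` implementation now has to supply `container_requested` as well. A minimal sketch of a do-nothing version, for a hypothetical limits type that never asks callers to hand containers back, could return a future that never resolves. `UnlimitedLimits` is illustrative only, and the sketch assumes the `futures` crate, which the orchestrator already uses for `BoxFuture`:

```rust
use futures::future::{BoxFuture, FutureExt};

#[derive(Debug)]
struct UnlimitedLimits; // hypothetical implementor, not in this commit

impl UnlimitedLimits {
    /// Mirrors the new trait method's signature: a future that never
    /// resolves means idle connections are never asked to give a
    /// container back.
    fn container_requested(&self) -> BoxFuture<'static, ()> {
        std::future::pending::<()>().boxed()
    }
}

fn main() {
    let limits = UnlimitedLimits;
    // The future would only be awaited inside a select loop; here we
    // just show that constructing it is cheap and non-blocking.
    let _never = limits.container_requested();
}
```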

compiler/base/orchestrator/src/coordinator/limits.rs

Lines changed: 52 additions & 1 deletion
@@ -94,6 +94,7 @@ pub struct Global&lt;L = NoOpLifecycle&gt; {
     lifecycle: L,
     container_semaphore: Arc<Semaphore>,
     process_semaphore: Arc<Semaphore>,
+    container_request_semaphore: Arc<Semaphore>,
     start: u64,
     id: AtomicU64,
 }
@@ -136,6 +137,7 @@
     pub fn with_lifecycle(container_limit: usize, process_limit: usize, lifecycle: L) -> Self {
         let container_semaphore = Arc::new(Semaphore::new(container_limit));
         let process_semaphore = Arc::new(Semaphore::new(process_limit));
+        let container_request_semaphore = Arc::new(Semaphore::new(0));

         let now = std::time::SystemTime::now();
         let start = now
@@ -149,6 +151,7 @@
             lifecycle,
             container_semaphore,
             process_semaphore,
+            container_request_semaphore,
             start,
             id,
         }
@@ -163,13 +166,44 @@
         let lifecycle = self.lifecycle.clone();
         let container_semaphore = self.container_semaphore.clone();
         let process_semaphore = self.process_semaphore.clone();
+        let container_request_semaphore = self.container_request_semaphore.clone();
         let start = self.start;
         let id = self.id.fetch_add(1, Ordering::SeqCst);

         async move {
             let guard = ContainerAcquireGuard::start(&lifecycle);

-            let container_permit = container_semaphore.acquire_owned().await;
+            // Attempt to acquire the container semaphore. If we don't
+            // immediately get it, notify the container request
+            // semaphore. Any idle-but-not-yet-exited connections
+            // should watch that semaphore to see if they should give
+            // up their container to allow someone else in.
+            //
+            // There *is* a race here: a container might naturally
+            // exit after we attempt to acquire the first time. In
+            // that case, we'd spuriously notify the request semaphore
+            // and a container might exit earlier than it needed
+            // to. However, this should be a transient issue and only
+            // occur when we are already at the upper bounds of our
+            // limits. In those cases, freeing an extra container or
+            // two shouldn't be the worst thing.
+            let container_permit = {
+                let fallback = {
+                    let container_semaphore = container_semaphore.clone();
+                    async {
+                        container_request_semaphore.add_permits(1);
+                        container_semaphore.acquire_owned().await
+                    }
+                };
+
+                tokio::select! {
+                    biased;
+
+                    permit = container_semaphore.acquire_owned() => permit,
+                    permit = fallback => permit,
+                }
+            };
+
             let container_permit = guard.complete(container_permit)?;

             let token = TrackContainer {
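
To see the handoff in isolation, here is a self-contained sketch of the same pattern, detached from the playground's types. It assumes tokio with the `sync`, `macros`, and `rt-multi-thread` features, and the task names are illustrative. Because the select is `biased`, the plain acquire is polled first; only when it is pending does the fallback branch add a permit to the request semaphore before waiting itself.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // No containers are free right now.
    let containers = Arc::new(Semaphore::new(0));
    // Zero permits until someone asks an idle session to return a container.
    let requests = Arc::new(Semaphore::new(0));

    // Simulated idle session: waits for a request, consumes it, then
    // "returns" its container by adding a permit to the container semaphore.
    let idle_session = {
        let containers = containers.clone();
        let requests = requests.clone();
        tokio::spawn(async move {
            let permit = requests.acquire().await.expect("never closed");
            permit.forget();
            containers.add_permits(1);
        })
    };

    // Acquiring side: fast path first; if it's pending, signal the request
    // semaphore and keep waiting on the container semaphore.
    let fallback = {
        let containers = containers.clone();
        let requests = requests.clone();
        async move {
            requests.add_permits(1);
            containers.acquire_owned().await
        }
    };

    let permit = tokio::select! {
        biased;

        permit = containers.clone().acquire_owned() => permit,
        permit = fallback => permit,
    };

    assert!(permit.is_ok());
    idle_session.await.unwrap();
}
```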
@@ -183,6 +217,23 @@
         }
         .boxed()
     }
+
+    fn container_requested(&self) -> BoxFuture<'static, ()> {
+        let container_request_semaphore = self.container_request_semaphore.clone();
+
+        async move {
+            let permit = container_request_semaphore
+                .acquire()
+                .await
+                .expect("The semaphore is never closed");
+
+            // We're now dealing with the request to return a
+            // container so we discard the permit to prevent anyone
+            // else from trying to handle it.
+            permit.forget();
+        }
+        .boxed()
+    }
 }

 impl<L> fmt::Display for TrackContainer<L>
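
One detail worth calling out: the request permit is forgotten rather than dropped. A dropped permit goes back to the semaphore, so a second idle connection could react to the same request; forgetting it consumes the request for good. A tiny sketch, assuming tokio with the `sync`, `macros`, and `rt` features:

```rust
use tokio::sync::Semaphore;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let requests = Semaphore::new(1);

    {
        // Dropping a permit hands it back to the semaphore...
        let _permit = requests.acquire().await.unwrap();
    }
    assert_eq!(requests.available_permits(), 1);

    // ...while forgetting it consumes the permit permanently, so only
    // one waiter ever handles a given container request.
    requests.acquire().await.unwrap().forget();
    assert_eq!(requests.available_permits(), 0);
}
```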

ui/src/server_axum/websocket.rs

Lines changed: 24 additions & 5 deletions
@@ -16,6 +16,7 @@ use std::{
     collections::BTreeMap,
     convert::TryFrom,
     mem,
+    ops::ControlFlow,
     pin::pin,
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -444,13 +445,16 @@ async fn handle_core(
             },

             _ = &mut idle_timeout, if manager.is_empty() => {
-                let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
+                }
+            },

-                let Err(error) = idled else { continue };
+            _ = factory.container_requested(), if manager.is_empty() => {
+                info!("Container requested to idle");

-                if tx.send(Err((error, None))).await.is_err() {
-                    // We can't send a response
-                    break;
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
                 }
             },
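
Both idle arms carry an `if manager.is_empty()` guard, so a connection only volunteers its container when it has no executions in flight. As a reminder of how guarded `tokio::select!` branches behave, a branch whose precondition is false is disabled and its future is never polled; a tiny standalone sketch, assuming tokio with the `macros`, `rt`, and `time` features:

```rust
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let busy = true; // stand-in for `!manager.is_empty()`

    tokio::select! {
        // Disabled because the guard is false; it would otherwise win instantly.
        _ = std::future::ready(()), if !busy => {
            unreachable!("guarded branch must not run while busy");
        }
        _ = tokio::time::sleep(Duration::from_millis(10)) => {
            println!("only the unguarded branch can complete");
        }
    }
}
```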

@@ -506,6 +510,21 @@ fn response_to_message(response: MessageResponse) -> Message {
     Message::Text(resp)
 }

+async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> ControlFlow<()> {
+    let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+
+    let Err(error) = idled else {
+        return ControlFlow::Continue(());
+    };
+
+    if tx.send(Err((error, None))).await.is_err() {
+        // We can't send a response
+        return ControlFlow::Break(());
+    }
+
+    ControlFlow::Continue(())
+}
+
 type ActiveExecutionInfo = (CancellationToken, Option<mpsc::Sender<String>>);

 async fn handle_msg(
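
Finally, the extracted helper returns `std::ops::ControlFlow` instead of a bare bool, which keeps the "stop the websocket loop" decision explicit at the call site. A minimal sketch of the same shape, detached from the websocket types (`CoordinatorManager`, `ResponseTx`, and the snafu context are not reproduced here):

```rust
use std::ops::ControlFlow;

// Stand-in for handle_idle: Continue to keep serving, Break to shut down.
fn step(remaining: &mut u32) -> ControlFlow<()> {
    if *remaining == 0 {
        // Analogous to "we can't send a response": tell the caller to stop.
        return ControlFlow::Break(());
    }
    *remaining -= 1;
    ControlFlow::Continue(())
}

fn main() {
    let mut remaining = 3;
    loop {
        // Mirrors `if handle_idle(&mut manager, &tx).await.is_break() { break }`.
        if step(&mut remaining).is_break() {
            break;
        }
    }
    assert_eq!(remaining, 0);
}
```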

0 commit comments
