
Commit e2b28f7

Avoid buffering large amounts of rustc output.
1 parent 458138b commit e2b28f7

2 files changed: 77 additions & 25 deletions

src/cargo/core/compiler/job_queue.rs

Lines changed: 35 additions & 15 deletions
@@ -93,6 +93,28 @@ pub struct JobQueue<'a, 'cfg> {
 ///
 /// It is created from JobQueue when we have fully assembled the crate graph
 /// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so real work can proceed
+/// ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages causing a large amount of memory
+/// being used.
+///
+/// `push` also avoids blocking which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
 struct DrainState<'a, 'cfg> {
     // This is the length of the DependencyQueue when starting out
     total_units: usize,
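
As a rough illustration of the `push` vs `push_bounded` split described in the new doc comment (a sketch, not commit code; it uses a plain `Queue<String>` rather than the real `Message` enum, and assumes `Queue` from src/cargo/util/queue.rs is in scope):

    use std::sync::Arc;
    use std::thread;

    // Sketch: output lines go through the bounded path so a flood of rustc
    // output cannot grow the queue without limit, while priority messages
    // keep using the non-blocking `push`.
    fn demo(messages: Arc<Queue<String>>) {
        let output = Arc::clone(&messages);
        thread::spawn(move || {
            // May block once the bound (100 in this commit) is reached.
            output.push_bounded(String::from("warning: unused variable"));
        });
        // Never blocks; the coordinator should see this as soon as possible.
        messages.push(String::from("finished"));
    }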
@@ -212,11 +234,11 @@ impl<'a> JobState<'a> {
     }
 
     pub fn stdout(&self, stdout: String) {
-        self.messages.push(Message::Stdout(stdout));
+        self.messages.push_bounded(Message::Stdout(stdout));
     }
 
     pub fn stderr(&self, stderr: String) {
-        self.messages.push(Message::Stderr(stderr));
+        self.messages.push_bounded(Message::Stderr(stderr));
     }
 
     /// A method used to signal to the coordinator thread that the rmeta file
@@ -341,7 +363,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let state = DrainState {
             total_units: self.queue.len(),
             queue: self.queue,
-            messages: Arc::new(Queue::new()),
+            // 100 here is somewhat arbitrary. It is a few screenfulls of
+            // output, and hopefully at most a few megabytes of memory for
+            // typical messages.
+            messages: Arc::new(Queue::new(100)),
             active: HashMap::new(),
             compiled: HashSet::new(),
             documented: HashSet::new(),
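
For a rough sense of the numbers behind the "100 is somewhat arbitrary" comment above (illustrative figures, not from the commit): even if each buffered diagnostic were a few tens of kilobytes, 100 of them stay in the low megabytes:

    // Back-of-the-envelope check on the bound of 100 (illustrative sizes).
    const BOUND: usize = 100;
    const LARGE_MESSAGE_BYTES: usize = 32 * 1024; // an unusually large diagnostic
    const MAX_BUFFERED_BYTES: usize = BOUND * LARGE_MESSAGE_BYTES; // ~3.2 MB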
@@ -370,6 +395,9 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         // Create a helper thread to manage the diagnostics for rustfix if
         // necessary.
         let messages = state.messages.clone();
+        // It is important that this uses `push` instead of `push_bounded` for
+        // now. If someone wants to fix this to be bounded, the `drop`
+        // implementation needs to be changed to avoid possible deadlocks.
         let _diagnostic_server = cx
             .bcx
             .build_config
@@ -578,10 +606,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         // to run above to calculate CPU usage over time. To do this we
         // listen for a message with a timeout, and on timeout we run the
         // previous parts of the loop again.
-        let mut events = Vec::new();
-        while let Some(event) = self.messages.try_pop() {
-            events.push(event);
-        }
+        let mut events = self.messages.try_pop_all();
         info!(
             "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
             self.tokens.len(),
@@ -815,15 +840,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         };
 
         match fresh {
-            Freshness::Fresh => {
-                self.timings.add_fresh();
-                doit();
-            }
-            Freshness::Dirty => {
-                self.timings.add_dirty();
-                scope.spawn(move |_| doit());
-            }
+            Freshness::Fresh => self.timings.add_fresh(),
+            Freshness::Dirty => self.timings.add_dirty(),
         }
+        scope.spawn(move |_| doit());
 
         Ok(())
     }
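
One way to see why both freshness arms now spawn onto the scope (a hypothetical sketch, not from the commit): if a job ran on the coordinator thread itself, its `push_bounded` calls could block the only thread that ever drains the queue. Given the queue semantics shown in src/cargo/util/queue.rs below:

    // Hypothetical single-threaded scenario with a tiny bound: nothing else
    // pops from `q`, so the second push would wait on `bounded_cv` forever.
    fn coordinator_deadlock_sketch() {
        let q = Queue::new(1);
        q.push_bounded(String::from("first line"));  // fits within the bound
        q.push_bounded(String::from("second line")); // would block indefinitely
    }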

src/cargo/util/queue.rs

Lines changed: 42 additions & 10 deletions
@@ -5,30 +5,51 @@ use std::time::{Duration, Instant};
 /// A simple, threadsafe, queue of items of type `T`
 ///
 /// This is a sort of channel where any thread can push to a queue and any
-/// thread can pop from a queue. Currently queues have infinite capacity where
-/// `push` will never block but `pop` will block.
+/// thread can pop from a queue.
+///
+/// This supports both bounded and unbounded operations. `push` will never block,
+/// and allows the queue to grow without bounds. `push_bounded` will block if the
+/// queue is over capacity, and will resume once there is enough capacity.
 pub struct Queue<T> {
     state: Mutex<State<T>>,
-    condvar: Condvar,
+    popper_cv: Condvar,
+    bounded_cv: Condvar,
+    bound: usize,
 }
 
 struct State<T> {
     items: VecDeque<T>,
 }
 
 impl<T> Queue<T> {
-    pub fn new() -> Queue<T> {
+    pub fn new(bound: usize) -> Queue<T> {
         Queue {
             state: Mutex::new(State {
                 items: VecDeque::new(),
             }),
-            condvar: Condvar::new(),
+            popper_cv: Condvar::new(),
+            bounded_cv: Condvar::new(),
+            bound,
         }
     }
 
     pub fn push(&self, item: T) {
         self.state.lock().unwrap().items.push_back(item);
-        self.condvar.notify_one();
+        self.popper_cv.notify_one();
+    }
+
+    /// Pushes an item onto the queue, blocking if the queue is full.
+    pub fn push_bounded(&self, item: T) {
+        let mut state = self.state.lock().unwrap();
+        loop {
+            if state.items.len() >= self.bound {
+                state = self.bounded_cv.wait(state).unwrap();
+            } else {
+                state.items.push_back(item);
+                self.popper_cv.notify_one();
+                break;
+            }
+        }
     }
 
     pub fn pop(&self, timeout: Duration) -> Option<T> {
@@ -39,16 +60,27 @@ impl<T> Queue<T> {
             if elapsed >= timeout {
                 break;
             }
-            let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
+            let (lock, result) = self
+                .popper_cv
+                .wait_timeout(state, timeout - elapsed)
+                .unwrap();
             state = lock;
             if result.timed_out() {
                 break;
             }
         }
-        state.items.pop_front()
+        let value = state.items.pop_front()?;
+        if state.items.len() < self.bound {
+            // Assumes threads cannot be canceled.
+            self.bounded_cv.notify_one();
+        }
+        Some(value)
     }
 
-    pub fn try_pop(&self) -> Option<T> {
-        self.state.lock().unwrap().items.pop_front()
+    pub fn try_pop_all(&self) -> Vec<T> {
+        let mut state = self.state.lock().unwrap();
+        let result = state.items.drain(..).collect();
+        self.bounded_cv.notify_all();
+        result
     }
 }
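
A small usage sketch of the reworked `Queue` API (assumed setup; the bound of 8, the message count, and the timeout are illustrative):

    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    fn backpressure_demo() {
        let queue = Arc::new(Queue::new(8));
        let producer = Arc::clone(&queue);

        let handle = thread::spawn(move || {
            for i in 0..1000 {
                // Blocks whenever 8 or more items are waiting, so memory use
                // tracks the bound rather than the total amount of output.
                producer.push_bounded(format!("line {}", i));
            }
        });

        let mut received = 0;
        while received < 1000 {
            // Wait up to 500ms for one item, then drain any backlog in a
            // single locked pass; both paths wake blocked `push_bounded` callers.
            if queue.pop(Duration::from_millis(500)).is_some() {
                received += 1;
            }
            received += queue.try_pop_all().len();
        }
        handle.join().unwrap();
    }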
