Skip to content

Commit 6bd83d0

Browse files
committed
fix: decouple different ipc data channels
In some situations the IPC channels from actors to the background task can become overloaded. We don't yet have a complete plan for handling this, but the major source of messages is telemetry, which is also the traffic we can afford to deliver less reliably. By separating telemetry from the data messages we can deprioritize it. Signed-off-by: Wim Looman <[email protected]>
1 parent 2a6c1c5 commit 6bd83d0

File tree

4 files changed

+86
-41
lines changed

4 files changed

+86
-41
lines changed

veecle-ipc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ veecle-telemetry = { workspace = true, features = ["alloc"] }
2828
rand = { workspace = true, features = ["std_rng"] }
2929
serde = { workspace = true, features = ["derive"] }
3030
veecle-os = { workspace = true }
31+
veecle-telemetry = { workspace = true, features = ["alloc", "enable"] }
3132

3233
[lints]
3334
workspace = true

veecle-ipc/src/actors/output.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,14 @@ pub async fn output<T>(
5252
where
5353
T: Storable<DataType: Serialize> + 'static,
5454
{
55-
let output = config.connector.output();
55+
let output = config.connector.storable_output();
5656
let send_policy = config.send_policy;
5757

5858
loop {
59-
let value = reader.wait_for_update().await.read(|value| {
60-
veecle_ipc_protocol::Message::Storable(EncodedStorable::new(value).unwrap())
61-
});
59+
let value = reader
60+
.wait_for_update()
61+
.await
62+
.read(|value| EncodedStorable::new(value).unwrap());
6263

6364
match send_policy {
6465
SendPolicy::Drop => {

veecle-ipc/src/connector.rs

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,77 @@ use tokio::net::UnixStream;
88
use tokio::sync::mpsc;
99
use tokio::task::JoinHandle;
1010
use tokio_util::codec::Framed;
11-
use veecle_ipc_protocol::{Codec, ControlRequest, ControlResponse, Message};
11+
use veecle_ipc_protocol::{Codec, ControlRequest, ControlResponse, EncodedStorable, Message};
1212

1313
use crate::Exporter;
1414

1515
type Inputs = Arc<Mutex<HashMap<&'static str, mpsc::Sender<String>>>>;
1616

17+
/// Holds various output channel senders for the [`Connector`], separated so they have decoupled
18+
/// buffering and prioritization.
19+
#[derive(Debug)]
20+
struct OutputTx {
21+
storable: mpsc::Sender<EncodedStorable>,
22+
telemetry: mpsc::Sender<veecle_telemetry::protocol::InstanceMessage<'static>>,
23+
control: mpsc::Sender<ControlRequest>,
24+
}
25+
26+
/// The receivers for [`OutputTx`].
27+
#[derive(Debug)]
28+
struct OutputRx {
29+
storable: mpsc::Receiver<EncodedStorable>,
30+
telemetry: mpsc::Receiver<veecle_telemetry::protocol::InstanceMessage<'static>>,
31+
control: mpsc::Receiver<ControlRequest>,
32+
}
33+
34+
impl OutputRx {
35+
/// Returns the first message available on any output channel.
36+
///
37+
/// Purposefully prioritizes the more important channels to drain first.
38+
/// This may lead to the low priority channels never being serviced if we are not keeping up.
39+
async fn recv(&mut self) -> Option<Message<'static>> {
40+
Some(tokio::select! {
41+
biased; // Polls all branches in order to guarantee prioritization.
42+
Some(control) = self.control.recv() => Message::ControlRequest(control),
43+
Some(storable) = self.storable.recv() => Message::Storable(storable),
44+
Some(telemetry) = self.telemetry.recv() => Message::Telemetry(telemetry),
45+
else => return None, // Only reached when all channels are closed.
46+
})
47+
}
48+
}
49+
50+
fn outputs() -> (OutputTx, OutputRx) {
51+
// Control requests are request-response so there should never be buffering as the sender will
52+
// be waiting on a response.
53+
let (control_tx, control_rx) = mpsc::channel(1);
54+
// The storable output channel capacity (128) determines buffering for storable IPC messages.
55+
// The `Output` actor uses `SendPolicy` to control behavior when this fills up:
56+
// - `SendPolicy::Panic` (default): panics to make buffer exhaustion visible
57+
// - `SendPolicy::Drop`: drops messages and logs a warning
58+
let (storable_tx, storable_rx) = mpsc::channel(128);
59+
// Telemetry can be quite chatty, so give it a large buffer; the `Exporter` will discard
60+
// messages if this buffer is full.
61+
let (telemetry_tx, telemetry_rx) = mpsc::channel(128);
62+
63+
(
64+
OutputTx {
65+
storable: storable_tx,
66+
control: control_tx,
67+
telemetry: telemetry_tx,
68+
},
69+
OutputRx {
70+
storable: storable_rx,
71+
control: control_rx,
72+
telemetry: telemetry_rx,
73+
},
74+
)
75+
}
76+
1777
/// Manages the connection to other runtimes via the `veecle-orchestrator`.
1878
#[derive(Debug)]
1979
pub struct Connector {
20-
output: mpsc::Sender<Message<'static>>,
80+
output_tx: OutputTx,
2181
inputs: Inputs,
22-
control_requests: mpsc::Sender<ControlRequest>,
2382
control_responses: Mutex<Option<mpsc::Receiver<ControlResponse>>>,
2483
_task: JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
2584
}
@@ -38,13 +97,9 @@ impl Connector {
3897
let stream = UnixStream::connect(&socket).await.unwrap();
3998
let mut stream = Framed::new(stream, Codec::new());
4099

41-
// The output channel capacity (128) determines buffering for IPC messages.
42-
// The `Output` actor uses `SendPolicy` to control behavior when this fills up:
43-
// - `SendPolicy::Panic` (default): panics to make buffer exhaustion visible
44-
// - `SendPolicy::Drop`: drops messages and logs a warning
45-
let (output, mut output_rx) = mpsc::channel(128);
46100
let inputs = Inputs::default();
47-
let (control_request_tx, mut control_request_rx) = mpsc::channel(16);
101+
let (output_tx, mut output_rx) = outputs();
102+
48103
let (control_response_tx, control_response_rx) = mpsc::channel(16);
49104
let task = tokio::spawn({
50105
let inputs = inputs.clone();
@@ -55,11 +110,6 @@ impl Connector {
55110
let Some(message) = message else { break };
56111
stream.send(&message).await?;
57112
}
58-
request = control_request_rx.recv() => {
59-
let Some(request) = request else { break };
60-
let message = Message::ControlRequest(request);
61-
stream.send(&message).await?;
62-
}
63113
message = stream.next() => {
64114
let Some(message) = message else { break };
65115
let message = match message {
@@ -96,9 +146,8 @@ impl Connector {
96146
});
97147

98148
Self {
99-
output,
149+
output_tx,
100150
inputs,
101-
control_requests: control_request_tx,
102151
control_responses: Mutex::new(Some(control_response_rx)),
103152
_task: task,
104153
}
@@ -122,7 +171,7 @@ impl Connector {
122171
/// # }
123172
/// ```
124173
pub fn exporter(&self) -> Exporter {
125-
Exporter::new(self.output.clone())
174+
Exporter::new(self.output_tx.telemetry.clone())
126175
}
127176

128177
/// Registers a new channel that will receive input from the `veecle-orchestrator` tagged with `type_name`.
@@ -138,8 +187,8 @@ impl Connector {
138187
}
139188

140189
/// Gets a new sender to send values to the `veecle-orchestrator`.
141-
pub(crate) fn output(&self) -> mpsc::Sender<Message<'static>> {
142-
self.output.clone()
190+
pub(crate) fn storable_output(&self) -> mpsc::Sender<EncodedStorable> {
191+
self.output_tx.storable.clone()
143192
}
144193

145194
/// Gets the sender and receiver to send control messages and receive control responses from the `veecle-orchestrator`.
@@ -152,7 +201,7 @@ impl Connector {
152201
mpsc::Receiver<ControlResponse>,
153202
) {
154203
(
155-
self.control_requests.clone(),
204+
self.output_tx.control.clone(),
156205
self.control_responses
157206
.lock()
158207
.unwrap()

veecle-ipc/src/telemetry.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use veecle_telemetry::to_static::ToStatic;
66
/// An [`Export`] implementer that forwards telemetry messages via IPC.
77
#[derive(Debug)]
88
pub struct Exporter {
9-
sender: mpsc::Sender<veecle_ipc_protocol::Message<'static>>,
9+
sender: mpsc::Sender<InstanceMessage<'static>>,
1010
}
1111

1212
impl Exporter {
1313
/// Creates a new IPC telemetry exporter.
14-
pub fn new(sender: mpsc::Sender<veecle_ipc_protocol::Message<'static>>) -> Self {
14+
pub fn new(sender: mpsc::Sender<InstanceMessage<'static>>) -> Self {
1515
Self { sender }
1616
}
1717
}
@@ -23,8 +23,7 @@ impl Export for Exporter {
2323
/// and sends it through the IPC channel. If the channel is full or closed,
2424
/// the message is dropped to avoid blocking telemetry collection.
2525
fn export(&self, message: InstanceMessage<'_>) {
26-
let message = veecle_ipc_protocol::Message::Telemetry(message.to_static());
27-
let _ = self.sender.try_send(message);
26+
let _ = self.sender.try_send(message.to_static());
2827
}
2928
}
3029

@@ -60,20 +59,15 @@ mod tests {
6059

6160
exporter.export(test_message);
6261

63-
let received = receiver.recv().await.expect("should receive message");
64-
match received {
65-
veecle_ipc_protocol::Message::Telemetry(message) => {
66-
assert_eq!(message.thread_id, THREAD_ID);
67-
match message.message {
68-
TelemetryMessage::Log(message) => {
69-
assert_eq!(message.time_unix_nano, 1000000000);
70-
assert_eq!(message.severity, Severity::Info);
71-
assert_eq!(message.body.as_ref(), "test log message");
72-
}
73-
_ => panic!("Expected Log message"),
74-
}
62+
let message = receiver.recv().await.expect("should receive message");
63+
assert_eq!(message.thread_id, THREAD_ID);
64+
match message.message {
65+
TelemetryMessage::Log(message) => {
66+
assert_eq!(message.time_unix_nano, 1000000000);
67+
assert_eq!(message.severity, Severity::Info);
68+
assert_eq!(message.body.as_ref(), "test log message");
7569
}
76-
_ => panic!("Expected Telemetry"),
70+
_ => panic!("Expected Log message"),
7771
}
7872
}
7973

0 commit comments

Comments
 (0)