Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions veecle-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ veecle-telemetry = { workspace = true, features = ["alloc"] }
rand = { workspace = true, features = ["std_rng"] }
serde = { workspace = true, features = ["derive"] }
veecle-os = { workspace = true }
veecle-telemetry = { workspace = true, features = ["alloc", "enable"] }

[lints]
workspace = true
9 changes: 5 additions & 4 deletions veecle-ipc/src/actors/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ pub async fn output<T>(
where
T: Storable<DataType: Serialize> + 'static,
{
let output = config.connector.output();
let output = config.connector.storable_output();
let send_policy = config.send_policy;

loop {
let value = reader.wait_for_update().await.read(|value| {
veecle_ipc_protocol::Message::Storable(EncodedStorable::new(value).unwrap())
});
let value = reader
.wait_for_update()
.await
.read(|value| EncodedStorable::new(value).unwrap());

match send_policy {
SendPolicy::Drop => {
Expand Down
89 changes: 69 additions & 20 deletions veecle-ipc/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,77 @@ use tokio::net::UnixStream;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::codec::Framed;
use veecle_ipc_protocol::{Codec, ControlRequest, ControlResponse, Message};
use veecle_ipc_protocol::{Codec, ControlRequest, ControlResponse, EncodedStorable, Message};

use crate::Exporter;

type Inputs = Arc<Mutex<HashMap<&'static str, mpsc::Sender<String>>>>;

/// Holds various output channel senders for the [`Connector`], separated so they have decoupled
/// buffering and prioritization.
#[derive(Debug)]
struct OutputTx {
storable: mpsc::Sender<EncodedStorable>,
telemetry: mpsc::Sender<veecle_telemetry::protocol::InstanceMessage<'static>>,
control: mpsc::Sender<ControlRequest>,
}

/// The receivers for [`OutputTx`].
#[derive(Debug)]
struct OutputRx {
storable: mpsc::Receiver<EncodedStorable>,
telemetry: mpsc::Receiver<veecle_telemetry::protocol::InstanceMessage<'static>>,
control: mpsc::Receiver<ControlRequest>,
}

impl OutputRx {
/// Returns the first message available on any output channel.
///
/// Purposefully prioritizes the more important channels to drain first.
/// This may lead to the low priority channels never being serviced if we are not keeping up.
async fn recv(&mut self) -> Option<Message<'static>> {
Some(tokio::select! {
biased; // Polls all branches in order to guarantee prioritization.
Some(control) = self.control.recv() => Message::ControlRequest(control),
Some(storable) = self.storable.recv() => Message::Storable(storable),
Some(telemetry) = self.telemetry.recv() => Message::Telemetry(telemetry),
else => return None, // Only reached when all channels are closed.
})
}
}

fn outputs() -> (OutputTx, OutputRx) {
// Control requests are request-response so there should never be buffering as the sender will
// be waiting on a response.
let (control_tx, control_rx) = mpsc::channel(1);
// The output channel capacity (128) determines buffering for IPC messages.
// The `Output` actor uses `SendPolicy` to control behavior when this fills up:
// - `SendPolicy::Panic` (default): panics to make buffer exhaustion visible
// - `SendPolicy::Drop`: drops messages and logs a warning
let (storable_tx, storable_rx) = mpsc::channel(128);
// Telemetry can be quite chatty, so give it a large buffer, the `Exporter` will discard
// messages if this is filled.
let (telemetry_tx, telemetry_rx) = mpsc::channel(128);

(
OutputTx {
storable: storable_tx,
control: control_tx,
telemetry: telemetry_tx,
},
OutputRx {
storable: storable_rx,
control: control_rx,
telemetry: telemetry_rx,
},
)
}

/// Manages the connection to other runtimes via the `veecle-orchestrator`.
#[derive(Debug)]
pub struct Connector {
output: mpsc::Sender<Message<'static>>,
output_tx: OutputTx,
inputs: Inputs,
control_requests: mpsc::Sender<ControlRequest>,
control_responses: Mutex<Option<mpsc::Receiver<ControlResponse>>>,
_task: JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
}
Expand All @@ -38,13 +97,9 @@ impl Connector {
let stream = UnixStream::connect(&socket).await.unwrap();
let mut stream = Framed::new(stream, Codec::new());

// The output channel capacity (128) determines buffering for IPC messages.
// The `Output` actor uses `SendPolicy` to control behavior when this fills up:
// - `SendPolicy::Panic` (default): panics to make buffer exhaustion visible
// - `SendPolicy::Drop`: drops messages and logs a warning
let (output, mut output_rx) = mpsc::channel(128);
let inputs = Inputs::default();
let (control_request_tx, mut control_request_rx) = mpsc::channel(16);
let (output_tx, mut output_rx) = outputs();

let (control_response_tx, control_response_rx) = mpsc::channel(16);
let task = tokio::spawn({
let inputs = inputs.clone();
Expand All @@ -55,11 +110,6 @@ impl Connector {
let Some(message) = message else { break };
stream.send(&message).await?;
}
request = control_request_rx.recv() => {
let Some(request) = request else { break };
let message = Message::ControlRequest(request);
stream.send(&message).await?;
}
message = stream.next() => {
let Some(message) = message else { break };
let message = match message {
Expand Down Expand Up @@ -96,9 +146,8 @@ impl Connector {
});

Self {
output,
output_tx,
inputs,
control_requests: control_request_tx,
control_responses: Mutex::new(Some(control_response_rx)),
_task: task,
}
Expand All @@ -122,7 +171,7 @@ impl Connector {
/// # }
/// ```
pub fn exporter(&self) -> Exporter {
Exporter::new(self.output.clone())
Exporter::new(self.output_tx.telemetry.clone())
}

/// Registers a new channel that will receive input from the `veecle-orchestrator` tagged with `type_name`.
Expand All @@ -138,8 +187,8 @@ impl Connector {
}

/// Gets a new sender to send values to the `veecle-orchestrator`.
pub(crate) fn output(&self) -> mpsc::Sender<Message<'static>> {
self.output.clone()
pub(crate) fn storable_output(&self) -> mpsc::Sender<EncodedStorable> {
self.output_tx.storable.clone()
}

/// Gets the sender and receiver to send control messages and receive control responses from the `veecle-orchestrator`.
Expand All @@ -152,7 +201,7 @@ impl Connector {
mpsc::Receiver<ControlResponse>,
) {
(
self.control_requests.clone(),
self.output_tx.control.clone(),
self.control_responses
.lock()
.unwrap()
Expand Down
28 changes: 11 additions & 17 deletions veecle-ipc/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use veecle_telemetry::to_static::ToStatic;
/// An [`Export`] implementer that forwards telemetry messages via IPC.
#[derive(Debug)]
pub struct Exporter {
sender: mpsc::Sender<veecle_ipc_protocol::Message<'static>>,
sender: mpsc::Sender<InstanceMessage<'static>>,
}

impl Exporter {
/// Creates a new IPC telemetry exporter.
pub fn new(sender: mpsc::Sender<veecle_ipc_protocol::Message<'static>>) -> Self {
pub fn new(sender: mpsc::Sender<InstanceMessage<'static>>) -> Self {
Self { sender }
}
}
Expand All @@ -23,8 +23,7 @@ impl Export for Exporter {
/// and sends it through the IPC channel. If the channel is full or closed,
/// the message is dropped to avoid blocking telemetry collection.
fn export(&self, message: InstanceMessage<'_>) {
let message = veecle_ipc_protocol::Message::Telemetry(message.to_static());
let _ = self.sender.try_send(message);
let _ = self.sender.try_send(message.to_static());
}
}

Expand Down Expand Up @@ -60,20 +59,15 @@ mod tests {

exporter.export(test_message);

let received = receiver.recv().await.expect("should receive message");
match received {
veecle_ipc_protocol::Message::Telemetry(message) => {
assert_eq!(message.thread_id, THREAD_ID);
match message.message {
TelemetryMessage::Log(message) => {
assert_eq!(message.time_unix_nano, 1000000000);
assert_eq!(message.severity, Severity::Info);
assert_eq!(message.body.as_ref(), "test log message");
}
_ => panic!("Expected Log message"),
}
let message = receiver.recv().await.expect("should receive message");
assert_eq!(message.thread_id, THREAD_ID);
match message.message {
TelemetryMessage::Log(message) => {
assert_eq!(message.time_unix_nano, 1000000000);
assert_eq!(message.severity, Severity::Info);
assert_eq!(message.body.as_ref(), "test log message");
}
_ => panic!("Expected Telemetry"),
_ => panic!("Expected Log message"),
}
}

Expand Down
Loading