Skip to content

Commit

Permalink
chmux: forward channel closing
Browse files Browse the repository at this point in the history
Forward channel close request from remote receiver,
but continue forwarding received data until sender stops.
  • Loading branch information
surban committed Apr 3, 2024
1 parent 385dba0 commit 8a35399
Showing 1 changed file with 41 additions and 14 deletions.
55 changes: 41 additions & 14 deletions remoc/src/chmux/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ impl fmt::Display for ForwardError {
}
}

impl From<ForwardError> for std::io::Error {
fn from(err: ForwardError) -> Self {
match err {
ForwardError::Send(err) => err.into(),
ForwardError::Recv(err) => err.into(),
}
}
}

impl std::error::Error for ForwardError {}

/// Forwards all data received from a receiver to a sender.
///
/// This also recursively setups forwarding for all transmitted ports.
///
/// Returns the total number of bytes forwarded.
pub async fn forward(rx: &mut super::Receiver, tx: &mut super::Sender) -> Result<usize, ForwardError> {
pub(crate) async fn forward(rx: &mut super::Receiver, tx: &mut super::Sender) -> Result<usize, ForwardError> {
// Required to avoid borrow checking loop limitation.
fn spawn_forward(id: u32, mut rx: super::Receiver, mut tx: super::Sender) {
tokio::spawn(async move {
Expand All @@ -53,18 +58,32 @@ pub async fn forward(rx: &mut super::Receiver, tx: &mut super::Sender) -> Result
});
}

let override_graceful_close = tx.is_graceful_close_overridden();
tx.set_override_graceful_close(true);

let mut total = Wrapping(0);
let mut closed = false;

enum Event {
Received(Option<Received>),
Closed,
}

loop {
match rx.recv_any().await? {
// Data.
Some(Received::Data(data)) => {
let event = tokio::select! {
res = rx.recv_any() => Event::Received(res?),
() = tx.closed(), if !closed => Event::Closed,
};

match event {
// Data received.
Event::Received(Some(Received::Data(data))) => {
total += data.remaining();
tx.send(data.into()).await?;
}

// Data chunks.
Some(Received::Chunks) => {
// Data chunks received.
Event::Received(Some(Received::Chunks)) => {
let mut chunk_tx = tx.send_chunks();
loop {
match rx.recv_chunk().await {
Expand All @@ -82,8 +101,8 @@ pub async fn forward(rx: &mut super::Receiver, tx: &mut super::Sender) -> Result
}
}

// Ports.
Some(Received::Requests(reqs)) => {
// Ports received.
Event::Received(Some(Received::Requests(reqs))) => {
let allocator = tx.port_allocator();

// Allocate local outgoing ports for forwarding.
Expand Down Expand Up @@ -126,10 +145,18 @@ pub async fn forward(rx: &mut super::Receiver, tx: &mut super::Sender) -> Result
}
}

// End.
None => break,
// End received.
Event::Received(None) => break,

// Forwarding sender closed.
Event::Closed => {
rx.close().await;
closed = true;
}
}
}

tx.set_override_graceful_close(override_graceful_close);

Ok(total.0)
}

0 comments on commit 8a35399

Please sign in to comment.