Skip to content

Commit

Permalink
chmux: make forward a function of Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
surban committed Apr 3, 2024
1 parent 8a35399 commit fc7280b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 6 deletions.
2 changes: 1 addition & 1 deletion remoc/src/chmux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod sender;
pub use any_storage::{AnyBox, AnyEntry, AnyStorage};
pub use cfg::{Cfg, PortsExhausted};
pub use client::{Client, Connect, ConnectError};
pub use forward::{forward, ForwardError};
pub use forward::ForwardError;
pub use listener::{Listener, ListenerError, ListenerStream, Request};
pub use mux::ChMux;
pub use port_allocator::{PortAllocator, PortNumber, PortReq};
Expand Down
16 changes: 15 additions & 1 deletion remoc/src/chmux/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tokio_util::sync::ReusableBoxFuture;

use super::{
credit::{ChannelCreditReturner, UsedCredit},
forward,
mux::PortEvt,
AnyStorage, PortAllocator, Request,
AnyStorage, ForwardError, PortAllocator, Request, Sender,
};

/// An error occurred during receiving a data message.
Expand Down Expand Up @@ -580,6 +581,19 @@ impl Receiver {
pub fn storage(&self) -> AnyStorage {
self.storage.clone()
}

/// Forwards all data received to the specified sender.
///
/// This also recursively spawns background tasks for forwarding data on received ports.
///
/// Returns when the channel is closed, but spawned tasks will continue forwarding until
/// their channels are closed.
///
/// Returns the total number of bytes forwarded on this channel,
/// i.e. not counting forwarded bytes on recursively forwarded channel.
pub async fn forward(&mut self, tx: &mut Sender) -> Result<usize, ForwardError> {
forward::forward(self, tx).await
}
}

impl Drop for Receiver {
Expand Down
2 changes: 1 addition & 1 deletion remoc/src/rch/bin/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Receiver {
let Ok(rx) = successor_rx.await else { return };
let Ok(mut rx) = rx.into_inner().await else { return };
let Ok(mut tx) = tx.into_inner().await else { return };
if let Err(err) = chmux::forward(&mut rx, &mut tx).await {
if let Err(err) = rx.forward(&mut tx).await {
tracing::debug!("forwarding binary channel failed: {err}");
}
}
Expand Down
2 changes: 1 addition & 1 deletion remoc/src/rch/bin/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Sender {
let Ok(tx) = successor_rx.await else { return };
let Ok(mut tx) = tx.into_inner().await else { return };
let Ok(mut rx) = rx.into_inner().await else { return };
if let Err(err) = chmux::forward(&mut rx, &mut tx).await {
if let Err(err) = rx.forward(&mut tx).await {
tracing::debug!("forwarding binary channel failed: {err}");
}
}
Expand Down
4 changes: 2 additions & 2 deletions remoc/src/robj/lazy_blob/fw_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use serde::{ser, Deserialize, Serialize};
use std::sync::Mutex;

use crate::{chmux, rch::bin};
use crate::rch::bin;

/// A chmux sender that can be remotely sent and forwarded.
pub(crate) struct Sender {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl Serialize for Sender {

// No error handling is performed, because complete transmission of
// data is verified by size.
let _ = chmux::forward(&mut bin_fw_rx, &mut bin_tx).await;
let _ = bin_fw_rx.forward(&mut bin_tx).await;
});
TransportedSender { bin_tx: bin_fw_tx }.serialize(serializer)
}
Expand Down

0 comments on commit fc7280b

Please sign in to comment.