From fc7280beaa911dd6f49ac3b4414f279f0ec34e28 Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Wed, 3 Apr 2024 20:58:57 +0200 Subject: [PATCH] chmux: make forward a function of Receiver --- remoc/src/chmux/mod.rs | 2 +- remoc/src/chmux/receiver.rs | 16 +++++++++++++++- remoc/src/rch/bin/receiver.rs | 2 +- remoc/src/rch/bin/sender.rs | 2 +- remoc/src/robj/lazy_blob/fw_bin.rs | 4 ++-- 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/remoc/src/chmux/mod.rs b/remoc/src/chmux/mod.rs index f83612d..6f84bc0 100644 --- a/remoc/src/chmux/mod.rs +++ b/remoc/src/chmux/mod.rs @@ -30,7 +30,7 @@ mod sender; pub use any_storage::{AnyBox, AnyEntry, AnyStorage}; pub use cfg::{Cfg, PortsExhausted}; pub use client::{Client, Connect, ConnectError}; -pub use forward::{forward, ForwardError}; +pub use forward::ForwardError; pub use listener::{Listener, ListenerError, ListenerStream, Request}; pub use mux::ChMux; pub use port_allocator::{PortAllocator, PortNumber, PortReq}; diff --git a/remoc/src/chmux/receiver.rs b/remoc/src/chmux/receiver.rs index d4ec737..28887e6 100644 --- a/remoc/src/chmux/receiver.rs +++ b/remoc/src/chmux/receiver.rs @@ -10,8 +10,9 @@ use tokio_util::sync::ReusableBoxFuture; use super::{ credit::{ChannelCreditReturner, UsedCredit}, + forward, mux::PortEvt, - AnyStorage, PortAllocator, Request, + AnyStorage, ForwardError, PortAllocator, Request, Sender, }; /// An error occurred during receiving a data message. @@ -580,6 +581,19 @@ impl Receiver { pub fn storage(&self) -> AnyStorage { self.storage.clone() } + + /// Forwards all data received to the specified sender. + /// + /// This also recursively spawns background tasks for forwarding data on received ports. + /// + /// Returns when the channel is closed, but spawned tasks will continue forwarding until + /// their channels are closed. + /// + /// Returns the total number of bytes forwarded on this channel, + /// i.e. not counting forwarded bytes on recursively forwarded channel. + pub async fn forward(&mut self, tx: &mut Sender) -> Result { + forward::forward(self, tx).await + } } impl Drop for Receiver { diff --git a/remoc/src/rch/bin/receiver.rs b/remoc/src/rch/bin/receiver.rs index 9e66358..4e93215 100644 --- a/remoc/src/rch/bin/receiver.rs +++ b/remoc/src/rch/bin/receiver.rs @@ -62,7 +62,7 @@ impl Receiver { let Ok(rx) = successor_rx.await else { return }; let Ok(mut rx) = rx.into_inner().await else { return }; let Ok(mut tx) = tx.into_inner().await else { return }; - if let Err(err) = chmux::forward(&mut rx, &mut tx).await { + if let Err(err) = rx.forward(&mut tx).await { tracing::debug!("forwarding binary channel failed: {err}"); } } diff --git a/remoc/src/rch/bin/sender.rs b/remoc/src/rch/bin/sender.rs index c2ac891..5ce4258 100644 --- a/remoc/src/rch/bin/sender.rs +++ b/remoc/src/rch/bin/sender.rs @@ -62,7 +62,7 @@ impl Sender { let Ok(tx) = successor_rx.await else { return }; let Ok(mut tx) = tx.into_inner().await else { return }; let Ok(mut rx) = rx.into_inner().await else { return }; - if let Err(err) = chmux::forward(&mut rx, &mut tx).await { + if let Err(err) = rx.forward(&mut tx).await { tracing::debug!("forwarding binary channel failed: {err}"); } } diff --git a/remoc/src/robj/lazy_blob/fw_bin.rs b/remoc/src/robj/lazy_blob/fw_bin.rs index c585ec4..9e5439d 100644 --- a/remoc/src/robj/lazy_blob/fw_bin.rs +++ b/remoc/src/robj/lazy_blob/fw_bin.rs @@ -3,7 +3,7 @@ use serde::{ser, Deserialize, Serialize}; use std::sync::Mutex; -use crate::{chmux, rch::bin}; +use crate::rch::bin; /// A chmux sender that can be remotely sent and forwarded. pub(crate) struct Sender { @@ -49,7 +49,7 @@ impl Serialize for Sender { // No error handling is performed, because complete transmission of // data is verified by size. - let _ = chmux::forward(&mut bin_fw_rx, &mut bin_tx).await; + let _ = bin_fw_rx.forward(&mut bin_tx).await; }); TransportedSender { bin_tx: bin_fw_tx }.serialize(serializer) }