Skip to content

Commit f160490

Browse files
committed
watch: add check() method on sender
This allows simple checking whether an item-specific send error has occurred after the channel has been closed.
1 parent b2b0b4a commit f160490

File tree

4 files changed

+113
-32
lines changed

4 files changed

+113
-32
lines changed

remoc/src/rch/watch/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,6 @@ mod sender;
6363
pub use receiver::{ChangedError, Receiver, ReceiverStream, RecvError};
6464
pub use sender::{SendError, Sender};
6565

66-
/// Length of queuing for storing errors that occurred during remote send.
67-
const ERROR_QUEUE: usize = 16;
68-
6966
/// Returns a reference to the inner value.
7067
pub struct Ref<'a, T>(tokio::sync::watch::Ref<'a, Result<T, RecvError>>);
7168

@@ -94,7 +91,7 @@ where
9491
T: RemoteSend,
9592
{
9693
let (tx, rx) = tokio::sync::watch::channel(Ok(init));
97-
let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::channel(ERROR_QUEUE);
94+
let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::unbounded_channel();
9895

9996
let sender = Sender::new(tx, remote_send_err_tx.clone(), remote_send_err_rx, DEFAULT_MAX_ITEM_SIZE);
10097
let receiver = Receiver::new(rx, remote_send_err_tx, None);
@@ -104,7 +101,7 @@ where
104101
/// Send implementation for deserializer of Sender and serializer of Receiver.
105102
async fn send_impl<T, Codec>(
106103
mut rx: tokio::sync::watch::Receiver<Result<T, RecvError>>, raw_tx: chmux::Sender,
107-
mut raw_rx: chmux::Receiver, remote_send_err_tx: tokio::sync::mpsc::Sender<RemoteSendError>,
104+
mut raw_rx: chmux::Receiver, remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
108105
max_item_size: usize,
109106
) where
110107
T: Serialize + Send + Clone + 'static,
@@ -124,7 +121,7 @@ async fn send_impl<T, Codec>(
124121
match backchannel_msg {
125122
Ok(Some(mut msg)) if msg.remaining() >= 1 => {
126123
if msg.get_u8() == BACKCHANNEL_MSG_ERROR {
127-
let _ = remote_send_err_tx.try_send(RemoteSendError::Forward);
124+
let _ = remote_send_err_tx.send(RemoteSendError::Forward);
128125
}
129126
}
130127
_ => break,
@@ -137,7 +134,7 @@ async fn send_impl<T, Codec>(
137134
Ok(()) => {
138135
let value = rx.borrow_and_update().clone();
139136
if let Err(err) = remote_tx.send(value).await {
140-
let _ = remote_send_err_tx.try_send(RemoteSendError::Send(err.kind.clone()));
137+
let _ = remote_send_err_tx.send(RemoteSendError::Send(err.kind.clone()));
141138
if err.is_item_specific() {
142139
break
143140
}
@@ -153,7 +150,7 @@ async fn send_impl<T, Codec>(
153150
/// Receive implementation for serializer of Sender and deserializer of Receiver.
154151
async fn recv_impl<T, Codec>(
155152
tx: tokio::sync::watch::Sender<Result<T, RecvError>>, mut raw_tx: chmux::Sender, raw_rx: chmux::Receiver,
156-
mut remote_send_err_rx: tokio::sync::mpsc::Receiver<RemoteSendError>,
153+
mut remote_send_err_rx: tokio::sync::mpsc::UnboundedReceiver<RemoteSendError>,
157154
mut current_err: Option<RemoteSendError>, max_item_size: usize,
158155
) where
159156
T: DeserializeOwned + Send + 'static,

remoc/src/rch/watch/receiver.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use super::{
1515
base::{self, PortDeserializer, PortSerializer},
1616
RemoteSendError, DEFAULT_MAX_ITEM_SIZE,
1717
},
18-
Ref, ERROR_QUEUE,
18+
Ref,
1919
};
2020
use crate::{chmux, codec, RemoteSend};
2121

@@ -87,7 +87,7 @@ impl Error for ChangedError {}
8787
#[derive(Clone)]
8888
pub struct Receiver<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
8989
rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
90-
remote_send_err_tx: tokio::sync::mpsc::Sender<RemoteSendError>,
90+
remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
9191
remote_max_item_size: Option<usize>,
9292
_codec: PhantomData<Codec>,
9393
}
@@ -115,7 +115,8 @@ pub(crate) struct TransportedReceiver<T, Codec> {
115115
impl<T, Codec, const MAX_ITEM_SIZE: usize> Receiver<T, Codec, MAX_ITEM_SIZE> {
116116
pub(crate) fn new(
117117
rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
118-
remote_send_err_tx: tokio::sync::mpsc::Sender<RemoteSendError>, remote_max_item_size: Option<usize>,
118+
remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
119+
remote_max_item_size: Option<usize>,
119120
) -> Self {
120121
Self { rx, remote_send_err_tx, remote_max_item_size, _codec: PhantomData }
121122
}
@@ -201,7 +202,7 @@ where
201202
let (raw_tx, raw_rx) = match connect.await {
202203
Ok(tx_rx) => tx_rx,
203204
Err(err) => {
204-
let _ = remote_send_err_tx.try_send(RemoteSendError::Connect(err));
205+
let _ = remote_send_err_tx.send(RemoteSendError::Connect(err));
205206
return;
206207
}
207208
};
@@ -248,7 +249,7 @@ where
248249

249250
// Create channels.
250251
let (tx, rx) = tokio::sync::watch::channel(data);
251-
let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::channel(ERROR_QUEUE);
252+
let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::unbounded_channel();
252253

253254
PortDeserializer::accept(port, |local_port, request| {
254255
async move {

remoc/src/rch/watch/sender.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
1-
use futures::{task::noop_waker, FutureExt};
1+
use futures::FutureExt;
22
use serde::{Deserialize, Serialize};
3-
use std::{
4-
error::Error,
5-
fmt,
6-
marker::PhantomData,
7-
sync::Mutex,
8-
task::{Context, Poll},
9-
};
3+
use std::{error::Error, fmt, marker::PhantomData, sync::Mutex};
104

115
use super::{
126
super::{
137
base::{self, PortDeserializer, PortSerializer},
148
RemoteSendError, SendErrorExt,
159
},
1610
receiver::RecvError,
17-
Receiver, Ref, ERROR_QUEUE,
11+
Receiver, Ref,
1812
};
1913
use crate::{chmux, codec, RemoteSend};
2014

@@ -118,8 +112,8 @@ impl<T, Codec> fmt::Debug for Sender<T, Codec> {
118112

119113
pub(crate) struct SenderInner<T, Codec> {
120114
tx: tokio::sync::watch::Sender<Result<T, RecvError>>,
121-
remote_send_err_tx: tokio::sync::mpsc::Sender<RemoteSendError>,
122-
remote_send_err_rx: Mutex<tokio::sync::mpsc::Receiver<RemoteSendError>>,
115+
remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
116+
remote_send_err_rx: Mutex<tokio::sync::mpsc::UnboundedReceiver<RemoteSendError>>,
123117
current_err: Mutex<Option<RemoteSendError>>,
124118
max_item_size: usize,
125119
_codec: PhantomData<Codec>,
@@ -150,8 +144,8 @@ where
150144
/// Creates a new sender.
151145
pub(crate) fn new(
152146
tx: tokio::sync::watch::Sender<Result<T, RecvError>>,
153-
remote_send_err_tx: tokio::sync::mpsc::Sender<RemoteSendError>,
154-
remote_send_err_rx: tokio::sync::mpsc::Receiver<RemoteSendError>, max_item_size: usize,
147+
remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
148+
remote_send_err_rx: tokio::sync::mpsc::UnboundedReceiver<RemoteSendError>, max_item_size: usize,
155149
) -> Self {
156150
let inner = SenderInner {
157151
tx,
@@ -267,6 +261,28 @@ where
267261
*current_err = None;
268262
}
269263

264+
/// Checks that no item-specific send error has occurred.
265+
///
266+
/// This method clears non-item-specific errors present on the channel.
267+
///
268+
/// # Error reporting
269+
/// Sending and error reporting are done asynchronously.
270+
/// Thus, the reporting of an error may be delayed.
271+
///
272+
/// To verify that no item-specific send error has occurred during the lifetime of
273+
/// the channel, call this method after the channel is closed, i.e.
274+
/// [`closed`](Self::closed) has returned or [`is_closed`](Self::is_closed) is
275+
/// `true`.
276+
pub fn check(&mut self) -> Result<(), SendError> {
277+
while let Some(err) = self.error() {
278+
if err.is_item_specific() {
279+
return Err(err);
280+
}
281+
self.clear_error();
282+
}
283+
Ok(())
284+
}
285+
270286
/// Maximum allowed item size in bytes.
271287
pub fn max_item_size(&self) -> usize {
272288
self.inner.as_ref().unwrap().max_item_size
@@ -361,7 +377,7 @@ where
361377

362378
// Create internal communication channels.
363379
let (tx, rx) = tokio::sync::watch::channel(data);
364-
let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::channel(ERROR_QUEUE);
380+
let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::unbounded_channel();
365381
let remote_send_err_tx2 = remote_send_err_tx.clone();
366382

367383
// Accept chmux port request.
@@ -371,7 +387,7 @@ where
371387
let (raw_tx, raw_rx) = match request.accept_from(local_port).await {
372388
Ok(tx_rx) => tx_rx,
373389
Err(err) => {
374-
let _ = remote_send_err_tx.try_send(RemoteSendError::Listen(err));
390+
let _ = remote_send_err_tx.send(RemoteSendError::Listen(err));
375391
return;
376392
}
377393
};

remoc/tests/rch/watch.rs

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ async fn simple() {
1515
let end_value = 124;
1616

1717
println!("Sending remote mpsc channel receiver");
18-
let (tx, rx) = watch::channel(start_value);
18+
let (mut tx, rx) = watch::channel(start_value);
1919
a_tx.send(rx).await.unwrap();
2020
println!("Receiving remote mpsc channel receiver");
2121
let mut rx = b_rx.recv().await.unwrap().unwrap();
@@ -47,6 +47,8 @@ async fn simple() {
4747
sleep(Duration::from_millis(20)).await;
4848
}
4949
}
50+
51+
tx.check().unwrap();
5052
drop(tx);
5153

5254
println!("Waiting for receive task");
@@ -62,7 +64,7 @@ async fn simple_stream() {
6264
let end_value = 124;
6365

6466
println!("Sending remote mpsc channel receiver");
65-
let (tx, rx) = watch::channel(start_value);
67+
let (mut tx, rx) = watch::channel(start_value);
6668
a_tx.send(rx).await.unwrap();
6769
println!("Receiving remote mpsc channel receiver");
6870
let rx = b_rx.recv().await.unwrap().unwrap();
@@ -94,6 +96,8 @@ async fn simple_stream() {
9496
prev_value -= 1;
9597
}
9698
}
99+
100+
tx.check().unwrap();
97101
drop(tx);
98102

99103
println!("Waiting for receive task");
@@ -109,7 +113,7 @@ async fn close() {
109113
let (tx, rx) = watch::channel(123);
110114
a_tx.send(tx).await.unwrap();
111115
println!("Receiving remote mpsc channel sender");
112-
let tx = b_rx.recv().await.unwrap().unwrap();
116+
let mut tx = b_rx.recv().await.unwrap().unwrap();
113117

114118
println!("Cloning receiver");
115119
let rx2 = rx.clone();
@@ -126,6 +130,7 @@ async fn close() {
126130
println!("Waiting for close notification");
127131
tx.closed().await;
128132
assert!(tx.is_closed());
133+
tx.check().unwrap();
129134

130135
println!("Attempting to send");
131136
match tx.send(15) {
@@ -144,7 +149,7 @@ async fn conn_failure() {
144149
let (tx, rx) = watch::channel(123);
145150
a_tx.send(tx).await.unwrap();
146151
println!("Receiving remote mpsc channel sender");
147-
let tx = b_rx.recv().await.unwrap().unwrap();
152+
let mut tx = b_rx.recv().await.unwrap().unwrap();
148153

149154
println!("Cloning receiver");
150155
let _rx2 = rx.clone();
@@ -157,6 +162,7 @@ async fn conn_failure() {
157162
println!("Waiting for close notification");
158163
tx.closed().await;
159164
assert!(tx.is_closed());
165+
tx.check().unwrap();
160166

161167
println!("Attempting to send");
162168
match tx.send(15) {
@@ -229,4 +235,65 @@ async fn max_item_size_exceeded() {
229235
assert!(matches!(tx.error(), Some(SendError::RemoteSend(SendErrorKind::MaxItemSizeExceeded))));
230236
tx.clear_error();
231237
assert!(matches!(tx.error(), None));
238+
tx.check().unwrap();
239+
}
240+
241+
#[tokio::test]
242+
async fn max_item_size_exceeded_check() {
243+
crate::init();
244+
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<watch::Receiver<Vec<u8>>>().await;
245+
246+
println!("Sending remote mpsc channel receiver");
247+
let (mut tx, rx) = watch::channel(Vec::new());
248+
a_tx.send(rx).await.unwrap();
249+
println!("Receiving remote mpsc channel receiver");
250+
let mut rx = b_rx.recv().await.unwrap().unwrap();
251+
252+
assert_eq!(tx.max_item_size(), rx.max_item_size());
253+
let max_item_size = tx.max_item_size();
254+
println!("Maximum send and recv item size is {max_item_size}");
255+
256+
{
257+
let value = rx.borrow().unwrap();
258+
println!("Initial value: {value:?}");
259+
}
260+
261+
let recv_task = tokio::spawn(async move {
262+
loop {
263+
let res = rx.changed().await;
264+
println!("RX changed result: {res:?}");
265+
if res.is_err() {
266+
break res;
267+
}
268+
269+
let value = rx.borrow_and_update().unwrap().clone();
270+
println!("Received value change: {} elements", value.len());
271+
}
272+
});
273+
274+
// Happy case: sent data size is under limit.
275+
// JSON encoding will result in much larger transfer size.
276+
let elems = max_item_size / 10;
277+
println!("Sending {elems} elements");
278+
let value = vec![100; elems];
279+
tx.send(value.clone()).unwrap();
280+
assert_eq!(*tx.borrow(), value);
281+
282+
sleep(Duration::from_millis(100)).await;
283+
284+
// Failure case: sent data size exceeds limits.
285+
let elems = max_item_size * 10;
286+
println!("Sending {elems} elements");
287+
let value = vec![100; elems];
288+
tx.send(value.clone()).unwrap();
289+
assert_eq!(*tx.borrow(), value);
290+
291+
println!("Wait for sender close");
292+
tx.closed().await;
293+
let res = tx.check();
294+
println!("Sender check result: {res:?}");
295+
assert!(matches!(res, Err(SendError::RemoteSend(SendErrorKind::MaxItemSizeExceeded))));
296+
297+
println!("Waiting for receive task");
298+
assert!(matches!(recv_task.await.unwrap(), Err(ChangedError::Closed)));
232299
}

0 commit comments

Comments
 (0)