UdpFramed type socket keeps producing `Poll::Ready` after receiving the first frame #3875
I'm currently trying to bump the versions of the dependencies in the coap-rs crate. The issue as explained by the maintainer is that the

I'm not really sure how to go about finding out if the bug originates from this crate. Once the issue is confirmed, I'll go ahead and open up a bug report :)

I can confirm that this code worked fine with

Thank you for taking the time to read my issue.
As I also said on the Tokio discord, you need to set the socket in non-blocking mode before using

The main thing I note is that, looking at your codec, it seems like it doesn't consume the bytes in the
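To illustrate the non-blocking point, here is a minimal sketch using only the standard library. The helper names `bind_nonblocking` and `recv_would_block` are made up for this example and are not part of coap-rs or Tokio. It shows what non-blocking mode means in practice: a receive on an empty socket returns `WouldBlock` right away instead of parking the thread. As far as I know, `tokio::net::UdpSocket::from_std` expects the socket to already be in this mode.

```rust
use std::io::ErrorKind;
use std::net::UdpSocket;

/// Hypothetical helper: binds a UDP socket on an ephemeral localhost port
/// and switches it to non-blocking mode.
fn bind_nonblocking() -> std::io::Result<UdpSocket> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    socket.set_nonblocking(true)?;
    Ok(socket)
}

/// Hypothetical helper: returns true if a receive on the socket fails
/// immediately with `WouldBlock` (nothing queued, and no blocking).
fn recv_would_block(socket: &UdpSocket) -> bool {
    let mut buf = [0u8; 64];
    match socket.recv_from(&mut buf) {
        Err(e) => e.kind() == ErrorKind::WouldBlock,
        Ok(_) => false,
    }
}

fn main() -> std::io::Result<()> {
    let socket = bind_nonblocking()?;
    // Nothing has been sent to this socket, so a non-blocking receive
    // reports `WouldBlock` instead of waiting for a datagram.
    println!("would block: {}", recv_would_block(&socket));
    // Assumption: from here the socket could be handed to
    // `tokio::net::UdpSocket::from_std` inside a running Tokio runtime.
    Ok(())
}
```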
So the issue was that the implementation of `Stream` for `Decoder` in `tokio_util v0.2` automatically handled the advancing of the read pointer as soon as the end of a frame was hit:

```rust
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let pin = self.get_mut();
    pin.rd.reserve(INITIAL_RD_CAPACITY);
    let (_n, addr) = unsafe {
        // Read into the buffer without having to initialize the memory.
        //
        // safety: we know tokio::net::UdpSocket never reads from the memory
        // during a recv
        let res = {
            let bytes = &mut *(pin.rd.bytes_mut() as *mut _ as *mut [u8]);
            ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, bytes))
        };
        let (n, addr) = res?;
        pin.rd.advance_mut(n);
        (n, addr)
    };
    let frame_res = pin.codec.decode(&mut pin.rd);
    // The read buffer is cleared here, regardless of what the codec consumed.
    pin.rd.clear();
    let frame = frame_res?;
    let result = frame.map(|frame| Ok((frame, addr))); // frame -> (frame, addr)
    Poll::Ready(result)
}
```

In the newer version, `poll_next` looks like this:

```rust
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let pin = self.get_mut();
    pin.rd.reserve(INITIAL_RD_CAPACITY);
    loop {
        // The first packet sets the `is_readable` flag, but it is up to the `decode()` function
        // to return `None` and break out of the `if let` construct
        // Are there still bytes left in the read buffer to decode?
        if pin.is_readable {
            if let Some(frame) = pin.codec.decode_eof(&mut pin.rd)? {
                let current_addr = pin
                    .current_addr
                    .expect("will always be set before this line is called");
                return Poll::Ready(Some(Ok((frame, current_addr))));
            }
            // if this line has been reached then decode has returned `None`.
            pin.is_readable = false;
            pin.rd.clear();
        }
        .
        .
        .
        // The first packet that is sent sets `is_readable` to true
        pin.current_addr = Some(addr);
        pin.is_readable = true;
    }
}
```

Since CoAP doesn't have the concept of fragmentation, it was safe to assume one frame to be one packet. Therefore the `decode` function now clears the buffer itself once the packet has been parsed:

```rust
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Packet>, io::Error> {
    if buf.len() == 0 {
        return Ok(None);
    }
    let packet = Ok(Some(Packet::from_bytes(buf).map_err(|cause| {
        io::Error::new(io::ErrorKind::InvalidData, cause.to_string())
    })?));
    // Consume the whole datagram so the next `decode` call returns `None`.
    buf.clear();
    packet
}
```

Thank you @Darksonn for the tip to look at the decoder :)
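For anyone hitting the same thing, here is a rough, standard-library-only model of why a decoder that leaves bytes in the buffer keeps yielding frames. Everything in it is hypothetical and simplified: `DecodeFn`, the two decode functions, and `frames_from_one_datagram` are stand-ins I made up, with a `Vec<u8>` playing the role of the read buffer, and the poll cap exists only so the buggy case terminates. It is not the real `tokio_util` loop, just the same shape: keep calling decode on the same buffer until it returns `None`.

```rust
/// Hypothetical stand-in for a codec's decode step: returns a frame if one
/// can be produced from the buffer, or `None` if there is nothing to decode.
type DecodeFn = fn(&mut Vec<u8>) -> Option<Vec<u8>>;

/// Buggy variant: builds a frame but leaves the bytes in the buffer.
fn decode_without_consuming(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.is_empty() {
        return None;
    }
    Some(buf.clone())
}

/// Fixed variant: one datagram is one frame, so take all the bytes and
/// leave the buffer empty.
fn decode_and_consume(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.is_empty() {
        return None;
    }
    Some(std::mem::take(buf))
}

/// Simplified model of the framed read loop: after one datagram arrives,
/// call `decode` on the same buffer until it returns `None`.
/// `max_polls` caps the loop so the buggy decoder cannot spin forever.
fn frames_from_one_datagram(decode: DecodeFn, datagram: &[u8], max_polls: usize) -> usize {
    let mut buf = datagram.to_vec();
    let mut frames = 0;
    for _ in 0..max_polls {
        match decode(&mut buf) {
            Some(_) => frames += 1,
            None => break,
        }
    }
    frames
}

fn main() {
    let datagram = [0x40, 0x01, 0x00, 0x01];
    let buggy = frames_from_one_datagram(decode_without_consuming, &datagram, 5);
    let fixed = frames_from_one_datagram(decode_and_consume, &datagram, 5);
    // The non-consuming decoder yields a frame on every poll up to the cap;
    // the consuming one yields exactly one frame and then `None`.
    println!("without consuming: {}, with consuming: {}", buggy, fixed);
}
```

In this model the non-consuming decoder produces a frame on every poll, which matches the symptom in the title, while the consuming one stops after the first frame.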