-
Hey, I am creating tokio wrapper around Linux SocketCAN iso-tp crate, using tokio AsyncFd. pub struct IsoTpWriteFuture<'a> {
socket: &'a IsoTpSocket,
packet: Vec<u8>,
}
impl Future for IsoTpWriteFuture<'_> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let mut guard = ready!(self.socket.0.poll_write_ready(cx))?;
match guard.try_io(|inner|inner.get_ref().write(&self.packet)) {
Err(err) => continue,
Ok(_) => return Poll::Ready(Ok(())),
}
}
}
} when I try to do two consecutive writes, the first will complete successfully, but second will hang at ready!(self.socket.0.poll_write_ready(cx))?; It seems like epoll is not listening for change from wouldblock to ready on write stream. I created a workaround, that kinda works: fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let _ = ready!(self.socket.0.poll_write_ready(cx))?;
match self.socket.0.get_ref().write_vec(&self.packet) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
Ok(_) => return Poll::Ready(Ok(())),
Err(err) => return Poll::Ready(Err(err))
}
}
} but since the readiness is not cleared, when receiving WouldBlock the loop just spins until the write finally succeeds. Full code here: use socketcan_isotp::{self, RECV_BUFFER_SIZE};
pub use socketcan_isotp::{Id, StandardId, Error};
use std::io;
use std::os::raw::c_int;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use futures::ready;
use tokio::io::unix::AsyncFd;
/// Future for writing data to IsoTPSocket
pub struct IsoTpWriteFuture<'a> {
socket: &'a IsoTpSocket,
packet: Vec<u8>,
}
impl Future for IsoTpWriteFuture<'_> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let mut guard = ready!(self.socket.0.poll_write_ready(cx))?;
match guard.try_io(|inner|inner.get_ref().write(&self.packet)) {
Err(err) => continue,
Ok(_) => return Poll::Ready(Ok(())),
}
}
}
}
/// Future for readint data from IsoTPSocket
pub struct IsoTpReadFuture<'a> {
socket: &'a IsoTpSocket,
}
impl Future for IsoTpReadFuture<'_> {
type Output = io::Result<Vec<u8>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let mut ready_guard = ready!(self.socket.0.poll_read_ready(cx))?;
match ready_guard.try_io(|inner| inner.get_ref().read_to_vec()) {
Ok(result) => return Poll::Ready(result),
Err(_would_block) => continue,
}
}
}
}
/// An asynchronous I/O wrapped socketcan_isotp::IsoTpSocket
pub struct IsoTpSocket(
AsyncFd<socketcan_isotp::IsoTpSocket>
);
impl IsoTpSocket {
/// Open a named CAN device such as "vcan0"
pub fn open(ifname: &str, src: impl Into<Id>, dst: impl Into<Id>) -> Result<IsoTpSocket, socketcan_isotp::Error> {
let sock = socketcan_isotp::IsoTpSocket::open(ifname, src, dst)?;
sock.set_nonblocking(true)?;
Ok(IsoTpSocket(AsyncFd::new(sock)?))
}
/// Open by kernel interface number
pub fn open_if(if_index: c_int, src: impl Into<Id>, dst: impl Into<Id>) -> Result<IsoTpSocket, socketcan_isotp::Error> {
let sock = socketcan_isotp::IsoTpSocket::open_if(if_index, src, dst)?;
sock.set_nonblocking(true)?;
Ok(IsoTpSocket(AsyncFd::new(sock)?))
}
pub fn write_packet(&self, packet : Vec<u8>) -> Result<IsoTpWriteFuture, Error>{
Ok(IsoTpWriteFuture {
socket: &self,
packet,
})
}
pub fn read_packet(&self) -> Result<IsoTpReadFuture, Error>{
Ok(IsoTpReadFuture {
socket: &self,
})
}
} |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 2 replies
-
Not all APIs support poll/select/epoll. Some googling shows that it seems like this does, but also some APIs are super weird as far as how or when they issue notifications this way. You may want to do some digging as to how this interacts with poll, select, and epoll. |
Beta Was this translation helpful? Give feedback.
-
It was due to the bug in Linux Kernel. It is, as of today, resolved. |
Beta Was this translation helpful? Give feedback.
It was due to the bug in Linux Kernel. It is, as of today, resolved.