-
Hello, I want to use a plain Linux socket/file descriptor with Tokio with the help of AsyncFd. My problem is that the socket is often reported as readable when it is not, so an endless loop is entered and never exited. What am I doing wrong here? See AsyncSocket::accept. I created an example in which socket2::Socket stands in for the code I actually want to integrate. When the test communication_async is started, Socket::accept is called in an endless loop even though no client has attempted to connect. I have the same problem with AsyncRead, AsyncWrite and the connect functionality. With plain epoll it works (see the test communication_epoll): accept is only called after a client has attempted to connect. (Tested with netcat.)
use std::{io::Error, net::SocketAddr, os::unix::prelude::AsRawFd};
use tokio::io::unix::AsyncFd;
/// Resolves `address` and returns the first socket address it yields.
///
/// # Errors
///
/// Returns the resolver's error if resolution itself fails, or an
/// [`std::io::ErrorKind::InvalidInput`] error if resolution succeeds but
/// produces no addresses at all (this case used to panic).
pub fn create_address<A: std::net::ToSocketAddrs>(
    address: A,
) -> std::io::Result<std::net::SocketAddr> {
    address.to_socket_addrs()?.next().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "address did not resolve to any socket address",
        )
    })
}
/// Synchronous wrapper around a single `socket2::Socket`.
///
/// All methods of this type delegate to the wrapped socket.
struct Socket {
// The wrapped socket2 socket.
socket: socket2::Socket,
}
impl AsRawFd for Socket {
    /// Returns the raw file descriptor of the wrapped socket.
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
        let inner = &self.socket;
        AsRawFd::as_raw_fd(inner)
    }
}
impl Socket {
pub fn new() -> std::io::Result<Self> {
let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?;
let socket_ref = socket2::SockRef::from(&socket);
socket_ref.set_reuse_address(true)?;
socket_ref.set_reuse_port(true)?;
Ok(Self { socket })
}
pub fn set_nonblocking(&self) -> std::io::Result<()> {
self.socket.set_nonblocking(true)?;
Ok(())
}
pub fn bind(&self, local_address: std::net::SocketAddr) -> std::io::Result<()> {
self.socket.bind(&socket2::SockAddr::from(local_address))?;
Ok(())
}
pub fn listen(&self) -> std::io::Result<()> {
self.socket.listen(128)?;
Ok(())
}
pub fn connect(&self, remote_address: std::net::SocketAddr) -> std::io::Result<()> {
println!("connect");
match self
.socket
.connect(&socket2::SockAddr::from(remote_address))
{
Ok(()) => Ok(()),
Err(ref e) if e.raw_os_error().unwrap() == 115 => {
Err(Error::new(std::io::ErrorKind::WouldBlock, "In progress"))
}
Err(e) => Err(e),
}
}
pub fn accept(&self) -> std::io::Result<(Socket, std::net::SocketAddr)> {
println!("accept");
let (peer_socket, peer_address) = self.socket.accept()?;
Ok((
Self {
socket: peer_socket,
},
peer_address.as_socket().unwrap(),
))
}
pub fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
let bytes_recv = self
.socket
.recv(unsafe { &mut *(buf as *mut [u8] as *mut [std::mem::MaybeUninit<u8>]) })?;
Ok(bytes_recv)
}
pub fn send(&self, out: &[u8]) -> std::io::Result<usize> {
let bytes_sent = self.socket.send(out)?;
Ok(bytes_sent)
}
}
impl std::io::Read for Socket {
    /// Reads by forwarding to [`Socket::recv`]; returns the byte count.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.recv(buf)
    }
}
impl std::io::Write for Socket {
    /// Writes by forwarding to [`Socket::send`]; returns the byte count.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.send(buf)
    }

    /// Always succeeds without doing anything.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
/// Asynchronous counterpart of `Socket`: the socket wrapped in a Tokio
/// `AsyncFd`.
struct AsyncSocket {
// The `Socket` wrapped in Tokio's `AsyncFd`.
socket: AsyncFd<Socket>,
}
impl AsRawFd for AsyncSocket {
    /// Returns the raw file descriptor of the wrapped `AsyncFd`.
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
        let registered = &self.socket;
        AsRawFd::as_raw_fd(registered)
    }
}
impl AsyncSocket {
pub fn new() -> std::io::Result<Self> {
let socket = Socket::new()?;
socket.set_nonblocking()?;
Ok(Self {
socket: AsyncFd::new(socket)?,
})
}
pub fn from_socket(socket: Socket) -> Self {
Self {
socket: AsyncFd::new(socket).unwrap(),
}
}
pub fn bind(&self, local_address: std::net::SocketAddr) -> std::io::Result<()> {
self.socket.get_ref().bind(local_address)?;
Ok(())
}
pub fn listen(&self) -> std::io::Result<()> {
self.socket.get_ref().listen()?;
Ok(())
}
pub async fn accept(&self) -> std::io::Result<(AsyncSocket, SocketAddr)> {
//Why is the socket repoted as readable?
//loop {
let mut guard = self.socket.readable().await?;
match guard.try_io(|inner| inner.get_ref().accept()) {
Ok(result) => match result {
Ok((socket, address)) => {
return Ok((
AsyncSocket {
socket: AsyncFd::new(socket)?,
},
address,
))
}
Err(e) => return Err(e),
},
// Ends here. Otherwise is will be an endless loop.
Err(_would_block) =>
/* continue */
{
todo!("Why it got WouldBlock?")
}
}
//}
}
pub async fn connect(&self, remote_address: SocketAddr) -> std::io::Result<()> {
match self.socket.get_ref().connect(remote_address) {
Ok(()) => Ok(()),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => loop {
let mut guard = self.socket.writable().await?;
match guard.try_io(|inner| inner.get_ref().connect(remote_address)) {
Ok(result) => match result {
Ok(()) => return Ok(()),
Err(e) => return Err(e),
},
Err(_) => continue,
}
},
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Blocking round trip: a client thread connects and sends four bytes,
    /// the server accepts and receives them.
    #[test]
    fn communication_blocking() {
        let server_address = create_address(("127.0.0.1", 5000)).unwrap();
        let server = Socket::new().unwrap();
        server.bind(server_address).unwrap();
        server.listen().unwrap();
        let client = Socket::new().unwrap();
        let client_address = create_address(("127.0.0.1", 5001)).unwrap();
        let client_remote_address = server_address;
        client.bind(client_address).unwrap();
        let data: [u8; 4] = [22, 33, 44, 55];
        let client_hdl = std::thread::spawn(move || {
            client.connect(client_remote_address).unwrap();
            let sent_bytes = client.send(&data).unwrap();
            assert_eq!(data.len(), sent_bytes);
        });
        let (peer_socket, peer_address) = server.accept().unwrap();
        assert!(peer_socket.as_raw_fd() > 0);
        assert_eq!(client_address, peer_address);
        let mut received_data: [u8; 100] = [0; 100];
        let recv_bytes = peer_socket.recv(&mut received_data).unwrap();
        assert_eq!(data.len(), recv_bytes);
        assert_eq!(data, received_data[0..recv_bytes]);
        client_hdl.join().unwrap();
    }

    /// Async accept/connect: a spawned client task connects while the server
    /// awaits `accept`.
    #[tokio::test(flavor = "multi_thread")]
    async fn communication_async() {
        let server_address = create_address(("127.0.0.1", 5002)).unwrap();
        let client_address = create_address(("127.0.0.1", 5003)).unwrap();
        let remote_address = server_address;
        let server = AsyncSocket::new().unwrap();
        server.bind(server_address).unwrap();
        server.listen().unwrap();
        let client_hdl = tokio::spawn(async move {
            let client = AsyncSocket::new().unwrap();
            client.bind(client_address).unwrap();
            client.connect(remote_address).await.unwrap();
        });
        // Bound the wait so a missing connection fails the test instead of
        // hanging it. (The previous un-awaited `sleep` future did nothing.)
        let (peer_socket, peer_address) =
            tokio::time::timeout(Duration::from_millis(5000), server.accept())
                .await
                .expect("accept timed out")
                .unwrap();
        assert!(peer_socket.as_raw_fd() > 0);
        assert_eq!(client_address, peer_address);
        client_hdl.await.unwrap();
    }

    /// Plain epoll on the listening socket: with no client connecting, no
    /// readiness event may be reported within the 5000 ms wait.
    #[tokio::test(flavor = "multi_thread")]
    async fn communication_epoll() {
        let server_address = create_address(("127.0.0.1", 5004)).unwrap();
        let server = AsyncSocket::new().unwrap();
        server.bind(server_address).unwrap();
        server.listen().unwrap();
        let epfd = epoll::create(false).unwrap();
        epoll::ctl(
            epfd,
            epoll::ControlOptions::EPOLL_CTL_ADD,
            server.as_raw_fd(),
            epoll::Event::new(epoll::Events::EPOLLIN, 0),
        )
        .unwrap();
        let mut events: [epoll::Event; 128] = [epoll::Event::new(epoll::Events::empty(), 0); 128];
        let fds = epoll::wait(epfd, 5000, &mut events).unwrap();
        assert_eq!(fds, 0);
        epoll::close(epfd).unwrap();
    }
}
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 6 replies
-
Readiness notifications can be returned by the OS spuriously. Hence the need to wait for readiness and retry in a loop. |
Beta Was this translation helpful? Give feedback.
-
Solved in issue #4549 |
Beta Was this translation helpful? Give feedback.
Solved in issue #4549