-
I'm trying to implement a protocol with a UDP broadcast mechanism to discover devices on the network. Using a stream is a good fit to allow processing a dynamic number of responses, but I'm having trouble figuring out how to get the UDP stream to exit cleanly. How can I tell the socket or stream that I'd like to stop listening? Here's a minimal example... use bytes::BytesMut;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use tokio::net::UdpSocket;
use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
let sock = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
sock.set_broadcast(true).unwrap();
let (mut tx, mut rx) = UdpFramed::new(sock, ACodec).split();
let rx_handle = tokio::spawn(async move {
while let Some(_resp) = rx.next().await {
println!("Response");
}
println!("This never prints");
});
tx.send((
Request,
SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 1234),
))
.await?;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// how to cancel udp read?
//rx_handle.abort();
rx_handle.await?;
Ok(())
}
struct ACodec;
struct Request;
impl Encoder<Request> for ACodec {
type Error = std::io::Error;
fn encode(&mut self, _item: Request, _dst: &mut BytesMut) -> Result<(), Self::Error> {
Ok(())
}
}
struct Response;
impl Decoder for ACodec {
type Item = Response;
type Error = std::io::Error;
fn decode(&mut self, _src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Ok(Some(Response))
}
} |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Unless you are still on Tokio 0.2.x, the If you wish to cancel just the loop and still perform cleanup, you can do the following: let (kill, recv_kill) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
tokio::select! {
_ = the_loop(rx) => {},
_ = recv_kill => {},
}
println!("Shutting down...");
});
async fn the_loop(rx: SplitStream<UdpFramed<ACodec>>) {
while let Some(_resp) = rx.next().await {
println!("Response");
}
} or similarly let (kill, recv_kill) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
loop {
tokio::select! {
_resp = rx.next() => {
// handle frame
},
_ = &mut recv_kill => break,
}
}
println!("Shutting down...");
}); For more info, check out How to remotely shut down running tasks with Tokio. |
Beta Was this translation helpful? Give feedback.
Unless you are still on Tokio 0.2.x, the
JoinHandle
type returned bytokio::spawn
has anrx_handle.abort
method you can call. However, please note that this will not cause theprintln!
after the loop to print anything because it kills the entire task, not just the loop.If you wish to cancel just the loop and still perform cleanup, you can do the following: