From c59a775b097eeb62e59d158908fd60b24e55c11b Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 20 Dec 2024 07:14:55 -0800 Subject: [PATCH 1/5] adjust sendmmsg wrapper to use stack allocated libc types --- streamer/src/sendmmsg.rs | 49 ++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 1b2257f2f00172..c038147826df95 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -68,6 +68,7 @@ fn mmsghdr_for_packet( ) { const SIZE_OF_SOCKADDR_IN: usize = mem::size_of::(); const SIZE_OF_SOCKADDR_IN6: usize = mem::size_of::(); + const SIZE_OF_SOCKADDR_STORAGE: usize = mem::size_of::(); iov.write(iovec { iov_base: packet.as_ptr() as *mut libc::c_void, @@ -76,20 +77,34 @@ fn mmsghdr_for_packet( let msg_namelen = match dest { SocketAddr::V4(socket_addr_v4) => { + let ptr = addr.as_mut_ptr() as *mut _; unsafe { ptr::write( - addr.as_mut_ptr() as *mut _, + ptr, *nix::sys::socket::SockaddrIn::from(*socket_addr_v4).as_ref(), ); + // Zero the remaining bytes after sockaddr_in + ptr::write_bytes( + (ptr as *mut u8).add(SIZE_OF_SOCKADDR_IN), + 0, + SIZE_OF_SOCKADDR_STORAGE - SIZE_OF_SOCKADDR_IN, + ); } SIZE_OF_SOCKADDR_IN as socklen_t } SocketAddr::V6(socket_addr_v6) => { + let ptr = addr.as_mut_ptr() as *mut _; unsafe { ptr::write( - addr.as_mut_ptr() as *mut _, + ptr, *nix::sys::socket::SockaddrIn6::from(*socket_addr_v6).as_ref(), ); + // Zero the remaining bytes after sockaddr_in6 + ptr::write_bytes( + (ptr as *mut u8).add(SIZE_OF_SOCKADDR_IN6), + 0, + SIZE_OF_SOCKADDR_STORAGE - SIZE_OF_SOCKADDR_IN6, + ); } SIZE_OF_SOCKADDR_IN6 as socklen_t } @@ -166,22 +181,28 @@ where S: Borrow, T: AsRef<[u8]>, { - let size = packets.len(); - let mut iovs = vec![MaybeUninit::uninit(); size]; - let mut addrs = vec![MaybeUninit::zeroed(); size]; - let mut hdrs = vec![MaybeUninit::uninit(); size]; + const MAX_IOV: usize = libc::UIO_MAXIOV as usize; + + if packets.len() > MAX_IOV { + return Err(SendPktsError::IoError( + io::Error::new(io::ErrorKind::InvalidInput, "batch size exceeds UIO_MAXIOV"), + packets.len(), + )); + } + + let mut iovs = [MaybeUninit::uninit(); MAX_IOV]; + let mut addrs = [MaybeUninit::uninit(); MAX_IOV]; + let mut hdrs = [MaybeUninit::uninit(); MAX_IOV]; + + // izip! will iterate packets.len() times, leaving hdrs, iovs, and addrs initialized only up to packets.len() for ((pkt, dest), hdr, iov, addr) in izip!(packets, &mut hdrs, &mut iovs, &mut addrs) { mmsghdr_for_packet(pkt.as_ref(), dest.borrow(), iov, addr, hdr); } - // mmsghdr_for_packet() performs initialization so we can safely transmute - // the Vecs to their initialized counterparts - let _iovs = unsafe { mem::transmute::>, Vec>(iovs) }; - let _addrs = unsafe { - mem::transmute::>, Vec>(addrs) - }; - let mut hdrs = unsafe { mem::transmute::>, Vec>(hdrs) }; + // SAFETY: hdrs is initialized by mmsghdr_for_packet in packets.len() + let hdrs = + unsafe { std::slice::from_raw_parts_mut(hdrs.as_mut_ptr() as *mut mmsghdr, packets.len()) }; - sendmmsg_retry(sock, &mut hdrs) + sendmmsg_retry(sock, hdrs) } pub fn multi_target_send( From 2bd285707dc6e5c5db1540c99c981dfd99d4baf4 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 20 Dec 2024 08:14:53 -0800 Subject: [PATCH 2/5] use constants for sockaddr padding --- streamer/src/sendmmsg.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index c038147826df95..91e80debd95cc0 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -69,6 +69,8 @@ fn mmsghdr_for_packet( const SIZE_OF_SOCKADDR_IN: usize = mem::size_of::(); const SIZE_OF_SOCKADDR_IN6: usize = mem::size_of::(); const SIZE_OF_SOCKADDR_STORAGE: usize = mem::size_of::(); + const SOCKADDR_IN_PADDING: usize = SIZE_OF_SOCKADDR_STORAGE - SIZE_OF_SOCKADDR_IN; + const SOCKADDR_IN6_PADDING: usize = SIZE_OF_SOCKADDR_STORAGE - SIZE_OF_SOCKADDR_IN6; iov.write(iovec { iov_base: packet.as_ptr() as *mut libc::c_void, @@ -77,7 +79,7 @@ fn mmsghdr_for_packet( let msg_namelen = match dest { SocketAddr::V4(socket_addr_v4) => { - let ptr = addr.as_mut_ptr() as *mut _; + let ptr: *mut sockaddr_in = addr.as_mut_ptr() as *mut _; unsafe { ptr::write( ptr, @@ -87,13 +89,13 @@ fn mmsghdr_for_packet( ptr::write_bytes( (ptr as *mut u8).add(SIZE_OF_SOCKADDR_IN), 0, - SIZE_OF_SOCKADDR_STORAGE - SIZE_OF_SOCKADDR_IN, + SOCKADDR_IN_PADDING, ); } SIZE_OF_SOCKADDR_IN as socklen_t } SocketAddr::V6(socket_addr_v6) => { - let ptr = addr.as_mut_ptr() as *mut _; + let ptr: *mut sockaddr_in6 = addr.as_mut_ptr() as *mut _; unsafe { ptr::write( ptr, @@ -103,7 +105,7 @@ fn mmsghdr_for_packet( ptr::write_bytes( (ptr as *mut u8).add(SIZE_OF_SOCKADDR_IN6), 0, - SIZE_OF_SOCKADDR_STORAGE - SIZE_OF_SOCKADDR_IN6, + SOCKADDR_IN6_PADDING, ); } SIZE_OF_SOCKADDR_IN6 as socklen_t From 8fcf6e0517c1f1ade95e78dff74bae61340752ed Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 20 Dec 2024 10:17:47 -0800 Subject: [PATCH 3/5] chunk packets to handle packets.len() > MAX_IOV --- streamer/src/sendmmsg.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 91e80debd95cc0..eb841e148c319a 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -177,20 +177,15 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts } } +const MAX_IOV: usize = libc::UIO_MAXIOV as usize; + #[cfg(target_os = "linux")] -pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +pub fn batch_send_max_iov(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> where S: Borrow, T: AsRef<[u8]>, { - const MAX_IOV: usize = libc::UIO_MAXIOV as usize; - - if packets.len() > MAX_IOV { - return Err(SendPktsError::IoError( - io::Error::new(io::ErrorKind::InvalidInput, "batch size exceeds UIO_MAXIOV"), - packets.len(), - )); - } + assert!(packets.len() <= MAX_IOV); let mut iovs = [MaybeUninit::uninit(); MAX_IOV]; let mut addrs = [MaybeUninit::uninit(); MAX_IOV]; @@ -207,6 +202,18 @@ where sendmmsg_retry(sock, hdrs) } +#[cfg(target_os = "linux")] +pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ + for chunk in packets.chunks(MAX_IOV) { + batch_send_max_iov(sock, chunk)?; + } + Ok(()) +} + pub fn multi_target_send( sock: &UdpSocket, packet: T, From 74c62f360a5a00e5c24a7d38ddf7e2a0422fb369 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 20 Dec 2024 10:27:24 -0800 Subject: [PATCH 4/5] target_os = "linux" for MAX_IOV --- streamer/src/sendmmsg.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index eb841e148c319a..402706aad958e6 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -177,6 +177,7 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut [mmsghdr]) -> Result<(), SendPkts } } +#[cfg(target_os = "linux")] const MAX_IOV: usize = libc::UIO_MAXIOV as usize; #[cfg(target_os = "linux")] From e2a2a13f3aec4aaf572e6ac0331828781704d616 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Mon, 30 Dec 2024 17:21:00 +0000 Subject: [PATCH 5/5] assume_init_drop for initialized hdrs, iovs, addrs --- streamer/src/sendmmsg.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 402706aad958e6..93e23f81c01e36 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -196,11 +196,25 @@ where for ((pkt, dest), hdr, iov, addr) in izip!(packets, &mut hdrs, &mut iovs, &mut addrs) { mmsghdr_for_packet(pkt.as_ref(), dest.borrow(), iov, addr, hdr); } - // SAFETY: hdrs is initialized by mmsghdr_for_packet in packets.len() - let hdrs = + + // SAFETY: The first `packets.len()` elements of `hdrs`, `iovs`, and `addrs` are + // guaranteed to be initialized by `mmsghdr_for_packet` before this loop. + let hdrs_slice = unsafe { std::slice::from_raw_parts_mut(hdrs.as_mut_ptr() as *mut mmsghdr, packets.len()) }; - sendmmsg_retry(sock, hdrs) + let result = sendmmsg_retry(sock, hdrs_slice); + + // SAFETY: The first `packets.len()` elements of `hdrs`, `iovs`, and `addrs` are + // guaranteed to be initialized by `mmsghdr_for_packet` before this loop. + for (hdr, iov, addr) in izip!(&mut hdrs, &mut iovs, &mut addrs).take(packets.len()) { + unsafe { + hdr.assume_init_drop(); + iov.assume_init_drop(); + addr.assume_init_drop(); + } + } + + result } #[cfg(target_os = "linux")]