diff --git a/Cargo.lock b/Cargo.lock index fdeada37500..b22c5f2201b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2933,7 +2933,7 @@ dependencies = [ [[package]] name = "libp2p-quic" -version = "0.13.0" +version = "0.13.1" dependencies = [ "futures", "futures-timer", diff --git a/Cargo.toml b/Cargo.toml index 9e65d1ab62f..ad0e97fe345 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ libp2p-perf = { version = "0.4.0", path = "protocols/perf" } libp2p-ping = { version = "0.47.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.43.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.26.0", path = "transports/pnet" } -libp2p-quic = { version = "0.13.0", path = "transports/quic" } +libp2p-quic = { version = "0.13.1", path = "transports/quic" } libp2p-relay = { version = "0.21.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.17.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.29.0", path = "protocols/request-response" } diff --git a/transports/quic/CHANGELOG.md b/transports/quic/CHANGELOG.md index d73e32ce992..28f94d0923b 100644 --- a/transports/quic/CHANGELOG.md +++ b/transports/quic/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.13.1 +- Improving the way opened lister sockets are re-used when dialing. + See [PR 6224](https://github.com/libp2p/rust-libp2p/pull/6224) + ## 0.13.0 - Remove `async-std` support. diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 402a10564db..0343a4ac79f 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-quic" -version = "0.13.0" +version = "0.13.1" authors = ["Parity Technologies "] edition.workspace = true rust-version = { workspace = true } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 814f7e69287..33553d3f178 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -19,15 +19,13 @@ // DEALINGS IN THE SOFTWARE. use std::{ - collections::{ - hash_map::{DefaultHasher, Entry}, - HashMap, HashSet, - }, + collections::{hash_map::Entry, HashMap, HashSet}, fmt, - hash::{Hash, Hasher}, + hash::Hash, io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, pin::Pin, + sync::{Arc, PoisonError, RwLock}, task::{Context, Poll, Waker}, time::Duration, }; @@ -55,6 +53,114 @@ use crate::{ ConnectError, Connecting, Connection, Error, }; +type Port = u16; + +/// Error that can occur when accessing the PortReuse registry. +#[derive(Debug, thiserror::Error)] +pub(crate) enum PortReuseError { + /// The lock was poisoned, indicating a panic occurred in another thread + /// while holding the lock. + #[error("port reuse registry lock is poisoned")] + LockPoisoned, +} + +impl From> for PortReuseError { + fn from(_: PoisonError) -> Self { + PortReuseError::LockPoisoned + } +} + +/// The configuration for port reuse of listening sockets. +// Note: This mimics the same logic as [`libp2p_tcp::PortReuse`], should optimize to use OS routing. +#[derive(Debug, Clone, Default)] +struct PortReuse { + /// The addresses and ports of the registered listener sockets + /// whose ports could be reused while dialing. + listen_addrs: Arc>>, +} + +impl PortReuse { + /// Registers a socket address for port reuse. + /// + /// # Errors + /// + /// Returns `Err(PortReuseError::LockPoisoned)` if the lock is poisoned. + fn register(&mut self, ip: IpAddr, port: u16) -> Result<(), PortReuseError> { + tracing::trace!(%ip, %port, "Registering QUIC address for port reuse"); + + let mut addrs = match self.listen_addrs.write() { + Ok(guard) => guard, + Err(poisoned) => { + tracing::error!(%ip, %port, error=%poisoned, + "Port reuse registry lock is poisoned, refusing to use potentially inconsistent data" + ); + return Err(PortReuseError::LockPoisoned); // could recover here, if availability + // over consistency is more important + } + }; + + addrs.insert((ip, port)); + Ok(()) + } + + /// Unregisters a socket address for port reuse. + /// + /// # Errors + /// + /// Returns `Err(PortReuseError::LockPoisoned)` if the lock was poisoned. + fn unregister(&mut self, ip: IpAddr, port: u16) -> Result<(), PortReuseError> { + tracing::trace!(%ip, %port, "Unregistering QUIC address from port reuse"); + + let mut addrs = match self.listen_addrs.write() { + Ok(guard) => guard, + Err(poisoned) => { + tracing::error!(%ip, %port, error=%poisoned, + "Port reuse registry lock is poisoned during unregister, refusing to use potentially inconsistent data" + ); + return Err(PortReuseError::LockPoisoned); // recover possible here + } + }; + + addrs.remove(&(ip, port)); + Ok(()) + } + + /// Selects a listening port suitable for re-use + /// with the local address when dialing. + /// + /// If multiple listening addresses with ports are registered for + /// reuse, one port is chosen where IP protocol version and + /// loopback status is the same as that of `remote_ip`. + /// + /// # Returns + /// + /// - `Ok(Some((ip, port)))` if a suitable address is found + /// - `Ok(None)` if no suitable address is found + /// - `Err(PortReuseError::LockPoisoned)` if the lock is poisoned + fn local_dial_port(&self, remote_ip: &IpAddr) -> Result, PortReuseError> { + let addrs = match self.listen_addrs.read() { + Ok(guard) => guard, + Err(poisoned) => { + tracing::error!( + remote_ip = %remote_ip, + "Port reuse registry lock poisoned during lookup: {poisoned}" + ); + return Err(PortReuseError::LockPoisoned); // and also recover here + } + }; + + let result = addrs + .iter() + .find(|(ip, _port)| { + ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() + }) + .map(|(_ip, port)| port) + .copied(); + + Ok(result) + } +} + /// Implementation of the [`Transport`] trait for QUIC. /// /// By default only QUIC Version 1 (RFC 9000) is supported. In the [`Multiaddr`] this maps to @@ -82,6 +188,8 @@ pub struct GenTransport { waker: Option, /// Holepunching attempts hole_punch_attempts: HashMap>, + /// Registry for port reuse + port_reuse: PortReuse, } #[expect(deprecated)] @@ -99,6 +207,7 @@ impl GenTransport

{ waker: None, support_draft_29, hole_punch_attempts: Default::default(), + port_reuse: PortReuse::default(), } } @@ -145,38 +254,38 @@ impl GenTransport

{ Ok((socket_addr, version, peer_id)) } - /// Pick any listener to use for dialing. + /// Find eligible listener using PortReuse registry fn eligible_listener(&mut self, socket_addr: &SocketAddr) -> Option<&mut Listener

> { - let mut listeners: Vec<_> = self - .listeners - .iter_mut() - .filter(|l| { - if l.is_closed { - return false; - } - SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip()) - }) - .filter(|l| { - if socket_addr.ip().is_loopback() { - l.listening_addresses - .iter() - .any(|ip_addr| ip_addr.is_loopback()) - } else { - true - } - }) - .collect(); - match listeners.len() { - 0 => None, - 1 => listeners.pop(), - _ => { - // Pick any listener to use for dialing. - // We hash the socket address to achieve determinism. - let mut hasher = DefaultHasher::new(); - socket_addr.hash(&mut hasher); - let index = hasher.finish() as usize % listeners.len(); - Some(listeners.swap_remove(index)) + // Query the PortReuse registry for a suitable listening address + let local_port = match self.port_reuse.local_dial_port(&socket_addr.ip()) { + Ok(Some(port)) => port, + Ok(None) => return None, + Err(e) => { + tracing::error!( + error = %e, + remote_addr = %socket_addr, + "Failed to query port reuse registry" + ); + return None; } + }; + + // Find the listener with this port + self.listeners.iter_mut().find(|l| { + !l.is_closed + && l.socket_addr().port() == local_port + && SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip()) + }) + } + + fn get_or_create_dialer(&mut self, socket_addr: SocketAddr) -> Result { + let socket_family = socket_addr.ip().into(); + if let Some(occupied) = self.dialer.get(&socket_family) { + Ok(occupied.clone()) + } else { + let endpoint = self.bound_socket(socket_addr)?; + self.dialer.insert(socket_family, endpoint.clone()); + Ok(endpoint) } } @@ -235,6 +344,7 @@ impl Transport for GenTransport

{ endpoint, self.handshake_timeout, version, + self.port_reuse.clone(), )?; self.listeners.push(listener); @@ -271,28 +381,21 @@ impl Transport for GenTransport

{ match (dial_opts.role, dial_opts.port_use) { (Endpoint::Dialer, _) | (Endpoint::Listener, PortUse::Reuse) => { - let endpoint = if let Some(listener) = dial_opts - .port_use - .eq(&PortUse::Reuse) - .then(|| self.eligible_listener(&socket_addr)) - .flatten() - { - listener.endpoint.clone() - } else { - let socket_family = socket_addr.ip().into(); - let dialer = if dial_opts.port_use == PortUse::Reuse { - if let Some(occupied) = self.dialer.get(&socket_family) { - occupied.clone() - } else { - let endpoint = self.bound_socket(socket_addr)?; - self.dialer.insert(socket_family, endpoint.clone()); - endpoint - } + let endpoint = if dial_opts.port_use == PortUse::Reuse { + if let Some(listener) = self.eligible_listener(&socket_addr) { + tracing::debug!( + local_addr=%listener.socket_addr(), + remote_addr=%socket_addr, + "Reusing listener socket for dial" + ); + listener.endpoint.clone() } else { - self.bound_socket(socket_addr)? - }; - dialer + self.get_or_create_dialer(socket_addr)? + } + } else { + self.bound_socket(socket_addr)? }; + let handshake_timeout = self.handshake_timeout; let mut client_config = self.quinn_config.client_config.clone(); if version == ProtocolVersion::Draft29 { @@ -448,7 +551,14 @@ struct Listener { /// The stream must be awaken after it has been closed to deliver the last event. close_listener_waker: Option, - listening_addresses: HashSet, + /// Registry for port reuse + port_reuse: PortReuse, + + /// Track registered addresses for cleanup + /// + /// We maintain this separate from the shared PortReuse registry so we know + /// exactly which addresses to unregister when this listener closes. + registered_addrs: HashSet, } impl Listener

{ @@ -458,17 +568,33 @@ impl Listener

{ endpoint: quinn::Endpoint, handshake_timeout: Duration, version: ProtocolVersion, + mut port_reuse: PortReuse, ) -> Result { let if_watcher; let pending_event; - let mut listening_addresses = HashSet::new(); + let mut registered_addrs = HashSet::new(); let local_addr = socket.local_addr()?; + if local_addr.ip().is_unspecified() { + // Wildcard address - use if-watch to discover concrete addresses if_watcher = Some(P::new_if_watcher()?); pending_event = None; } else { + // Specific address - register immediately if_watcher = None; - listening_addresses.insert(local_addr.ip()); + + // Register in PortReuse + if let Err(e) = port_reuse.register(local_addr.ip(), local_addr.port()) { + tracing::error!( + error = %e, + ip = %local_addr.ip(), + port = local_addr.port(), + "Failed to register address in port reuse registry during listener creation" + ); + // Continue anyway - port reuse just won't work for this address + } + registered_addrs.insert(local_addr.ip()); + let ma = socketaddr_to_multiaddr(&local_addr, version); pending_event = Some(TransportEvent::NewAddress { listener_id, @@ -490,7 +616,8 @@ impl Listener

{ is_closed: false, pending_event, close_listener_waker: None, - listening_addresses, + port_reuse, + registered_addrs, }) } @@ -501,6 +628,20 @@ impl Listener

{ return; } self.endpoint.close(From::from(0u32), &[]); + + // Unregister all addresses for this listener + let port = self.socket_addr().port(); + for ip in self.registered_addrs.drain() { + if let Err(e) = self.port_reuse.unregister(ip, port) { + tracing::warn!( + error = %e, + ip = %ip, + port = port, + "Failed to unregister address during listener close" + ); + } + } + self.pending_event = Some(TransportEvent::ListenerClosed { listener_id: self.listener_id, reason, @@ -540,7 +681,20 @@ impl Listener

{ address=%listen_addr, "New listen address" ); - self.listening_addresses.insert(inet.addr()); + + // Register the new address for port reuse + if let Err(e) = self.port_reuse.register(inet.addr(), endpoint_addr.port()) + { + tracing::error!( + error = %e, + address = %listen_addr, + "Failed to register new address in port reuse registry" + ); + // Continue anyway - just won't be able to reuse this address + } else { + self.registered_addrs.insert(inet.addr()); + } + return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, listen_addr, @@ -555,7 +709,21 @@ impl Listener

{ address=%listen_addr, "Expired listen address" ); - self.listening_addresses.remove(&inet.addr()); + + // Unregister the address from port reuse + if let Err(e) = self + .port_reuse + .unregister(inet.addr(), endpoint_addr.port()) + { + tracing::warn!( + error = %e, + address = %listen_addr, + "Failed to unregister expired address from port reuse registry" + ); + } else { + self.registered_addrs.remove(&inet.addr()); + } + return Poll::Ready(TransportEvent::AddressExpired { listener_id: self.listener_id, listen_addr, @@ -575,6 +743,7 @@ impl Listener

{ impl Stream for Listener

{ type Item = TransportEvent< as Transport>::ListenerUpgrade, Error>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let Some(event) = self.pending_event.take() { @@ -746,6 +915,25 @@ mod tests { use super::*; + // Helper to create an IPv4 address + fn ipv4(a: u8, b: u8, c: u8, d: u8) -> IpAddr { + IpAddr::V4(Ipv4Addr::new(a, b, c, d)) + } + + // Helper to create an IPv6 address + fn ipv6(segments: [u16; 8]) -> IpAddr { + IpAddr::V6(Ipv6Addr::new( + segments[0], + segments[1], + segments[2], + segments[3], + segments[4], + segments[5], + segments[6], + segments[7], + )) + } + #[test] fn multiaddr_to_udp_conversion() { assert!(multiaddr_to_socketaddr( @@ -946,4 +1134,147 @@ mod tests { ) .unwrap(); } + + #[test] + fn test_port_reuse_separation() { + // Test Ipv4 and IPv6 separation + let mut registry = PortReuse::default(); + let ipv4_addr = ipv4(192, 168, 1, 100); + let ipv6_addr = ipv6([0x2001, 0xdb8, 0, 0, 0, 0, 0, 1]); + let ipv4_port = 9000; + let ipv6_port = 9001; + + // Register both IPv4 and IPv6 + assert!(registry.register(ipv4_addr, ipv4_port).is_ok()); + assert!(registry.register(ipv6_addr, ipv6_port).is_ok()); + + // IPv4 lookup should only find IPv4 port + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + assert_eq!(result.unwrap(), Some(ipv4_port)); + + // IPv6 lookup should only find IPv6 port + let result = registry.local_dial_port(&ipv6([0x2606, 0x4700, 0x4700, 0, 0, 0, 0, 0x1111])); + assert_eq!(result.unwrap(), Some(ipv6_port)); + + // Test loopback and non-loopback separation + let loopback = ipv4(127, 0, 0, 1); + let loopback_port = 9000; + + // Register both loopback and non-loopback + assert!(registry.register(loopback, loopback_port).is_ok()); + + // Loopback lookup should find loopback port + let result = registry.local_dial_port(&ipv4(127, 0, 0, 2)); + assert_eq!(result.unwrap(), Some(loopback_port)); + + // Non-loopback lookup should find non-loopback port + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + assert_eq!(result.unwrap(), Some(ipv4_port)); + } + + #[test] + fn test_port_reuse_unregister() { + let mut registry = PortReuse::default(); + let ip = ipv4(192, 168, 1, 100); + let port = 9000; + + // Register + assert!(registry.register(ip, port).is_ok()); + assert_eq!(registry.local_dial_port(&ip).unwrap(), Some(port)); + + // Unregister + assert!(registry.unregister(ip, port).is_ok()); + assert_eq!(registry.local_dial_port(&ip).unwrap(), None); + + // Test non-existent unregistere + let ip = ipv4(192, 168, 1, 200); + let port = 19000; + assert!(registry.unregister(ip, port).is_ok()); + + // Should still be empty + assert_eq!(registry.local_dial_port(&ip).unwrap(), None); + } + + #[test] + fn test_port_reuse_multiple_ports_same_ip() { + let mut registry = PortReuse::default(); + let ip = ipv4(192, 168, 1, 100); + let port1 = 9000; + let port2 = 9001; + + // Register same IP with different ports + assert!(registry.register(ip, port1).is_ok()); + assert!(registry.register(ip, port2).is_ok()); + + // Should find one of them (which one is implementation-dependent) + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + let found_port = result.unwrap().unwrap(); + assert!(found_port == port1 || found_port == port2); + + // Unregister one + assert!(registry.unregister(ip, port1).is_ok()); + + // Should now find the other one + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + assert_eq!(result.unwrap(), Some(port2)); + } + + #[test] + fn test_port_reuse_multiple_ips_same_port() { + let mut registry = PortReuse::default(); + let ip1 = ipv4(192, 168, 1, 100); + let ip2 = ipv4(192, 168, 1, 101); + let port = 9000; + + // Register different IPs with same port + assert!(registry.register(ip1, port).is_ok()); + assert!(registry.register(ip2, port).is_ok()); + + // Should find the port + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + assert_eq!(result.unwrap(), Some(port)); + } + + #[test] + fn test_port_reuse_scenario() { + let mut registry = PortReuse::default(); + let port = 9000; + + // Simulate listening on 0.0.0.0:9000 + // if-watch discovers these addresses: + let discovered_addrs = vec![ + ipv4(127, 0, 0, 1), // Loopback + ipv4(192, 168, 1, 100), // WiFi + ipv4(10, 8, 0, 5), // VPN + ipv4(172, 17, 0, 1), // Docker + ]; + + // Register all discovered addresses + for ip in &discovered_addrs { + assert!(registry.register(*ip, port).is_ok()); + } + + // Dial to loopback -> should find loopback port + let result = registry.local_dial_port(&ipv4(127, 0, 0, 2)); + assert_eq!(result.unwrap(), Some(port)); + + // Dial to internet -> should find non-loopback port + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + assert_eq!(result.unwrap(), Some(port)); + + // VPN disconnects - unregister VPN address + assert!(registry.unregister(ipv4(10, 8, 0, 5), port).is_ok()); + + // Should still work with remaining addresses + let result = registry.local_dial_port(&ipv4(8, 8, 8, 8)); + assert_eq!(result.unwrap(), Some(port)); + + // All interfaces go down + for ip in &discovered_addrs { + let _ = registry.unregister(*ip, port); + } + + // Should return None now + assert_eq!(registry.local_dial_port(&ipv4(8, 8, 8, 8)).unwrap(), None); + } } diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index b14c17b2564..16608833108 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -817,3 +817,213 @@ impl Spawn for libp2p_quic::tokio::Provider { tokio::spawn(future); } } + +#[cfg(feature = "tokio")] +#[tokio::test] +// Regression test for the original bug: listening on 0.0.0.0 and dialing to 127.0.0.1 +// should reuse the listener socket, not create a new one. +// +// This is the exact scenario from issue #4259. +async fn test_port_reuse_wildcard_to_loopback() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let (_, mut a_transport) = create_default_transport::(); + let (_, mut b_transport) = create_default_transport::(); + + // A listens on 0.0.0.0 (wildcard) - this is the bug scenario + a_transport + .listen_on( + ListenerId::next(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + ) + .unwrap(); + + // Wait for if-watch to discover 127.0.0.1 + let a_listen_addr = 'outer: loop { + let ev = a_transport.next().await.unwrap(); + let listen_addr = ev.into_new_address().unwrap(); + for proto in listen_addr.iter() { + if let Protocol::Ip4(ip4) = proto { + if ip4.is_loopback() { + break 'outer listen_addr; + } + } + } + }; + + // Drain all remaining NewAddress events to avoid them interfering + poll_fn(|cx| { + let mut pinned = Pin::new(&mut a_transport); + while pinned.as_mut().poll(cx).is_ready() {} + Poll::Ready(()) + }) + .await; + + // B listens on 127.0.0.1 + let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + + // A dials B - the critical test + let (_, send_back_addr, _) = connect(&mut b_transport, &mut a_transport, b_addr).await.0; + + // Source address should be A's loopback listener + // This proves we reused the listener socket instead of creating a new one + assert_eq!( + send_back_addr, a_listen_addr, + "Port reuse failed: expected source {} but got {}", + a_listen_addr, send_back_addr + ); +} + +// Test that listener cleanup properly unregisters addresses from PortReuse registry. +// +// This ensures closed listeners don't leak entries in the registry. +#[cfg(feature = "tokio")] +#[tokio::test] +async fn test_port_reuse_listener_cleanup() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let (_, mut a_transport) = create_default_transport::(); + let (_, mut b_transport) = create_default_transport::(); + + // A listens on wildcard + let listener_id = ListenerId::next(); + a_transport + .listen_on(listener_id, "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap()) + .unwrap(); + + // Wait for loopback address discovery + let a_loopback_addr = loop { + let ev = a_transport.next().await.unwrap(); + if let Some(listen_addr) = ev.into_new_address() { + if listen_addr + .iter() + .any(|p| matches!(p, Protocol::Ip4(ip) if ip.is_loopback())) + { + break listen_addr; + } + } + }; + + // Drain events + poll_fn(|cx| { + let mut pinned = Pin::new(&mut a_transport); + while pinned.as_mut().poll(cx).is_ready() {} + Poll::Ready(()) + }) + .await; + + // B listens + let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + + // First dial should work and reuse listener + let (_, send_back_addr, _) = connect(&mut b_transport, &mut a_transport, b_addr).await.0; + assert_eq!( + send_back_addr, a_loopback_addr, + "First dial should reuse listener" + ); + + // Remove A's listener + assert!(a_transport.remove_listener(listener_id)); + + // Wait for listener close event + loop { + let ev = a_transport.next().await.unwrap(); + if matches!(ev, TransportEvent::ListenerClosed { .. }) { + break; + } + } + + // Poll once more to ensure listener stream is removed + poll_fn(|cx| { + let mut pinned = Pin::new(&mut a_transport); + while pinned.as_mut().poll(cx).is_ready() {} + Poll::Ready(()) + }) + .await; + + // Second dial to a new B address should NOT reuse the old (closed) listener + // It should create a new dialer socket instead + let b2_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + let (_, send_back_addr2, _) = connect(&mut b_transport, &mut a_transport, b2_addr).await.0; + + // The source address should be different (new dialer socket, not the closed listener) + assert_ne!( + send_back_addr2, a_loopback_addr, + "After listener closes, dial should create new socket, not reuse closed listener" + ); +} + +// Test that PortUse::New creates a fresh socket instead of reusing listeners. +// +// This verifies the PortUse policy enum is respected correctly. +#[cfg(feature = "tokio")] +#[tokio::test] +async fn test_port_use_new_vs_reuse() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let (_, mut a_transport) = create_default_transport::(); + let (_, mut b_transport) = create_default_transport::(); + + // A listens on 127.0.0.1 + let a_listen_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + + // B listens on 127.0.0.1 + let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + + // Test 1: Dial with PortUse::Reuse (should reuse listener) + let (_, send_back_addr_reuse, _) = connect(&mut b_transport, &mut a_transport, b_addr.clone()) + .await + .0; + + // With PortUse::Reuse, source address should be listener's address + assert_eq!( + send_back_addr_reuse, a_listen_addr, + "PortUse::Reuse should reuse listener socket" + ); + + // Test 2: Dial with PortUse::New (should create new socket) + let dial_fut = a_transport + .dial( + b_addr, + DialOpts { + role: Endpoint::Dialer, + port_use: PortUse::New, // NEW socket + }, + ) + .unwrap(); + + let (_, send_back_addr_new, _) = tokio::join!( + async { + loop { + match b_transport.next().await.unwrap() { + TransportEvent::Incoming { + upgrade, + send_back_addr, + .. + } => { + let (peer_id, connection) = upgrade.await.unwrap(); + break (peer_id, send_back_addr, connection); + } + _ => continue, + } + } + }, + async { + let (peer_id, connection) = dial_fut.await.unwrap(); + (peer_id, Multiaddr::empty(), connection) // send_back_addr not available on dialer side + } + ) + .0; + + // With PortUse::New, source address should be different (new socket) + assert_ne!( + send_back_addr_new, a_listen_addr, + "PortUse::New should create fresh socket, not reuse listener" + ); +}