From 64b552eda0bf139f5f25f3f872f29a15628f952f Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 15 Oct 2024 01:38:57 +0300 Subject: [PATCH] Make requests to peers with explicit peer ID succeed --- .../subspace-networking/src/behavior/tests.rs | 6 +- crates/subspace-networking/src/node.rs | 3 + crates/subspace-networking/src/node/tests.rs | 91 +++++++++++++++++++ crates/subspace-networking/src/node_runner.rs | 62 +++++++++++-- .../src/protocols/reserved_peers/tests.rs | 8 +- 5 files changed, 154 insertions(+), 16 deletions(-) create mode 100644 crates/subspace-networking/src/node/tests.rs diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index e1bb5d5924f..0933c338d98 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -25,7 +25,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use tokio::time::sleep; -#[tokio::test()] +#[tokio::test] async fn test_address_timed_removal_from_known_peers_cache() { // Cache initialization let peer_id = PeerId::random(); @@ -92,7 +92,7 @@ async fn test_address_timed_removal_from_known_peers_cache() { assert_eq!(removed_addresses.len(), 2); } -#[tokio::test()] +#[tokio::test] async fn test_different_removal_timing_from_known_peers_cache() { // Cache initialization let peer_id = PeerId::random(); @@ -282,7 +282,7 @@ async fn test_address_p2p_prefix_addition() { assert_eq!(append_p2p_suffix(peer_id, short_addr.clone()), long_addr); } -#[tokio::test()] +#[tokio::test] async fn test_known_peers_removal_address_after_specified_interval() { let config = KnownPeersManagerConfig { enable_known_peers_source: false, diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index a4720856c9b..dbd9cb78324 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use 
crate::protocols::request_response::handlers::generic_request_handler::GenericRequest; use crate::protocols::request_response::request_response_factory; use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared}; diff --git a/crates/subspace-networking/src/node/tests.rs b/crates/subspace-networking/src/node/tests.rs new file mode 100644 index 00000000000..f63023326fa --- /dev/null +++ b/crates/subspace-networking/src/node/tests.rs @@ -0,0 +1,91 @@ +// TODO: Remove +#![allow( + clippy::needless_return, + reason = "https://github.com/rust-lang/rust-clippy/issues/13458" +)] + +use crate::protocols::request_response::handlers::generic_request_handler::{ + GenericRequest, GenericRequestHandler, +}; +use crate::{construct, Config}; +use futures::channel::oneshot; +use libp2p::multiaddr::Protocol; +use parity_scale_codec::{Decode, Encode}; +use parking_lot::Mutex; +use std::sync::Arc; + +#[derive(Encode, Decode)] +struct ExampleRequest; + +impl GenericRequest for ExampleRequest { + const PROTOCOL_NAME: &'static str = "/example"; + const LOG_TARGET: &'static str = "example_request"; + type Response = ExampleResponse; +} + +#[derive(Encode, Decode, Debug)] +struct ExampleResponse; + +#[tokio::test] +async fn request_with_addresses() { + tracing_subscriber::fmt::init(); + + let config_1 = Config { + listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], + allow_non_global_addresses_in_dht: true, + request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create( + |_, _example_request| async { Some(ExampleResponse) }, + )], + ..Config::default() + }; + let (node_1, mut node_runner_1) = construct(config_1).unwrap(); + + let (node_1_address_sender, node_1_address_receiver) = oneshot::channel(); + let on_new_listener_handler = node_1.on_new_listener(Arc::new({ + let node_1_address_sender = Mutex::new(Some(node_1_address_sender)); + + move |address| { + if matches!(address.iter().next(), Some(Protocol::Ip4(_))) { + if let Some(node_1_address_sender) =
node_1_address_sender.lock().take() { + node_1_address_sender.send(address.clone()).unwrap(); + } + } + } + })); + + tokio::spawn(async move { + node_runner_1.run().await; + }); + + // Wait for first node to know its address + let node_1_addr = node_1_address_receiver.await.unwrap(); + drop(on_new_listener_handler); + + let config_2 = Config { + listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], + allow_non_global_addresses_in_dht: true, + request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create( + |_, _| async { None }, + )], + ..Config::default() + }; + + let (node_2, mut node_runner_2) = construct(config_2).unwrap(); + + tokio::spawn(async move { + node_runner_2.run().await; + }); + + // Make request to previously unknown peer + node_2 + .send_generic_request(node_1.id(), vec![node_1_addr], ExampleRequest) + .await + .unwrap(); + + // Subsequent requests should succeed without explicit addresses through already established + // connection + node_2 + .send_generic_request(node_1.id(), Vec::new(), ExampleRequest) + .await + .unwrap(); +} diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index ef54c15e824..f56e64d025b 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -6,14 +6,14 @@ use crate::constructor; use crate::constructor::temporary_bans::TemporaryBans; use crate::constructor::LocalOnlyRecordStore; use crate::protocols::request_response::request_response_factory::{ - Event as RequestResponseEvent, IfDisconnected, + Event as RequestResponseEvent, IfDisconnected, OutboundFailure, RequestFailure, }; use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared}; use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics}; use async_mutex::Mutex as AsyncMutex; use bytes::Bytes; use event_listener_primitives::HandlerId; -use futures::channel::mpsc; +use futures::channel::{mpsc, oneshot}; use
futures::future::Fuse; use futures::{FutureExt, StreamExt}; use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent}; @@ -81,6 +81,13 @@ enum BootstrapCommandState { Finished, } +#[derive(Debug)] +struct PendingGenericRequest { + protocol_name: &'static str, + request: Vec<u8>, + result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, +} + /// Runner for the Node. #[must_use = "Node does not function properly unless its runner is driven forward"] pub struct NodeRunner @@ -126,6 +133,7 @@ where bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>, /// Receives an event on peer address removal from the persistent storage. removed_addresses_rx: mpsc::UnboundedReceiver, + requests_pending_connections: HashMap<PeerId, Vec<PendingGenericRequest>>, /// Optional storage for the [`HandlerId`] of the address removal task. /// We keep to stop the task along with the rest of the networking. _address_removal_task_handler_id: Option<HandlerId>, @@ -220,6 +228,7 @@ where bootstrap_addresses, bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())), removed_addresses_rx, + requests_pending_connections: HashMap::new(), _address_removal_task_handler_id: address_removal_task_handler_id, } } @@ -484,6 +493,18 @@ where num_established, .. } => { + if let Some(generic_requests) = self.requests_pending_connections.remove(&peer_id) { + let request_response = &mut self.swarm.behaviour_mut().request_response; + for request in generic_requests { + request_response.send_request( + &peer_id, + request.protocol_name, + request.request, + request.result_sender, + IfDisconnected::ImmediateError, + ); + } + } // Save known addresses that were successfully dialed. if let ConnectedPoint::Dialer { address, .. } = &endpoint { // filter non-global addresses when non-globals addresses are disabled @@ -590,6 +611,19 @@ where }; } SwarmEvent::OutgoingConnectionError { peer_id, error, ..
} => { + if let Some(peer_id) = &peer_id { + if let Some(generic_requests) = + self.requests_pending_connections.remove(peer_id) + { + for request in generic_requests { + // Do not care if receiver is gone + let _: Result<(), _> = request + .result_sender + .send(Err(RequestFailure::Network(OutboundFailure::DialFailure))); + } + } + } + if let Some(peer_id) = &peer_id { let should_ban_temporarily = self.should_temporary_ban_on_dial_error(peer_id, &error); @@ -1435,12 +1469,21 @@ where request, result_sender, } => { + // TODO: Ideally it'd be much simpler with https://github.com/libp2p/rust-libp2p/issues/5634 if !addresses.is_empty() && !self .swarm .connected_peers() .any(|candidate| candidate == &peer_id) { + self.requests_pending_connections + .entry(peer_id) + .or_default() + .push(PendingGenericRequest { + protocol_name, + request, + result_sender, + }); if let Err(error) = self.swarm.dial( DialOpts::peer_id(peer_id) .addresses(addresses) @@ -1449,14 +1492,15 @@ where ) { warn!(%error, "Failed to dial disconnected peer on generic request"); } + } else { + self.swarm.behaviour_mut().request_response.send_request( + &peer_id, + protocol_name, + request, + result_sender, + IfDisconnected::TryConnect, + ); } - self.swarm.behaviour_mut().request_response.send_request( - &peer_id, - protocol_name, - request, - result_sender, - IfDisconnected::TryConnect, - ); } Command::GetProviders { key, diff --git a/crates/subspace-networking/src/protocols/reserved_peers/tests.rs b/crates/subspace-networking/src/protocols/reserved_peers/tests.rs index 20ac649f1e7..805ba3a9dcf 100644 --- a/crates/subspace-networking/src/protocols/reserved_peers/tests.rs +++ b/crates/subspace-networking/src/protocols/reserved_peers/tests.rs @@ -19,7 +19,7 @@ use tokio::time::sleep; const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1); -#[tokio::test()] +#[tokio::test] async fn test_connection_breaks_after_timeout_without_reservation() { let connection_timeout = 
Duration::from_millis(300); let long_delay = Duration::from_millis(1000); @@ -63,7 +63,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() { assert!(!peer2.is_connected(peer1.local_peer_id())); } -#[tokio::test()] +#[tokio::test] async fn test_connection_reservation() { let connection_timeout = Duration::from_millis(300); let long_delay = Duration::from_millis(1000); @@ -111,7 +111,7 @@ async fn test_connection_reservation() { assert!(peer2.is_connected(peer1.local_peer_id())); } -#[tokio::test()] +#[tokio::test] async fn test_connection_reservation_symmetry() { let connection_timeout = Duration::from_millis(300); let long_delay = Duration::from_millis(1000); @@ -157,7 +157,7 @@ async fn test_connection_reservation_symmetry() { assert!(!peer2.is_connected(peer1.local_peer_id())); } -#[tokio::test()] +#[tokio::test] async fn test_reserved_peers_dial_event() { let connection_timeout = Duration::from_millis(1300); let long_delay = Duration::from_millis(2000);