Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make requests to peers with explicit addresses succeed #3127

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::test()]
#[tokio::test]
async fn test_address_timed_removal_from_known_peers_cache() {
// Cache initialization
let peer_id = PeerId::random();
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn test_address_timed_removal_from_known_peers_cache() {
assert_eq!(removed_addresses.len(), 2);
}

#[tokio::test()]
#[tokio::test]
async fn test_different_removal_timing_from_known_peers_cache() {
// Cache initialization
let peer_id = PeerId::random();
Expand Down Expand Up @@ -282,7 +282,7 @@ async fn test_address_p2p_prefix_addition() {
assert_eq!(append_p2p_suffix(peer_id, short_addr.clone()), long_addr);
}

#[tokio::test()]
#[tokio::test]
async fn test_known_peers_removal_address_after_specified_interval() {
let config = KnownPeersManagerConfig {
enable_known_peers_source: false,
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#[cfg(test)]
mod tests;

use crate::protocols::request_response::handlers::generic_request_handler::GenericRequest;
use crate::protocols::request_response::request_response_factory;
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
Expand Down
91 changes: 91 additions & 0 deletions crates/subspace-networking/src/node/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// TODO: Remove
#![allow(
clippy::needless_return,
reason = "https://github.com/rust-lang/rust-clippy/issues/13458"
)]

use crate::protocols::request_response::handlers::generic_request_handler::{
GenericRequest, GenericRequestHandler,
};
use crate::{construct, Config};
use futures::channel::oneshot;
use libp2p::multiaddr::Protocol;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::sync::Arc;

#[derive(Encode, Decode)]
struct ExampleRequest;

impl GenericRequest for ExampleRequest {
const PROTOCOL_NAME: &'static str = "/example";
const LOG_TARGET: &'static str = "example_request";
type Response = ExampleResponse;
}

#[derive(Encode, Decode, Debug)]
struct ExampleResponse;

#[tokio::test]
async fn request_with_addresses() {
tracing_subscriber::fmt::init();

let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _example_request| async { Some(ExampleResponse) },
)],
..Config::default()
};
let (node_1, mut node_runner_1) = construct(config_1).unwrap();

let (node_1_address_sender, node_1_address_receiver) = oneshot::channel();
let on_new_listener_handler = node_1.on_new_listener(Arc::new({
let node_1_address_sender = Mutex::new(Some(node_1_address_sender));

move |address| {
if matches!(address.iter().next(), Some(Protocol::Ip4(_))) {
if let Some(node_1_address_sender) = node_1_address_sender.lock().take() {
node_1_address_sender.send(address.clone()).unwrap();
}
}
}
}));

tokio::spawn(async move {
node_runner_1.run().await;
});

// Wait for first node to know its address
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let config_2 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _| async { None },
)],
..Config::default()
};

let (node_2, mut node_runner_2) = construct(config_2).unwrap();

tokio::spawn(async move {
node_runner_2.run().await;
});

// Make request to previously unknown peer
node_2
.send_generic_request(node_1.id(), vec![node_1_addr], ExampleRequest)
.await
.unwrap();

// Subsequent requests should succeed without explicit addresses through already established
// connection
node_2
.send_generic_request(node_1.id(), Vec::new(), ExampleRequest)
.await
.unwrap();
}
62 changes: 53 additions & 9 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::constructor;
use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::LocalOnlyRecordStore;
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, IfDisconnected,
Event as RequestResponseEvent, IfDisconnected, OutboundFailure, RequestFailure,
};
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::{is_global_address_or_dns, strip_peer_id, SubspaceMetrics};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use futures::future::Fuse;
use futures::{FutureExt, StreamExt};
use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
Expand Down Expand Up @@ -81,6 +81,13 @@ enum BootstrapCommandState {
Finished,
}

#[derive(Debug)]
struct PendingGenericRequest {
protocol_name: &'static str,
request: Vec<u8>,
result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
}

/// Runner for the Node.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner<LocalRecordProvider>
Expand Down Expand Up @@ -126,6 +133,7 @@ where
bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
/// Receives an event on peer address removal from the persistent storage.
removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
requests_pending_connections: HashMap<PeerId, Vec<PendingGenericRequest>>,
/// Optional storage for the [`HandlerId`] of the address removal task.
/// We keep to stop the task along with the rest of the networking.
_address_removal_task_handler_id: Option<HandlerId>,
Expand Down Expand Up @@ -220,6 +228,7 @@ where
bootstrap_addresses,
bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
removed_addresses_rx,
requests_pending_connections: HashMap::new(),
_address_removal_task_handler_id: address_removal_task_handler_id,
}
}
Expand Down Expand Up @@ -484,6 +493,18 @@ where
num_established,
..
} => {
if let Some(generic_requests) = self.requests_pending_connections.remove(&peer_id) {
let request_response = &mut self.swarm.behaviour_mut().request_response;
for request in generic_requests {
request_response.send_request(
&peer_id,
request.protocol_name,
request.request,
request.result_sender,
IfDisconnected::ImmediateError,
);
}
}
// Save known addresses that were successfully dialed.
if let ConnectedPoint::Dialer { address, .. } = &endpoint {
// filter non-global addresses when non-globals addresses are disabled
Expand Down Expand Up @@ -590,6 +611,19 @@ where
};
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
if let Some(generic_requests) =
self.requests_pending_connections.remove(peer_id)
{
for request in generic_requests {
// Do not care if receiver is gone
let _: Result<(), _> = request
.result_sender
.send(Err(RequestFailure::Network(OutboundFailure::DialFailure)));
}
}
}

if let Some(peer_id) = &peer_id {
let should_ban_temporarily =
self.should_temporary_ban_on_dial_error(peer_id, &error);
Expand Down Expand Up @@ -1435,12 +1469,21 @@ where
request,
result_sender,
} => {
// TODO: Ideally it'd be much simpler with https://github.com/libp2p/rust-libp2p/issues/5634
if !addresses.is_empty()
&& !self
.swarm
.connected_peers()
.any(|candidate| candidate == &peer_id)
{
self.requests_pending_connections
.entry(peer_id)
.or_default()
.push(PendingGenericRequest {
protocol_name,
request,
result_sender,
});
if let Err(error) = self.swarm.dial(
DialOpts::peer_id(peer_id)
.addresses(addresses)
Expand All @@ -1449,14 +1492,15 @@ where
) {
warn!(%error, "Failed to dial disconnected peer on generic request");
}
} else {
self.swarm.behaviour_mut().request_response.send_request(
&peer_id,
protocol_name,
request,
result_sender,
IfDisconnected::TryConnect,
);
}
self.swarm.behaviour_mut().request_response.send_request(
&peer_id,
protocol_name,
request,
result_sender,
IfDisconnected::TryConnect,
);
}
Command::GetProviders {
key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::time::sleep;

const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

#[tokio::test()]
#[tokio::test]
async fn test_connection_breaks_after_timeout_without_reservation() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn test_connection_breaks_after_timeout_without_reservation() {
assert!(!peer2.is_connected(peer1.local_peer_id()));
}

#[tokio::test()]
#[tokio::test]
async fn test_connection_reservation() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
Expand Down Expand Up @@ -111,7 +111,7 @@ async fn test_connection_reservation() {
assert!(peer2.is_connected(peer1.local_peer_id()));
}

#[tokio::test()]
#[tokio::test]
async fn test_connection_reservation_symmetry() {
let connection_timeout = Duration::from_millis(300);
let long_delay = Duration::from_millis(1000);
Expand Down Expand Up @@ -157,7 +157,7 @@ async fn test_connection_reservation_symmetry() {
assert!(!peer2.is_connected(peer1.local_peer_id()));
}

#[tokio::test()]
#[tokio::test]
async fn test_reserved_peers_dial_event() {
let connection_timeout = Duration::from_millis(1300);
let long_delay = Duration::from_millis(2000);
Expand Down
Loading