From d1a4811f3f0027e4823d305f5b06e53740260845 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Fri, 12 Jan 2024 23:18:38 +0200 Subject: [PATCH] p2p ban tests were rewritten --- p2p/p2p-test-utils/src/lib.rs | 18 +- p2p/src/ban_config.rs | 2 +- p2p/src/peer_manager/mod.rs | 15 + p2p/src/peer_manager/peerdb/mod.rs | 15 + .../tests/addr_list_response_caching.rs | 49 +- p2p/src/peer_manager/tests/ban.rs | 756 ++++++++++++------ p2p/src/peer_manager/tests/connections.rs | 8 +- p2p/src/peer_manager/tests/eviction.rs | 73 +- p2p/src/peer_manager/tests/mod.rs | 64 +- p2p/src/peer_manager/tests/utils.rs | 150 +++- p2p/src/testing_utils.rs | 56 +- p2p/src/tests/helpers/test_node.rs | 19 +- 12 files changed, 872 insertions(+), 353 deletions(-) diff --git a/p2p/p2p-test-utils/src/lib.rs b/p2p/p2p-test-utils/src/lib.rs index ed2a08f7ce..e12b742ce8 100644 --- a/p2p/p2p-test-utils/src/lib.rs +++ b/p2p/p2p-test-utils/src/lib.rs @@ -142,7 +142,7 @@ impl P2pBasicTestTimeGetter { } /// A timeout for blocking calls. -pub const LONG_TIMEOUT: Duration = Duration::from_secs(600); +pub const LONG_TIMEOUT: Duration = Duration::from_secs(30); // FIXME /// A short timeout for events that shouldn't occur. pub const SHORT_TIMEOUT: Duration = Duration::from_millis(500); @@ -221,6 +221,7 @@ macro_rules! expect_no_recv { }; } +/// Wait until the specified value is received from the channel. pub async fn wait_for_recv(receiver: &mut mpsc::UnboundedReceiver, value: &T) { let wait_loop = async { loop { @@ -232,3 +233,18 @@ pub async fn wait_for_recv(receiver: &mut mpsc::UnboundedReceiver, val expect_future_val!(wait_loop); } + +/// Wait until the sender stops putting messages into the channel. +pub async fn wait_for_no_recv(receiver: &mut mpsc::UnboundedReceiver) { + let wait_loop = async { + loop { + let wait_result = + tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await; + if wait_result.is_err() { + break; + } + } + }; + + expect_future_val!(wait_loop); +} diff --git a/p2p/src/ban_config.rs b/p2p/src/ban_config.rs index 300c6979f8..f952f078e3 100644 --- a/p2p/src/ban_config.rs +++ b/p2p/src/ban_config.rs @@ -38,7 +38,7 @@ make_config_setting!( ); /// Settings related to banning and discouragement. -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct BanConfig { /// The score threshold after which a peer is banned. pub ban_threshold: BanThreshold, diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index 4522708676..68d9691d8e 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -1291,6 +1291,8 @@ where peer.announced_addresses.insert(&address, &mut make_pseudo_rng()); + // FIXME: check for ban etc here + self.peerdb.peer_discovered(address); let peer_ids = self @@ -1902,6 +1904,9 @@ pub trait Observer { pub trait PeerManagerQueryInterface { #[cfg(test)] fn peers(&self) -> &BTreeMap; + + #[cfg(test)] + fn peer_db(&self) -> &dyn peerdb::PeerDbQueryInterface; } impl PeerManagerQueryInterface for PeerManager @@ -1914,7 +1919,17 @@ where fn peers(&self) -> &BTreeMap { &self.peers } + + #[cfg(test)] + fn peer_db(&self) -> &dyn peerdb::PeerDbQueryInterface { + &self.peerdb + } } #[cfg(test)] mod tests; + +#[cfg(test)] +pub mod test_utils { + pub use super::tests::utils::*; +} diff --git a/p2p/src/peer_manager/peerdb/mod.rs b/p2p/src/peer_manager/peerdb/mod.rs index ba5f73b501..585e71d22b 100644 --- a/p2p/src/peer_manager/peerdb/mod.rs +++ b/p2p/src/peer_manager/peerdb/mod.rs @@ -635,6 +635,21 @@ impl PeerDb { } } +pub trait PeerDbQueryInterface { + fn is_address_banned(&self, address: &BannableAddress) -> bool; + fn is_address_discouraged(&self, address: &BannableAddress) -> bool; +} + +impl PeerDbQueryInterface for PeerDb { + fn is_address_banned(&self, address: &BannableAddress) -> bool { + self.is_address_banned(address) + } + + fn is_address_discouraged(&self, address: &BannableAddress) -> bool { + self.is_address_discouraged(address) + } +} + #[cfg(test)] mod tests; diff --git a/p2p/src/peer_manager/tests/addr_list_response_caching.rs b/p2p/src/peer_manager/tests/addr_list_response_caching.rs index 429bfde5fa..746a08accf 100644 --- a/p2p/src/peer_manager/tests/addr_list_response_caching.rs +++ b/p2p/src/peer_manager/tests/addr_list_response_caching.rs @@ -37,7 +37,7 @@ use crate::{ default_backend::{ transport::TcpTransportSocket, types::{Command, Message}, - ConnectivityHandle, DefaultNetworkingService, + DefaultNetworkingService, }, types::{PeerInfo, Role}, ConnectivityService, NetworkingService, @@ -45,24 +45,20 @@ use crate::{ peer_manager::{ addr_list_response_cache, peerdb::{ - address_tables::table::test_utils::make_non_colliding_addresses, - storage::PeerDbStorage, storage_impl::PeerDbStorageImpl, + address_tables::table::test_utils::make_non_colliding_addresses, storage::PeerDbStorage, }, PeerManager, }, protocol::ProtocolConfig, - testing_utils::{peerdb_inmemory_store, TestAddressMaker, TEST_PROTOCOL_VERSION}, + testing_utils::{TestAddressMaker, TEST_PROTOCOL_VERSION}, types::peer_id::PeerId, - PeerManagerEvent, }; +use super::make_standalone_peer_manager; + // Note: addr list requests are only handled for inbound peers, so we only test this variant. type TestNetworkingService = DefaultNetworkingService; -type TestPeerManager = PeerManager< - DefaultNetworkingService, - PeerDbStorageImpl, ->; #[tracing::instrument(skip(seed))] #[rstest] @@ -272,28 +268,19 @@ fn setup_peer_mgr( p2p_config: &Arc, time_getter: &P2pBasicTestTimeGetter, rng: &mut impl Rng, -) -> (TestPeerManager, UnboundedReceiver) { - let (cmd_sender, cmd_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (_conn_event_sender, conn_event_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (_peer_mgr_event_sender, peer_mgr_event_receiver) = - tokio::sync::mpsc::unbounded_channel::(); - let connectivity_handle = ConnectivityHandle::::new( - // Note: technically, we should pass the bind addresses here, but since we don't - // establish real connections, it doesn't really matter. - vec![], - cmd_sender, - conn_event_receiver, - ); - - let mut peer_mgr = PeerManager::::new( - Arc::clone(chain_config), - Arc::clone(p2p_config), - connectivity_handle, - peer_mgr_event_receiver, - time_getter.get_time_getter(), - peerdb_inmemory_store(), - ) - .unwrap(); +) -> ( + PeerManager, + UnboundedReceiver, +) { + let (mut peer_mgr, _conn_event_sender, _peer_mgr_event_sender, cmd_receiver, _) = + make_standalone_peer_manager( + Arc::clone(chain_config), + Arc::clone(p2p_config), + // Note: technically, we should pass the bind addresses here, but since we don't + // establish real connections, it doesn't really matter. + vec![], + time_getter.get_time_getter(), + ); let addresses_in_db = make_non_colliding_addresses( &[peer_mgr.peerdb.address_tables().new_addr_table()], diff --git a/p2p/src/peer_manager/tests/ban.rs b/p2p/src/peer_manager/tests/ban.rs index 0c65f7d0c1..9c74487cd8 100644 --- a/p2p/src/peer_manager/tests/ban.rs +++ b/p2p/src/peer_manager/tests/ban.rs @@ -13,240 +13,571 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use crate::{ - config::NodeType, - net::{ - default_backend::{types::Command, ConnectivityHandle}, - types::{services::Service, PeerInfo, PeerRole, Role}, - }, - peer_manager::{OutboundConnectType, PeerManager}, - testing_utils::{ - connect_and_accept_services, connect_services, get_connectivity_event, - peerdb_inmemory_store, test_p2p_config, TestAddressMaker, TestTransportChannel, - TestTransportMaker, TestTransportNoise, TestTransportTcp, TEST_PROTOCOL_VERSION, - }, - types::peer_id::PeerId, - utils::oneshot_nofail, - PeerManagerEvent, -}; -use common::{chain::config, primitives::user_agent::mintlayer_core_user_agent}; -use p2p_test_utils::P2pBasicTestTimeGetter; +use rstest::rstest; use crate::{ - error::{P2pError, PeerError}, + ban_config::BanConfig, + config::P2pConfig, + message::{AddrListRequest, AddrListResponse, AnnounceAddrRequest, PeerManagerMessage}, net::{ self, default_backend::{ transport::{MpscChannelTransport, NoiseTcpTransport, TcpTransportSocket}, + types::{Command, Message}, DefaultNetworkingService, }, + types::{services::Service, ConnectivityEvent, PeerRole}, ConnectivityService, NetworkingService, }, - peer_manager::tests::make_peer_manager, + peer_manager::{ + peerdb::test_utils::make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups, + tests::{ + make_peer_manager, make_standalone_peer_manager, + utils::{ + adjust_peer_score, ban_peer_manually, expect_cmd_connect_to, + inbound_block_relay_peer_accepted_by_backend, + inbound_full_relay_peer_accepted_by_backend, + outbound_block_relay_peer_accepted_by_backend, query_peer_manager, + wait_for_heartbeat, + }, + }, + MAX_ADDR_RATE_PER_SECOND, + }, + testing_utils::{ + connect_and_accept_services, get_connectivity_event, test_p2p_config, + test_p2p_config_with_ban_config, test_peer_mgr_config_with_no_auto_outbound_connections, + TestAddressMaker, TestTransportChannel, TestTransportMaker, TestTransportNoise, + TestTransportTcp, TEST_PROTOCOL_VERSION, + }, + types::peer_id::PeerId, }; +use common::{chain::config, primitives::user_agent::mintlayer_core_user_agent}; +use p2p_test_utils::{expect_no_recv, expect_recv, wait_for_no_recv, P2pBasicTestTimeGetter}; +use test_utils::random::{make_seedable_rng, Seed}; -// ban peer whose connected to us -async fn ban_connected_peer() -where - A: TestTransportMaker, - T: NetworkingService + 'static + std::fmt::Debug, - T::ConnectivityHandle: ConnectivityService, -{ - let addr1 = A::make_address(); - let addr2 = A::make_address(); +// FIXME: prolong discouragement + +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_ban_connected_peer(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let ban_config = BanConfig { + ban_threshold: 100.into(), + ban_duration: Duration::from_secs(60 * 60).into(), + discouragement_threshold: 1000.into(), + discouragement_duration: Duration::from_secs(60 * 60 * 10).into(), + }; + let p2p_config = Arc::new(test_p2p_config_with_ban_config(ban_config.clone())); + + let time_getter = P2pBasicTestTimeGetter::new(); + let bind_addr = TestTransportTcp::make_address(); + + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_addr], + time_getter.get_time_getter(), + ); - let config = Arc::new(config::create_mainnet()); - let (mut pm1, _shutdown_sender, _subscribers_sender) = - make_peer_manager::(A::make_transport(), addr1, Arc::clone(&config)).await; - let (mut pm2, _shutdown_sender, _subscribers_sender) = - make_peer_manager::(A::make_transport(), addr2, config).await; + let peer_mgr_join_handle = logging::spawn_in_current_span(async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }); + + let peer_addr = TestAddressMaker::new_random_address_with_rng(&mut rng); + let peer_id = inbound_block_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addr, + bind_addr, + &chain_config, + ); - let (address, peer_info, _) = connect_services::( - &mut pm1.peer_connectivity_handle, - &mut pm2.peer_connectivity_handle, + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id }); + + adjust_peer_score( + &peer_mgr_event_sender, + peer_id, + *ban_config.ban_threshold / 2, ) .await; - let peer_id = peer_info.peer_id; - pm2.accept_connection( - address, - pm2.peer_connectivity_handle.local_addresses()[0], - Role::Inbound, - peer_info, - None, - ); - pm2.adjust_peer_score(peer_id, 1000); - let addr1 = pm1.peer_connectivity_handle.local_addresses()[0].clone().as_bannable(); - assert!(pm2.peerdb.is_address_banned(&addr1)); - let event = get_connectivity_event::(&mut pm2.peer_connectivity_handle).await; - assert!(std::matches!( - event, - Ok(net::types::ConnectivityEvent::ConnectionClosed { .. }) - )); -} + let is_banned = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db().is_address_banned(&peer_addr.as_bannable()) + }) + .await; + assert!(!is_banned); -#[tracing::instrument] -#[tokio::test] -async fn ban_connected_peer_tcp() { - ban_connected_peer::>().await; -} + expect_no_recv!(cmd_receiver); -#[tracing::instrument] -#[tokio::test] -async fn ban_connected_peer_channels() { - ban_connected_peer::>() - .await; -} + adjust_peer_score( + &peer_mgr_event_sender, + peer_id, + *ban_config.ban_threshold / 2, + ) + .await; -#[tracing::instrument] -#[tokio::test] -async fn ban_connected_peer_noise() { - ban_connected_peer::>().await; -} + let is_banned = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db().is_address_banned(&peer_addr.as_bannable()) + }) + .await; + assert!(is_banned); -async fn banned_peer_attempts_to_connect() -where - A: TestTransportMaker, - T: NetworkingService + std::fmt::Debug + 'static, - T::ConnectivityHandle: ConnectivityService, -{ - let addr1 = A::make_address(); - let addr2 = A::make_address(); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Disconnect { peer_id }); - let config = Arc::new(config::create_mainnet()); - let (mut pm1, _shutdown_sender, _subscribers_sender) = - make_peer_manager::(A::make_transport(), addr1, Arc::clone(&config)).await; - let (mut pm2, _shutdown_sender, _subscribers_sender) = - make_peer_manager::(A::make_transport(), addr2, config).await; + wait_for_no_recv(&mut peer_mgr_notification_receiver).await; - let (address, peer_info, _) = connect_services::( - &mut pm1.peer_connectivity_handle, - &mut pm2.peer_connectivity_handle, - ) + time_getter.advance_time(*ban_config.ban_duration); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let is_banned = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db().is_address_banned(&peer_addr.as_bannable()) + }) .await; - let peer_id = peer_info.peer_id; - pm2.accept_connection( - address, - pm2.peer_connectivity_handle.local_addresses()[0], - Role::Inbound, - peer_info, - None, - ); + assert!(!is_banned); - pm2.adjust_peer_score(peer_id, 1000); - let addr1 = pm1.peer_connectivity_handle.local_addresses()[0].clone().as_bannable(); - assert!(pm2.peerdb.is_address_banned(&addr1)); - let event = get_connectivity_event::(&mut pm2.peer_connectivity_handle).await; - assert!(std::matches!( - event, - Ok(net::types::ConnectivityEvent::ConnectionClosed { .. }) - )); -} + drop(conn_event_sender); + drop(peer_mgr_event_sender); -#[tracing::instrument] -#[tokio::test] -async fn banned_peer_attempts_to_connect_tcp() { - banned_peer_attempts_to_connect::>().await; + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); } -#[tracing::instrument] +// Check that a peer is disconnected when it's banned manually. +// Also check that it's unbanned once the ban duration expires. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] #[tokio::test] -async fn banned_peer_attempts_to_connect_channel() { - banned_peer_attempts_to_connect::< - TestTransportChannel, - DefaultNetworkingService, - >() +async fn disconnect_manually_banned_peer(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let ban_config = BanConfig { + ban_threshold: Default::default(), + ban_duration: Duration::from_secs(60 * 60).into(), + discouragement_threshold: Default::default(), + discouragement_duration: Duration::from_secs(60 * 60 * 10).into(), + }; + let p2p_config = Arc::new(test_p2p_config_with_ban_config(ban_config.clone())); + + let time_getter = P2pBasicTestTimeGetter::new(); + let bind_addr = TestTransportTcp::make_address(); + + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_addr], + time_getter.get_time_getter(), + ); + + let peer_mgr_join_handle = logging::spawn_in_current_span(async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }); + + let peer_addr = TestAddressMaker::new_random_address_with_rng(&mut rng); + let peer_id = inbound_block_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addr, + bind_addr, + &chain_config, + ); + + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id }); + + let is_banned = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db().is_address_banned(&peer_addr.as_bannable()) + }) .await; -} + assert!(!is_banned); -#[tracing::instrument] -#[tokio::test] -async fn banned_peer_attempts_to_connect_noise() { - banned_peer_attempts_to_connect::< - TestTransportNoise, - DefaultNetworkingService, - >() + ban_peer_manually(&peer_mgr_event_sender, peer_addr.as_bannable()).await; + + let is_banned = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db().is_address_banned(&peer_addr.as_bannable()) + }) .await; -} + assert!(is_banned); -// attempt to connect to banned peer -async fn connect_to_banned_peer() -where - A: TestTransportMaker, - T: NetworkingService + 'static + std::fmt::Debug, - T::ConnectivityHandle: ConnectivityService, -{ - let addr1 = A::make_address(); - let addr2 = A::make_address(); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Disconnect { peer_id }); - let config = Arc::new(config::create_mainnet()); - let (mut pm1, _shutdown_sender, _subscribers_sender) = - make_peer_manager::(A::make_transport(), addr1, Arc::clone(&config)).await; - let (mut pm2, _shutdown_sender, _subscribers_sender) = - make_peer_manager::(A::make_transport(), addr2, config).await; + wait_for_no_recv(&mut peer_mgr_notification_receiver).await; - let (address, peer_info1, _peer_info2) = connect_services::( - &mut pm1.peer_connectivity_handle, - &mut pm2.peer_connectivity_handle, - ) + time_getter.advance_time(*ban_config.ban_duration); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let is_banned = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db().is_address_banned(&peer_addr.as_bannable()) + }) .await; - let peer_id = peer_info1.peer_id; - pm2.accept_connection( - address, - pm2.peer_connectivity_handle.local_addresses()[0], - Role::Inbound, - peer_info1, - None, + assert!(!is_banned); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check that an incoming connection from a banned peer is rejected. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn reject_incoming_connection_from_banned_peer(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let time_getter = P2pBasicTestTimeGetter::new(); + let bind_addr = TestTransportTcp::make_address(); + + let (mut peer_mgr, conn_event_sender, peer_mgr_event_sender, mut cmd_receiver, _) = + make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_addr], + time_getter.get_time_getter(), + ); + + let peer_addrs = make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups( + &peer_mgr.peerdb, + 2, + &mut rng, ); - let remote_addr = pm1.peer_connectivity_handle.local_addresses()[0]; + peer_mgr.ban(peer_addrs[0].as_bannable()); - pm2.adjust_peer_score(peer_id, 10); - assert!(!pm2.peerdb.is_address_banned(&remote_addr.as_bannable())); + let peer_mgr_join_handle = logging::spawn_in_current_span(async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }); - pm2.adjust_peer_score(peer_id, 90); - assert!(pm2.peerdb.is_address_banned(&remote_addr.as_bannable())); + // Connection from the banned peer is rejected. + let peer1_id = inbound_block_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addrs[0], + bind_addr, + &chain_config, + ); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Disconnect { peer_id: peer1_id }); + + // Connection from the normal peer is accepted. + let peer2_id = inbound_block_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addrs[1], + bind_addr, + &chain_config, + ); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id: peer2_id }); - let event = get_connectivity_event::(&mut pm2.peer_connectivity_handle).await; - match &event { - Ok(net::types::ConnectivityEvent::ConnectionClosed { .. }) => {} - _ => panic!("unexpected event: {event:?}"), - } - pm2.handle_connectivity_event(event.unwrap()); - - let (response_sender, response_receiver) = oneshot_nofail::channel(); - pm2.connect(remote_addr, OutboundConnectType::Manual { response_sender }); - let res = response_receiver.await.unwrap(); - match res { - Err(P2pError::PeerError(PeerError::BannedAddress(_))) => {} - _ => panic!("unexpected result: {res:?}"), - } + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); } -#[tracing::instrument] +// Check that outgoing connections to banned peers are not established. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] #[tokio::test] -async fn connect_to_banned_peer_tcp() { - connect_to_banned_peer::>() - .await; +async fn no_outgoing_connection_to_banned_peer(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let time_getter = P2pBasicTestTimeGetter::new(); + let bind_addr = TestTransportTcp::make_address(); + + let (mut peer_mgr, conn_event_sender, peer_mgr_event_sender, mut cmd_receiver, _) = + make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_addr], + time_getter.get_time_getter(), + ); + + let peer_addrs = make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups( + &peer_mgr.peerdb, + 2, + &mut rng, + ); + + peer_mgr.ban(peer_addrs[0].as_bannable()); + + let peer_mgr_join_handle = logging::spawn_in_current_span(async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }); + + // Connection to the normal peer is established. + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_addrs[1]); + outbound_block_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addrs[1], + bind_addr, + &chain_config, + ); + + // No other connection attempts are made. + expect_no_recv!(cmd_receiver); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); } -#[tracing::instrument] +// Check that address announcements don't include banned addresses. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] #[tokio::test] -async fn connect_to_banned_peer_channels() { - connect_to_banned_peer::>( - ) - .await; +async fn banned_address_is_not_announced(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = Arc::new(P2pConfig { + allow_discover_private_ips: true.into(), + + bind_addresses: Default::default(), + socks5_proxy: Default::default(), + disable_noise: Default::default(), + boot_nodes: Default::default(), + reserved_nodes: Default::default(), + whitelisted_addresses: Default::default(), + ban_config: Default::default(), + outbound_connection_timeout: Default::default(), + ping_check_period: Default::default(), + ping_timeout: Default::default(), + peer_handshake_timeout: Default::default(), + max_clock_diff: Default::default(), + node_type: Default::default(), + user_agent: mintlayer_core_user_agent(), + sync_stalling_timeout: Default::default(), + peer_manager_config: Default::default(), + protocol_config: Default::default(), + }); + + let time_getter = P2pBasicTestTimeGetter::new(); + let bind_addr = TestTransportTcp::make_address(); + + let (mut peer_mgr, conn_event_sender, peer_mgr_event_sender, mut cmd_receiver, _) = + make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_addr], + time_getter.get_time_getter(), + ); + + let addrs = make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups( + &peer_mgr.peerdb, + 4, + &mut rng, + ); + let [banned_addr, normal_addr, peer1_addr, peer2_addr]: [_; 4] = addrs.try_into().unwrap(); + + peer_mgr.ban(banned_addr.as_bannable()); + + let peer_mgr_join_handle = logging::spawn_in_current_span(async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }); + + let peer1_id = inbound_full_relay_peer_accepted_by_backend( + &conn_event_sender, + peer1_addr, + bind_addr, + &chain_config, + ); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id: peer1_id }); + + let peer2_id = inbound_full_relay_peer_accepted_by_backend( + &conn_event_sender, + peer2_addr, + bind_addr, + &chain_config, + ); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id: peer2_id }); + + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id: peer1_id, + message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { + address: banned_addr.as_peer_address(), + }), + }) + .unwrap(); + + time_getter.advance_time(Duration::from_secs((1.0 / MAX_ADDR_RATE_PER_SECOND) as u64)); + + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id: peer1_id, + message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { + address: normal_addr.as_peer_address(), + }), + }) + .unwrap(); + + let cmd = expect_recv!(cmd_receiver); + assert_eq!( + cmd, + Command::SendMessage { + peer_id: peer2_id, + message: Message::AnnounceAddrRequest(AnnounceAddrRequest { + address: normal_addr.as_peer_address(), + }) + } + ); + + // No other connection attempts are made. + expect_no_recv!(cmd_receiver); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); } -#[tracing::instrument] +// Check that address responses don't include banned addresses. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] #[tokio::test] -async fn connect_to_banned_peer_noise() { - connect_to_banned_peer::>() - .await; +async fn banned_address_not_in_addr_response(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = Arc::new(P2pConfig { + allow_discover_private_ips: true.into(), + peer_manager_config: test_peer_mgr_config_with_no_auto_outbound_connections(), + + bind_addresses: Default::default(), + socks5_proxy: Default::default(), + disable_noise: Default::default(), + boot_nodes: Default::default(), + reserved_nodes: Default::default(), + whitelisted_addresses: Default::default(), + ban_config: Default::default(), + outbound_connection_timeout: Default::default(), + ping_check_period: Default::default(), + ping_timeout: Default::default(), + peer_handshake_timeout: Default::default(), + max_clock_diff: Default::default(), + node_type: Default::default(), + user_agent: mintlayer_core_user_agent(), + sync_stalling_timeout: Default::default(), + protocol_config: Default::default(), + }); + + let time_getter = P2pBasicTestTimeGetter::new(); + let bind_addr = TestTransportTcp::make_address(); + + let (mut peer_mgr, conn_event_sender, peer_mgr_event_sender, mut cmd_receiver, _) = + make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_addr], + time_getter.get_time_getter(), + ); + + let addrs = make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups( + &peer_mgr.peerdb, + 3, + &mut rng, + ); + let [banned_addr, normal_addr, peer_addr]: [_; 3] = addrs.try_into().unwrap(); + + peer_mgr.peerdb.peer_discovered(banned_addr); + peer_mgr.peerdb.peer_discovered(normal_addr); + + peer_mgr.ban(banned_addr.as_bannable()); + + let peer_mgr_join_handle = logging::spawn_in_current_span(async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }); + + let peer_id = inbound_full_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addr, + bind_addr, + &chain_config, + ); + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id }); + + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id, + message: PeerManagerMessage::AddrListRequest(AddrListRequest {}), + }) + .unwrap(); + + let cmd = expect_recv!(cmd_receiver); + assert_eq!( + cmd, + Command::SendMessage { + peer_id, + message: Message::AddrListResponse(AddrListResponse { + addresses: vec![normal_addr.as_peer_address()], + }) + } + ); + + // No other connection attempts are made. + expect_no_recv!(cmd_receiver); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); } +// FIXME: move this test somewhere else. async fn validate_invalid_connection() where A: TestTransportMaker, @@ -307,6 +638,7 @@ async fn validate_invalid_connection_noise() { .await; } +// FIXME: and this one async fn inbound_connection_invalid_magic() where A: TestTransportMaker, @@ -376,65 +708,3 @@ async fn inbound_connection_invalid_magic_noise() { >() .await; } - -// Test that manually banned peers are also disconnected -#[tracing::instrument] -#[test] -fn ban_and_disconnect() { - type TestNetworkingService = DefaultNetworkingService; - - let bind_address = TestTransportTcp::make_address(); - let chain_config = Arc::new(config::create_mainnet()); - let p2p_config = Arc::new(test_p2p_config()); - let (cmd_sender, mut cmd_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (_conn_sender, conn_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (_peer_sender, peer_receiver) = tokio::sync::mpsc::unbounded_channel::(); - let time_getter = P2pBasicTestTimeGetter::new(); - let connectivity_handle = - ConnectivityHandle::::new(vec![], cmd_sender, conn_receiver); - - let mut pm = PeerManager::::new( - Arc::clone(&chain_config), - Arc::clone(&p2p_config), - connectivity_handle, - peer_receiver, - time_getter.get_time_getter(), - peerdb_inmemory_store(), - ) - .unwrap(); - - let peer_id_1 = PeerId::new(); - let address_1 = TestAddressMaker::new_random_address(); - let peer_info = PeerInfo { - peer_id: peer_id_1, - protocol_version: TEST_PROTOCOL_VERSION, - network: *chain_config.magic_bytes(), - software_version: *chain_config.software_version(), - user_agent: mintlayer_core_user_agent(), - common_services: NodeType::Full.into(), - }; - pm.accept_connection(address_1, bind_address, Role::Inbound, peer_info, None); - assert_eq!(pm.peers.len(), 1); - - // Peer is accepted by the peer manager - match cmd_receiver.try_recv() { - Ok(Command::Accept { peer_id }) if peer_id == peer_id_1 => {} - v => panic!("unexpected command: {v:?}"), - } - - let (ban_sender, mut ban_receiver) = oneshot_nofail::channel(); - pm.handle_control_event(PeerManagerEvent::Ban(address_1.as_bannable(), ban_sender)); - ban_receiver.try_recv().unwrap().unwrap(); - - // Peer is disconnected by the peer manager - match cmd_receiver.try_recv() { - Ok(Command::Disconnect { peer_id }) if peer_id == peer_id_1 => {} - v => panic!("unexpected command: {v:?}"), - } - - // No more messages - match cmd_receiver.try_recv() { - Err(_) => {} - v => panic!("unexpected command: {v:?}"), - } -} diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index cfeb1ec549..615ddc2552 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -47,7 +47,9 @@ use crate::{ peerdb::{self, config::PeerDbConfig}, tests::{ get_connected_peers, run_peer_manager, - utils::{expect_connect_cmd, make_full_relay_peer_info, recv_command_advance_time}, + utils::{ + expect_cmd_connect_to_one_of, make_full_relay_peer_info, recv_command_advance_time, + }, }, PeerManager, }, @@ -1464,7 +1466,7 @@ async fn feeler_connections_test_impl(seed: Seed) { log::debug!("Expecting normal outbound connection attempt"); let cmd = cmd_receiver.recv().await.unwrap(); - let outbound_peer_addr = expect_connect_cmd(&cmd, &mut addresses); + let outbound_peer_addr = expect_cmd_connect_to_one_of(&cmd, &mut addresses); let outbound_peer_id = PeerId::new(); conn_event_sender @@ -1511,7 +1513,7 @@ async fn feeler_connections_test_impl(seed: Seed) { recv_command_advance_time(&mut cmd_receiver, &time_getter, feeler_connections_interval) .await .unwrap(); - let addr = expect_connect_cmd(&cmd, &mut addresses); + let addr = expect_cmd_connect_to_one_of(&cmd, &mut addresses); let is_last_addr = addresses.is_empty(); let should_succeed = { let rand_bool = rng.gen_bool(0.5); diff --git a/p2p/src/peer_manager/tests/eviction.rs b/p2p/src/peer_manager/tests/eviction.rs index 2326f8fe7b..d953ddb8bf 100644 --- a/p2p/src/peer_manager/tests/eviction.rs +++ b/p2p/src/peer_manager/tests/eviction.rs @@ -18,29 +18,24 @@ use std::{collections::BTreeSet, sync::Arc, time::Duration}; use logging::log; use rstest::rstest; use test_utils::random::make_seedable_rng; -use tokio::sync::mpsc; use p2p_test_utils::{expect_no_recv, expect_recv, P2pBasicTestTimeGetter}; use test_utils::random::Seed; use crate::{ config::P2pConfig, - net::{ - default_backend::{types::Command, ConnectivityHandle}, - types::ConnectivityEvent, - }, + net::default_backend::types::Command, peer_manager::{ config::PeerManagerConfig, - dns_seed::DefaultDnsSeed, peerdb::{self, config::PeerDbConfig, salt::Salt}, peers_eviction, - tests::utils::{expect_connect_cmd, make_block_relay_peer_info}, - PeerManager, + tests::{ + make_standalone_peer_manager, + utils::{expect_cmd_connect_to_one_of, outbound_block_relay_peer_accepted_by_backend}, + }, }, sync::sync_status::PeerBlockSyncStatus, - testing_utils::{peerdb_inmemory_store, TestTransportMaker, TestTransportTcp}, - tests::helpers::PeerManagerObserver, - types::peer_id::PeerId, + testing_utils::{TestTransportMaker, TestTransportTcp}, }; use common::{ chain::{config, Block}, @@ -49,7 +44,6 @@ use common::{ }; use crate::{ - net::default_backend::{transport::TcpTransportSocket, DefaultNetworkingService}, peer_manager::{tests::utils::wait_for_heartbeat, HEARTBEAT_INTERVAL_MAX}, PeerManagerEvent, }; @@ -86,8 +80,6 @@ mod dont_evict_if_blocks_in_flight { )] test_case: TestCase, ) { - type TestNetworkingService = DefaultNetworkingService; - let mut rng = make_seedable_rng(seed); let chain_config = Arc::new(config::create_unit_test_config()); @@ -148,34 +140,20 @@ mod dont_evict_if_blocks_in_flight { }); let bind_address = TestTransportTcp::make_address(); - let (cmd_sender, mut cmd_receiver) = mpsc::unbounded_channel(); - let (conn_event_sender, conn_event_receiver) = mpsc::unbounded_channel(); - let (peer_mgr_event_sender, peer_mgr_event_receiver) = - mpsc::unbounded_channel::(); let time_getter = P2pBasicTestTimeGetter::new(); - let connectivity_handle = ConnectivityHandle::::new( - vec![], - cmd_sender, - conn_event_receiver, - ); - let (peer_mgr_notification_sender, mut peer_mgr_notification_receiver) = - mpsc::unbounded_channel(); - let peer_mgr_observer = Box::new(PeerManagerObserver::new(peer_mgr_notification_sender)); - let mut peer_mgr = PeerManager::::new_generic( + let ( + mut peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( Arc::clone(&chain_config), Arc::clone(&p2p_config), - connectivity_handle, - peer_mgr_event_receiver, + vec![bind_address], time_getter.get_time_getter(), - peerdb_inmemory_store(), - Some(peer_mgr_observer), - Box::new(DefaultDnsSeed::new( - Arc::clone(&chain_config), - Arc::clone(&p2p_config), - )), - ) - .unwrap(); + ); let addr_count = 3; let addresses = @@ -198,26 +176,23 @@ mod dont_evict_if_blocks_in_flight { log::debug!("Expecting outbound connection attempt #1"); let cmd = expect_recv!(cmd_receiver); - let peer1_addr = expect_connect_cmd(&cmd, &mut addresses); + let peer1_addr = expect_cmd_connect_to_one_of(&cmd, &mut addresses); log::debug!("Expecting outbound connection attempt #2"); let cmd = expect_recv!(cmd_receiver); - let peer2_addr = expect_connect_cmd(&cmd, &mut addresses); + let peer2_addr = expect_cmd_connect_to_one_of(&cmd, &mut addresses); log::debug!("Expecting outbound connection attempt #3"); let cmd = expect_recv!(cmd_receiver); - let peer3_addr = expect_connect_cmd(&cmd, &mut addresses); + let peer3_addr = expect_cmd_connect_to_one_of(&cmd, &mut addresses); for peer_addr in [peer1_addr, peer2_addr, peer3_addr] { - let peer_id = PeerId::new(); - conn_event_sender - .send(ConnectivityEvent::OutboundAccepted { - peer_address: peer_addr, - bind_address, - peer_info: make_block_relay_peer_info(peer_id, &chain_config), - node_address_as_seen_by_peer: None, - }) - .unwrap(); + let peer_id = outbound_block_relay_peer_accepted_by_backend( + &conn_event_sender, + peer_addr, + bind_address, + &chain_config, + ); log::debug!("Expecting Command::Accept"); let cmd = expect_recv!(cmd_receiver); diff --git a/p2p/src/peer_manager/tests/mod.rs b/p2p/src/peer_manager/tests/mod.rs index 8482961e16..5ef44f77ae 100644 --- a/p2p/src/peer_manager/tests/mod.rs +++ b/p2p/src/peer_manager/tests/mod.rs @@ -20,7 +20,7 @@ mod connections; mod eviction; mod peer_types; mod ping; -mod utils; +pub mod utils; mod whitelist; use std::{sync::Arc, time::Duration}; @@ -29,7 +29,7 @@ use p2p_test_utils::expect_recv; use tokio::sync::{mpsc, oneshot}; use ::utils::atomics::SeqCstAtomicBool; -use common::time_getter::TimeGetter; +use common::{chain::ChainConfig, time_getter::TimeGetter}; use crypto::random::Rng; use p2p_types::socket_address::SocketAddress; use test_utils::assert_matches_return_val; @@ -42,11 +42,16 @@ use crate::{ interface::types::ConnectedPeer, message::{PeerManagerMessage, PingRequest, PingResponse}, net::{ - default_backend::types::Command, types::ConnectivityEvent, ConnectivityService, - NetworkingService, + default_backend::{ + transport::TcpTransportSocket, types::Command, ConnectivityHandle, + DefaultNetworkingService, + }, + types::ConnectivityEvent, + ConnectivityService, NetworkingService, }, peer_manager::PeerManager, testing_utils::{peerdb_inmemory_store, test_p2p_config}, + tests::helpers::{PeerManagerNotification, PeerManagerObserver}, types::peer_id::PeerId, utils::oneshot_nofail, P2pConfig, P2pEventHandler, PeerManagerEvent, @@ -54,7 +59,7 @@ use crate::{ use self::utils::cmd_to_peer_man_msg; -use super::peerdb::storage::PeerDbStorage; +use super::{dns_seed::DefaultDnsSeed, peerdb::storage::PeerDbStorage}; async fn make_peer_manager_custom( transport: T::Transport, @@ -133,6 +138,55 @@ where (peer_manager, shutdown_sender, subscribers_sender) } +type TcpNetworkingService = DefaultNetworkingService; + +/// Create a peer manager without a backend. +pub fn make_standalone_peer_manager( + chain_config: Arc, + p2p_config: Arc, + bind_addresses: Vec, + time_getter: TimeGetter, +) -> ( + PeerManager, + mpsc::UnboundedSender, + mpsc::UnboundedSender, + mpsc::UnboundedReceiver, + mpsc::UnboundedReceiver, +) { + let (cmd_sender, cmd_receiver) = mpsc::unbounded_channel(); + let (conn_event_sender, conn_event_receiver) = mpsc::unbounded_channel(); + let (peer_mgr_event_sender, peer_mgr_event_receiver) = + mpsc::unbounded_channel::(); + let connectivity_handle = ConnectivityHandle::::new( + bind_addresses, + cmd_sender, + conn_event_receiver, + ); + let (peer_mgr_notification_sender, peer_mgr_notification_receiver) = mpsc::unbounded_channel(); + let peer_mgr_observer = Box::new(PeerManagerObserver::new(peer_mgr_notification_sender)); + let dns_seed = DefaultDnsSeed::new(Arc::clone(&chain_config), Arc::clone(&p2p_config)); + + let peer_mgr = PeerManager::::new_generic( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + connectivity_handle, + peer_mgr_event_receiver, + time_getter, + peerdb_inmemory_store(), + Some(peer_mgr_observer), + Box::new(dns_seed), + ) + .unwrap(); + + ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + cmd_receiver, + peer_mgr_notification_receiver, + ) +} + async fn run_peer_manager( transport: T::Transport, addr: SocketAddress, diff --git a/p2p/src/peer_manager/tests/utils.rs b/p2p/src/peer_manager/tests/utils.rs index 9ae00259d4..31e78d47c5 100644 --- a/p2p/src/peer_manager/tests/utils.rs +++ b/p2p/src/peer_manager/tests/utils.rs @@ -20,7 +20,9 @@ use logging::log; use tokio::sync::mpsc; use p2p_test_utils::{wait_for_recv, P2pBasicTestTimeGetter}; -use p2p_types::{services::Service, socket_address::SocketAddress, PeerId}; +use p2p_types::{ + bannable_address::BannableAddress, services::Service, socket_address::SocketAddress, PeerId, +}; use test_utils::assert_matches_return_val; use crate::{ @@ -28,10 +30,13 @@ use crate::{ message::PeerManagerMessage, net::{ default_backend::types::{CategorizedMessage, Command}, - types::PeerInfo, + types::{ConnectivityEvent, PeerInfo}, }, + peer_manager::PeerManagerQueryInterface, testing_utils::TEST_PROTOCOL_VERSION, tests::helpers::PeerManagerNotification, + utils::oneshot_nofail, + PeerManagerEvent, }; pub fn cmd_to_peer_man_msg(cmd: Command) -> (PeerId, PeerManagerMessage) { @@ -86,7 +91,10 @@ pub fn make_block_relay_peer_info(peer_id: PeerId, chain_config: &ChainConfig) - } } -pub fn expect_connect_cmd(cmd: &Command, addresses: &mut BTreeSet) -> SocketAddress { +pub fn expect_cmd_connect_to_one_of( + cmd: &Command, + addresses: &mut BTreeSet, +) -> SocketAddress { match cmd { Command::Connect { address, @@ -103,6 +111,89 @@ pub fn expect_connect_cmd(cmd: &Command, addresses: &mut BTreeSet } } +pub fn expect_cmd_connect_to(cmd: &Command, expected_address: &SocketAddress) { + match cmd { + Command::Connect { + address, + local_services_override: _, + } => { + assert_eq!(address, expected_address); + } + cmd => { + panic!("Unexpected command received: {cmd:?}"); + } + } +} + +/// Send a ConnectivityEvent simulating a connection being accepted by the backend. +pub fn outbound_block_relay_peer_accepted_by_backend( + conn_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) -> PeerId { + let peer_id = PeerId::new(); + conn_event_sender + .send(ConnectivityEvent::OutboundAccepted { + peer_address, + bind_address, + peer_info: make_block_relay_peer_info(peer_id, &chain_config), + node_address_as_seen_by_peer: None, + }) + .unwrap(); + + peer_id +} + +/// Send a ConnectivityEvent simulating a connection being accepted by the backend. +pub fn inbound_block_relay_peer_accepted_by_backend( + conn_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) -> PeerId { + let peer_id = PeerId::new(); + conn_event_sender + .send(ConnectivityEvent::InboundAccepted { + peer_address, + bind_address, + peer_info: make_block_relay_peer_info(peer_id, &chain_config), + node_address_as_seen_by_peer: None, + }) + .unwrap(); + + peer_id +} + +/// Send a ConnectivityEvent simulating a connection being accepted by the backend. +pub fn inbound_full_relay_peer_accepted_by_backend( + conn_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) -> PeerId { + let peer_id = PeerId::new(); + conn_event_sender + .send(ConnectivityEvent::InboundAccepted { + peer_address, + bind_address, + peer_info: make_full_relay_peer_info(peer_id, &chain_config), + node_address_as_seen_by_peer: None, + }) + .unwrap(); + + peer_id +} + +/// Send a ConnectivityEvent simulating a PeerManagerMessage being received from the backend. +pub fn peer_mgr_message_received_from_backend( + conn_event_sender: &mpsc::UnboundedSender, + peer_id: PeerId, + message: PeerManagerMessage, +) { + conn_event_sender.send(ConnectivityEvent::Message { peer_id, message }).unwrap(); +} + pub async fn wait_for_heartbeat( peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, ) { @@ -112,3 +203,56 @@ pub async fn wait_for_heartbeat( ) .await; } + +pub async fn query_peer_manager( + peer_mgr_event_sender: &mpsc::UnboundedSender, + query_func: F, +) -> R +where + F: FnOnce(&dyn PeerManagerQueryInterface) -> R + Send + 'static, + R: Send + 'static, +{ + let (response_sender, mut response_receiver) = mpsc::unbounded_channel(); + + peer_mgr_event_sender + .send(PeerManagerEvent::GenericQuery(Box::new( + move |peer_mgr: &dyn PeerManagerQueryInterface| { + let res = query_func(peer_mgr); + response_sender.send(res).unwrap(); + }, + ))) + .unwrap(); + + response_receiver.recv().await.unwrap() +} + +pub async fn adjust_peer_score( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_id: PeerId, + score_adjustment: u32, +) { + let (result_sender, result_receiver) = oneshot_nofail::channel(); + + peer_mgr_event_sender + .send(PeerManagerEvent::AdjustPeerScore( + peer_id, + score_adjustment, + result_sender, + )) + .unwrap(); + + result_receiver.await.unwrap().unwrap(); +} + +pub async fn ban_peer_manually( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_addr: BannableAddress, +) { + let (result_sender, result_receiver) = oneshot_nofail::channel(); + + peer_mgr_event_sender + .send(PeerManagerEvent::Ban(peer_addr, result_sender)) + .unwrap(); + + result_receiver.await.unwrap().unwrap(); +} diff --git a/p2p/src/testing_utils.rs b/p2p/src/testing_utils.rs index a916952df4..f02e72509b 100644 --- a/p2p/src/testing_utils.rs +++ b/p2p/src/testing_utils.rs @@ -29,6 +29,7 @@ use p2p_types::socket_address::SocketAddress; use tokio::time::timeout; use crate::{ + ban_config::BanConfig, config::P2pConfig, net::{ default_backend::transport::{ @@ -136,7 +137,7 @@ impl TestTransportMaker for TestTransportNoise { pub struct TestAddressMaker {} impl TestAddressMaker { - pub fn new_random_ipv6_addr_with_rng(rng: &mut impl Rng) -> Ipv6Addr { + pub fn new_random_ipv6_addr(rng: &mut impl Rng) -> Ipv6Addr { Ipv6Addr::new( rng.gen(), rng.gen(), @@ -149,15 +150,17 @@ impl TestAddressMaker { ) } - pub fn new_random_ipv4_addr_with_rng(rng: &mut impl Rng) -> Ipv4Addr { + pub fn new_random_ipv4_addr(rng: &mut impl Rng) -> Ipv4Addr { Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()) } pub fn new_random_address_with_rng(rng: &mut impl Rng) -> SocketAddress { - let ip = Self::new_random_ipv6_addr_with_rng(rng); + let ip = Self::new_random_ipv6_addr(rng); SocketAddress::new(SocketAddr::new(IpAddr::V6(ip), rng.gen())) } + // TODO: remove this function; rename new_random_address_with_rng to new_random_address and + // use it in all the tests. pub fn new_random_address() -> SocketAddress { let mut rng = make_pseudo_rng(); Self::new_random_address_with_rng(&mut rng) @@ -337,6 +340,53 @@ pub fn test_p2p_config_with_peer_db_config(peerdb_config: PeerDbConfig) -> P2pCo }) } +pub fn test_p2p_config_with_ban_config(ban_config: BanConfig) -> P2pConfig { + P2pConfig { + ban_config, + + bind_addresses: Default::default(), + socks5_proxy: Default::default(), + disable_noise: Default::default(), + boot_nodes: Default::default(), + reserved_nodes: Default::default(), + whitelisted_addresses: Default::default(), + outbound_connection_timeout: Default::default(), + ping_check_period: Default::default(), + ping_timeout: Default::default(), + peer_handshake_timeout: Default::default(), + max_clock_diff: Default::default(), + node_type: Default::default(), + allow_discover_private_ips: Default::default(), + user_agent: mintlayer_core_user_agent(), + sync_stalling_timeout: Default::default(), + peer_manager_config: Default::default(), + protocol_config: Default::default(), + } +} + +pub fn test_peer_mgr_config_with_no_auto_outbound_connections() -> PeerManagerConfig { + PeerManagerConfig { + outbound_block_relay_count: 0.into(), + outbound_block_relay_extra_count: 0.into(), + outbound_full_relay_count: 0.into(), + outbound_full_relay_extra_count: 0.into(), + enable_feeler_connections: false.into(), + + outbound_block_relay_connection_min_age: Default::default(), + peerdb_config: Default::default(), + preserved_inbound_count_address_group: Default::default(), + preserved_inbound_count_ping: Default::default(), + preserved_inbound_count_new_blocks: Default::default(), + preserved_inbound_count_new_transactions: Default::default(), + max_inbound_connections: Default::default(), + outbound_full_relay_connection_min_age: Default::default(), + stale_tip_time_diff: Default::default(), + main_loop_tick_interval: Default::default(), + feeler_connections_interval: Default::default(), + force_dns_query_if_no_global_addresses_known: Default::default(), + } +} + pub async fn get_two_connected_sockets() -> (T::Stream, T::Stream) where A: TestTransportMaker, diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 7f5d1bee26..8cd9105e08 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -44,7 +44,7 @@ use crate::{ ConnectivityService, }, peer_manager::{ - peerdb::storage_impl::PeerDbStorageImpl, PeerManager, PeerManagerQueryInterface, + peerdb::storage_impl::PeerDbStorageImpl, test_utils::query_peer_manager, PeerManager, }, protocol::ProtocolVersion, sync::SyncManager, @@ -300,19 +300,10 @@ where } pub async fn get_peers_info(&self) -> TestPeersInfo { - let (response_sender, mut response_receiver) = mpsc::unbounded_channel(); - - self.peer_mgr_event_sender - .send(PeerManagerEvent::GenericQuery(Box::new( - move |mgr: &dyn PeerManagerQueryInterface| { - response_sender - .send(TestPeersInfo::from_peer_mgr_peer_contexts(mgr.peers())) - .unwrap(); - }, - ))) - .unwrap(); - - response_receiver.recv().await.unwrap() + query_peer_manager(&self.peer_mgr_event_sender, |peer_mgr| { + TestPeersInfo::from_peer_mgr_peer_contexts(peer_mgr.peers()) + }) + .await } pub async fn assert_connected_to(&self, expected_connections: &[(SocketAddress, PeerRole)]) {