Skip to content

Commit

Permalink
feat: verify active base node peer connections and disconnect if stale (
Browse files Browse the repository at this point in the history
#6655)

Description
---
Added check connections to the p2p services (`MonitorPeersService`). All
active connections are pinged on a set (slowish) interval (10 times
slower than the _auto ping metadata interval_). The nodes that do not
respond timeously on three consecutive iterations with a corresponding
pong are disconnected. This will help keep the list of active
connections (lazily) up to date.

**Edit:** 
Fixed an error in the liveness service where misbehaving ping peers were
never disconnected. The liveness service and monitor peers service work
hand in hand. Liveness selects 8 randomly connected peers to obtain
metadata from and will disconnect any of those that misbehave after 1
minute (2x ping intervals). The monitor peers service assesses all
connected peers at a much slower pace and disconnects non-responsive
peers after 15 minutes (10 x 3 ping intervals).

Motivation and Context
---
See #6516

How Has This Been Tested?
---
Performed system-level testing. From the log below we can see that 5 of
41 active peer connections did not respond with a ping. Peer
`e19e1454a1e0519866297960ad ` was disconnected because it did not
respond three times in a row,
```
2024-10-29 15:12:07.664466900 [minotari::base_node::monitor_peers] TRACE Found 5 of 41 outbound base node peer 
  connections that did not respond to pings
2024-10-29 15:12:07.664619800 [minotari::base_node::monitor_peers] TRACE Peer e2fa82050c2f7579febafb7e08 
  stats - (iteration, connected, responsive) [(3, true, true), (4, true, true), (5, true, false)]
2024-10-29 15:12:07.664683300 [minotari::base_node::monitor_peers] DEBUG Disconnecting e19e1454a1e0519866297960ad 
  as the peer is no longer responsive - (iteration, connected, responsive) [(2, true, true), (3, true, false), 
  (4, true, false), (5, true, false)]
2024-10-29 15:12:07.665853300 [minotari::base_node::monitor_peers] TRACE Peer 6ea597117476676d5ddcb18153 
  stats - (iteration, connected, responsive) [(1, true, true), (2, true, true), (3, true, true), (4, true, true), 
  (5, true, false)]
2024-10-29 15:12:07.665965500 [minotari::base_node::monitor_peers] TRACE Peer a671f812efe5ab14cbb3c1f9f4 
  stats - (iteration, connected, responsive) [(2, true, true), (3, true, true), (4, true, true), 
  (5, true, false)]
2024-10-29 15:12:07.665997800 [minotari::base_node::monitor_peers] TRACE Peer e336b264e02f611cf4fbf51f22 
  stats - (iteration, connected, responsive) [(2, true, true), (3, true, true), (4, true, true), 
  (5, true, false)]
```

What process can a PR reviewer use to test or verify this change?
---
- Code review
- System-level testing

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->

---------

Co-authored-by: SW van Heerden <[email protected]>
  • Loading branch information
hansieodendaal and SWvheerden authored Nov 1, 2024
1 parent 47b4877 commit e61b5e2
Show file tree
Hide file tree
Showing 12 changed files with 554 additions and 79 deletions.
8 changes: 7 additions & 1 deletion applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ use tari_p2p::{
initialization,
initialization::P2pInitializer,
peer_seeds::SeedPeer,
services::liveness::{config::LivenessConfig, LivenessInitializer},
services::{
liveness::{config::LivenessConfig, LivenessInitializer},
monitor_peers::MonitorPeersInitializer,
},
P2pConfig,
TransportType,
};
Expand Down Expand Up @@ -155,6 +158,9 @@ where B: BlockchainBackend + 'static
},
peer_message_subscriptions,
))
.add_initializer(MonitorPeersInitializer::new(
base_node_config.metadata_auto_ping_interval,
))
.add_initializer(ChainMetadataServiceInitializer)
.add_initializer(BaseNodeStateMachineInitializer::new(
self.db.clone().into(),
Expand Down
49 changes: 27 additions & 22 deletions applications/minotari_node/src/commands/command/ping_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,39 @@ impl HandleCommand<Args> for CommandContext {
}

impl CommandContext {
/// Function to process the dial-peer command
/// Function to process the ping-peer command
pub async fn ping_peer(&mut self, dest_node_id: NodeId) -> Result<(), Error> {
println!("🏓 Pinging peer...");
let mut liveness_events = self.liveness.get_event_stream();
let mut liveness = self.liveness.clone();
task::spawn(async move {
if let Err(e) = liveness.send_ping(dest_node_id.clone()).await {
println!("🏓 Ping failed to send to {}: {}", dest_node_id, e);
return;
}
loop {
match liveness_events.recv().await {
Ok(event) => {
if let LivenessEvent::ReceivedPong(pong) = &*event {
if pong.node_id == dest_node_id {
println!(
"🏓️ Pong received, round-trip-time is {:.2?}!",
pong.latency.unwrap_or_default()
);
match liveness.send_ping(dest_node_id.clone()).await {
Ok(nonce) => {
println!("🏓 Pinging peer {} with nonce {} ...", dest_node_id, nonce);
loop {
match liveness_events.recv().await {
Ok(event) => {
if let LivenessEvent::ReceivedPong(pong) = &*event {
if pong.node_id == dest_node_id && pong.nonce == nonce {
println!(
"🏓️ Pong: peer {} responded with nonce {}, round-trip-time is {:.2?}!",
pong.node_id,
pong.nonce,
pong.latency.unwrap_or_default()
);
break;
}
}
},
Err(RecvError::Closed) => {
break;
}
},
Err(RecvError::Lagged(_)) => {},
}
},
Err(RecvError::Closed) => {
break;
},
Err(RecvError::Lagged(_)) => {},
}
}
},
Err(e) => {
println!("🏓 Ping failed to send to {}: {}", dest_node_id, e);
},
}
});
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion applications/minotari_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod grpc_method;
mod metrics;
mod recovery;
mod utils;

use std::{process, sync::Arc};

use commands::{cli_loop::CliLoop, command::CommandContext};
Expand Down Expand Up @@ -151,7 +152,7 @@ pub async fn run_base_node_with_cli(
}

// Run, node, run!
let context = CommandContext::new(&ctx, shutdown);
let context = CommandContext::new(&ctx, shutdown.clone());
let main_loop = CliLoop::new(context, cli.watch, cli.non_interactive_mode);
if cli.non_interactive_mode {
println!("Node started in non-interactive mode (pid = {})", process::id());
Expand Down
12 changes: 3 additions & 9 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,13 @@ impl ChainMetadataService {
match event {
// Received a ping, check if it contains ChainMetadata
LivenessEvent::ReceivedPing(event) => {
debug!(
target: LOG_TARGET,
"Received ping from neighbouring node '{}'.", event.node_id
);
self.number_of_rounds_no_pings = 0;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.send_chain_metadata_to_event_publisher(event).await?;
}
},
// Received a pong, check if our neighbour sent it and it contains ChainMetadata
LivenessEvent::ReceivedPong(event) => {
trace!(
target: LOG_TARGET,
"Received pong from neighbouring node '{}'.",
event.node_id
);
self.number_of_rounds_no_pings = 0;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.send_chain_metadata_to_event_publisher(event).await?;
Expand Down Expand Up @@ -325,6 +316,7 @@ mod test {
metadata,
node_id: node_id.clone(),
latency: None,
nonce: 0,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
Expand All @@ -347,6 +339,7 @@ mod test {
metadata,
node_id,
latency: None,
nonce: 0,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
Expand All @@ -365,6 +358,7 @@ mod test {
metadata,
node_id,
latency: None,
nonce: 0,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
Expand Down
27 changes: 20 additions & 7 deletions base_layer/p2p/src/services/liveness/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use crate::proto::liveness::MetadataKey;
pub enum LivenessRequest {
/// Send a ping to the given node ID
SendPing(NodeId),
/// Ping a list of peers
SendPings(Vec<NodeId>),
/// Retrieve the total number of pings received
GetPingCount,
/// Retrieve the total number of pongs received
Expand All @@ -55,7 +57,7 @@ pub enum LivenessRequest {
#[derive(Debug)]
pub enum LivenessResponse {
/// Indicates that the request succeeded
Ok,
Ok(Option<Vec<u64>>),
/// Used to return a counter value from `GetPingCount` and `GetPongCount`
Count(usize),
/// Response for GetAvgLatency and GetNetworkAvgLatency
Expand Down Expand Up @@ -84,14 +86,17 @@ pub struct PingPongEvent {
pub latency: Option<Duration>,
/// Metadata of the corresponding node
pub metadata: Metadata,
/// The nonce of the ping/pong message, for clients that want to match pings with pongs
pub nonce: u64,
}

impl PingPongEvent {
pub fn new(node_id: NodeId, latency: Option<Duration>, metadata: Metadata) -> Self {
pub fn new(node_id: NodeId, latency: Option<Duration>, metadata: Metadata, nonce: u64) -> Self {
Self {
node_id,
latency,
metadata,
nonce,
}
}
}
Expand Down Expand Up @@ -122,9 +127,17 @@ impl LivenessHandle {
}

/// Send a ping to a given node ID
pub async fn send_ping(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
pub async fn send_ping(&mut self, node_id: NodeId) -> Result<u64, LivenessError> {
match self.handle.call(LivenessRequest::SendPing(node_id)).await?? {
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(Some(nonces)) => Ok(nonces[0]),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Send pings to a list of peers
pub async fn send_pings(&mut self, node_ids: Vec<NodeId>) -> Result<Vec<u64>, LivenessError> {
match self.handle.call(LivenessRequest::SendPings(node_ids)).await?? {
LivenessResponse::Ok(Some(nonces)) => Ok(nonces),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}
Expand Down Expand Up @@ -152,15 +165,15 @@ impl LivenessHandle {
.call(LivenessRequest::SetMetadataEntry(key, value))
.await??
{
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(_) => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Add a monitored peer to the basic config if not present
pub async fn check_add_monitored_peer(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
match self.handle.call(LivenessRequest::AddMonitoredPeer(node_id)).await?? {
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(_) => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}
Expand All @@ -172,7 +185,7 @@ impl LivenessHandle {
.call(LivenessRequest::RemoveMonitoredPeer(node_id))
.await??
{
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(_) => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}
Expand Down
12 changes: 8 additions & 4 deletions base_layer/p2p/src/services/liveness/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ impl LivenessMock {
self.mock_state.add_request_call(req.clone());
match req {
SendPing(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(Some(vec![0])))).unwrap();
},
SendPings(node_ids) => {
let nonces: Vec<u64> = (0..node_ids.len() as u64).collect();
reply.send(Ok(LivenessResponse::Ok(Some(nonces)))).unwrap();
},
GetPingCount => {
reply.send(Ok(LivenessResponse::Count(1))).unwrap();
Expand All @@ -140,13 +144,13 @@ impl LivenessMock {
reply.send(Ok(LivenessResponse::AvgLatency(None))).unwrap();
},
SetMetadataEntry(_, _) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
},
AddMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
},
RemoveMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
},
}
}
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/services/liveness/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub use handle::{

mod message;
mod service;
pub use service::MAX_INFLIGHT_TTL;

mod state;
pub use state::Metadata;
Expand Down Expand Up @@ -87,7 +88,7 @@ const LOG_TARGET: &str = "p2p::services::liveness";

/// Initializer for the Liveness service handle and service future.
pub struct LivenessInitializer {
config: Option<LivenessConfig>,
pub(crate) config: Option<LivenessConfig>,
inbound_message_subscription_factory: Arc<TopicSubscriptionFactory<TariMessageType, Arc<PeerMessage>>>,
}

Expand Down
Loading

0 comments on commit e61b5e2

Please sign in to comment.