Skip to content

Commit

Permalink
feat: clear handshake error when connections go stale (#6528)
Browse files Browse the repository at this point in the history
Description
---
Cleared handshake error from the client side by forcefully disconnecting
from the peer.

Motivation and Context
---
This is to stop the phenomenon whereby a peer can dial another peer
(`dial-peer XXXXX`) but cannot ping that same peer (`ping-peer XXXXX`).
(See #6516)

How Has This Been Tested?
---
Unit tests pass
Long-term system-level testing has to be done to confirm this solved the
issue.

What process can a PR reviewer use to test or verify this change?
---
Review code changes.

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored Sep 5, 2024
1 parent 14a6631 commit b5f5e03
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 26 deletions.
41 changes: 27 additions & 14 deletions comms/core/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ where
debug!(target: LOG_TARGET, "Dial succeeded for peer '{}' after {} attempt(s)", state.peer().node_id.short_str(), state.num_attempts());
break (state, Ok((socket, addr)));
},
// Connection went stale, propagate error to enable starting a fresh connection
(state, Err(ConnectionManagerError::NoiseHandshakeError(e))) => break (state, Err(ConnectionManagerError::NoiseHandshakeError(e))),
// Inflight dial was cancelled
(state, Err(ConnectionManagerError::DialCancelled)) => break (state, Err(ConnectionManagerError::DialCancelled)),
(state, Err(err)) => {
Expand All @@ -546,6 +548,7 @@ where
/// Attempts to dial a peer sequentially on all addresses; if connections are to be minimized only.
/// Returns ownership of the given `DialState` and a success or failure result for the dial,
/// or None if the dial was cancelled inflight
#[allow(clippy::too_many_lines)]
async fn dial_peer(
mut dial_state: DialState,
noise_config: &NoiseConfig,
Expand All @@ -558,9 +561,9 @@ where
let addresses = dial_state.peer().addresses.clone().into_vec();
if addresses.is_empty() {
let node_id_hex = dial_state.peer().node_id.clone().to_hex();
debug!(
trace!(
target: LOG_TARGET,
"No more contactable addresses for peer '{}'",
"Dial - No more contactable addresses for peer '{}'",
node_id_hex
);
return (
Expand All @@ -570,9 +573,9 @@ where
}
let cancel_signal = dial_state.get_cancel_signal();
for address in addresses {
debug!(
trace!(
target: LOG_TARGET,
"Attempting address '{}' for peer '{}'",
"Dial - Attempting address '{}' for peer '{}'",
address,
dial_state.peer().node_id.short_str()
);
Expand All @@ -589,18 +592,18 @@ where
address: moved_address.to_string(),
details: err.to_string(),
})?;
debug!(
target: LOG_TARGET,
"Socket established on '{}'. Performing noise upgrade protocol", moved_address
);
let initial_dial_time = timer.elapsed();

debug!(
"Dialed peer: {} on address: {} on tcp after: {}",
trace!(
"Dial - Dialed peer: {} on address: {} on tcp after: {}",
node_id.short_str(),
moved_address,
timer.elapsed().as_millis()
);
trace!(
target: LOG_TARGET,
"Dial - Socket established on '{}'. Performing noise upgrade protocol", moved_address
);
timer = Instant::now();

socket
Expand All @@ -610,10 +613,20 @@ where

let noise_socket = noise_config
.upgrade_socket(socket, ConnectionDirection::Outbound)
.await?;
.await
.map_err(|err| {
warn!(
target: LOG_TARGET,
"Dial - failed to upgrade noise: {} on address: {} ({})",
node_id,
moved_address,
err
);
err
})?;

let noise_upgrade_time = timer.elapsed();
debug!(
trace!(
"Dial - upgraded noise: {} on address: {} on tcp after: {} ms",
node_id.short_str(),
moved_address,
Expand All @@ -638,7 +651,7 @@ where
Either::Left((Err(err), _)) => {
debug!(
target: LOG_TARGET,
"(Attempt {}) Dial failed on address '{}' for peer '{}' because '{}'",
"Dial - (Attempt {}) Dial failed on address '{}' for peer '{}' because '{}'",
dial_state.num_attempts(),
address,
dial_state.peer().node_id.short_str(),
Expand All @@ -656,7 +669,7 @@ where
Either::Right(_) => {
debug!(
target: LOG_TARGET,
"Dial for peer '{}' cancelled",
"Dial - for peer '{}' cancelled",
dial_state.peer().node_id.short_str()
);
return (dial_state, Err(ConnectionManagerError::DialCancelled));
Expand Down
16 changes: 12 additions & 4 deletions comms/core/src/connection_manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
connection_manager::PeerConnectionRequest,
multiplexing::YamuxControlError,
noise,
noise::NoiseError,
peer_manager::PeerManagerError,
peer_validator::PeerValidatorError,
protocol::{IdentityProtocolError, ProtocolError},
Expand Down Expand Up @@ -65,10 +66,14 @@ pub enum ConnectionManagerError {
},
#[error("The noise transport failed to provide a valid static public key for the peer")]
InvalidStaticPublicKey,
// This is a String because we need this error to be clonable so that we can
// This is a String because we need this error to be clone-able so that we can
// send the same response to multiple requesters
#[error("Noise error: {0}")]
NoiseError(String),
#[error("Noise snow error: {0}")]
NoiseSnowError(String),
// This is a String because we need this error to be clone-able so that we can
// send the same response to multiple requesters
#[error("Noise handshake error: {0}")]
NoiseHandshakeError(String),
#[error("Peer is banned, denying connection")]
PeerBanned,
#[error("Identity protocol failed: {0}")]
Expand Down Expand Up @@ -97,7 +102,10 @@ impl From<yamux::ConnectionError> for ConnectionManagerError {

impl From<noise::NoiseError> for ConnectionManagerError {
fn from(err: noise::NoiseError) -> Self {
ConnectionManagerError::NoiseError(err.to_string())
match err {
NoiseError::SnowError(e) => ConnectionManagerError::NoiseSnowError(e.to_string()),
NoiseError::HandshakeFailed(e) => ConnectionManagerError::NoiseHandshakeError(e.to_string()),
}
}
}

Expand Down
23 changes: 18 additions & 5 deletions comms/core/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,23 @@ where
const CONNECTION_DIRECTION: ConnectionDirection = ConnectionDirection::Inbound;
trace!(
target: LOG_TARGET,
"Starting noise protocol upgrade for peer at address '{}'", peer_addr
"Listen - starting noise protocol upgrade for peer at address '{}'", peer_addr
);

let timer = Instant::now();
let mut noise_socket = noise_config.upgrade_socket(socket, CONNECTION_DIRECTION).await?;
let mut noise_socket = noise_config
.upgrade_socket(socket, CONNECTION_DIRECTION)
.await
.map_err(|err| {
warn!(
target: LOG_TARGET,
"Listen - failed to upgrade noise: {} on address: {} ({})",
node_identity.node_id(),
peer_addr,
err
);
err
})?;

let authenticated_public_key = noise_socket
.get_remote_public_key()
Expand All @@ -363,17 +375,18 @@ where

trace!(
target: LOG_TARGET,
"Noise socket upgrade completed in {:.2?} with public key '{}'",
"Listen - noise socket upgrade completed in {:.2?} with public key '{}'",
latency,
authenticated_public_key
);

// Check if we know the peer and if it is banned
let known_peer = common::find_unbanned_peer(peer_manager, &authenticated_public_key).await?;

debug!(
trace!(
target: LOG_TARGET,
"Starting peer identity exchange for peer with public key '{}'", authenticated_public_key
"Listen - starting peer identity exchange for peer with public key '{}'",
authenticated_public_key
);

let peer_identity_result = common::perform_identity_exchange(
Expand Down
14 changes: 13 additions & 1 deletion comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,19 @@ impl ConnectivityManagerActor {
let (node_id, mut new_status, connection) = match event {
PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),

PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
if let Some(conn) = self.pool.get_connection(node_id) {
warn!(
target: LOG_TARGET,
"Handshake error to peer '{}', disconnecting for a fresh retry ({})",
node_id,
msg
);
let mut conn = conn.clone();
conn.disconnect(Minimized::No).await?;
}
(node_id, ConnectionStatus::Failed, None)
},
PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
if let Some(conn) = self.pool.get_connection(node_id) {
if conn.is_connected() && conn.direction().is_inbound() {
Expand Down
4 changes: 2 additions & 2 deletions comms/core/src/noise/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ impl NoiseConfig {

match direction {
ConnectionDirection::Outbound => {
debug!(target: LOG_TARGET, "Starting noise initiator handshake ");
trace!(target: LOG_TARGET, "Starting noise initiator handshake ");
builder.build_initiator()?
},
ConnectionDirection::Inbound => {
debug!(target: LOG_TARGET, "Starting noise responder handshake");
trace!(target: LOG_TARGET, "Starting noise responder handshake");
builder.build_responder()?
},
}
Expand Down

0 comments on commit b5f5e03

Please sign in to comment.