Skip to content

Commit

Permalink
fix(comms): timeout and ban for bad behaviour in protocol negotation
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 31, 2023
1 parent 5e8a3ec commit c36556d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 27 deletions.
8 changes: 7 additions & 1 deletion comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub enum ConnectionManagerEvent {

// Substreams
NewInboundSubstream(NodeId, ProtocolId, Substream),

// Other
PeerViolation { peer_node_id: NodeId, details: String },
}

impl fmt::Display for ConnectionManagerEvent {
Expand All @@ -85,6 +88,9 @@ impl fmt::Display for ConnectionManagerEvent {
node_id.short_str(),
String::from_utf8_lossy(protocol)
),
PeerViolation { peer_node_id, details } => {
write!(f, "PeerViolation({}, {})", peer_node_id.short_str(), details)
},
}
}
}
Expand Down Expand Up @@ -401,7 +407,7 @@ where
}

async fn handle_event(&mut self, event: ConnectionManagerEvent) {
use ConnectionManagerEvent::{NewInboundSubstream, PeerConnectFailed, PeerConnected, PeerInboundConnectFailed};
use ConnectionManagerEvent::*;

match event {
NewInboundSubstream(node_id, protocol, stream) => {
Expand Down
91 changes: 66 additions & 25 deletions comms/core/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{
time::{Duration, Instant},
};

use futures::{future::BoxFuture, stream::FuturesUnordered};
use log::*;
use multiaddr::Multiaddr;
use tokio::{
Expand Down Expand Up @@ -65,6 +66,8 @@ use crate::{

const LOG_TARGET: &str = "comms::connection_manager::peer_connection";

const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(5);

static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

pub fn try_create(
Expand Down Expand Up @@ -358,7 +361,9 @@ struct PeerConnectionActor {
incoming_substreams: IncomingSubstreams,
control: Control,
event_notifier: mpsc::Sender<ConnectionManagerEvent>,
our_supported_protocols: Vec<ProtocolId>,
our_supported_protocols: Arc<Vec<ProtocolId>>,
inbound_protocol_negotiations:
FuturesUnordered<BoxFuture<'static, Result<(ProtocolId, Substream), PeerConnectionError>>>,
their_supported_protocols: Vec<ProtocolId>,
}

Expand All @@ -381,7 +386,10 @@ impl PeerConnectionActor {
incoming_substreams: connection.into_incoming(),
request_rx,
event_notifier,
our_supported_protocols,
// our_supported_protocols never changes so we make it cheap to clone (used in inbound_protocol_negotiations
// futures)
our_supported_protocols: Arc::new(our_supported_protocols),
inbound_protocol_negotiations: FuturesUnordered::new(),
their_supported_protocols,
}
}
Expand All @@ -401,22 +409,16 @@ impl PeerConnectionActor {

maybe_substream = self.incoming_substreams.next() => {
match maybe_substream {
Some(substream) => {
if let Err(err) = self.handle_incoming_substream(substream).await {
error!(
target: LOG_TARGET,
"[{}] Incoming substream for peer '{}' failed to open because '{error}'",
self,
self.peer_node_id.short_str(),
error = err
)
}
},
Some(substream) => self.handle_incoming_substream(substream).await,
None => {
debug!(target: LOG_TARGET, "[{}] Peer '{}' closed the connection", self, self.peer_node_id.short_str());
break;
},
}
},

Some(result) = self.inbound_protocol_negotiations.next() => {
self.handle_inbound_protocol_negotiation_result(result).await;
}
}
}
Expand Down Expand Up @@ -461,27 +463,66 @@ impl PeerConnectionActor {
}

#[tracing::instrument(level="trace", skip(self, stream),fields(comms.direction="inbound"))]
async fn handle_incoming_substream(&mut self, mut stream: Substream) -> Result<(), PeerConnectionError> {
let selected_protocol = ProtocolNegotiation::new(&mut stream)
.negotiate_protocol_inbound(&self.our_supported_protocols)
.await?;
async fn handle_incoming_substream(&mut self, mut stream: Substream) {
let our_supported_protocols = self.our_supported_protocols.clone();
self.inbound_protocol_negotiations.push(Box::pin(async move {
let mut protocol_negotiation = ProtocolNegotiation::new(&mut stream);

let selected_protocol = time::timeout(
PROTOCOL_NEGOTIATION_TIMEOUT,
protocol_negotiation.negotiate_protocol_inbound(&our_supported_protocols),
)
.await
.map_err(|_| PeerConnectionError::ProtocolNegotiationTimeout)??;
Ok((selected_protocol, stream))
}));
}

self.notify_event(ConnectionManagerEvent::NewInboundSubstream(
self.peer_node_id.clone(),
selected_protocol,
stream,
))
.await;
async fn handle_inbound_protocol_negotiation_result(
&mut self,
result: Result<(ProtocolId, Substream), PeerConnectionError>,
) {
match result {
Ok((selected_protocol, stream)) => {
self.notify_event(ConnectionManagerEvent::NewInboundSubstream(
self.peer_node_id.clone(),
selected_protocol,
stream,
))
.await;
},
Err(PeerConnectionError::ProtocolError(err)) if err.is_ban_offence() => {
error!(
target: LOG_TARGET,
"[{}] PEER VIOLATION: Incoming substream for peer '{}' failed to open because '{}'",
self,
self.peer_node_id.short_str(),
err
);

Ok(())
self.notify_event(ConnectionManagerEvent::PeerViolation {
peer_node_id: self.peer_node_id.clone(),
details: err.to_string(),
})
.await;
},
Err(err) => {
error!(
target: LOG_TARGET,
"[{}] Incoming substream for peer '{}' failed to open because '{error}'",
self,
self.peer_node_id.short_str(),
error = err
);
},
}
}

#[tracing::instrument(skip(self))]
async fn open_negotiated_protocol_stream(
&mut self,
protocol: ProtocolId,
) -> Result<NegotiatedSubstream<Substream>, PeerConnectionError> {
const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(10);
debug!(
target: LOG_TARGET,
"[{}] Negotiating protocol '{}' on new substream for peer '{}'",
Expand Down
13 changes: 12 additions & 1 deletion comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,12 @@ impl ConnectivityManagerActor {
Ok(())
}

#[allow(clippy::too_many_lines)]
async fn update_state_on_connectivity_event(
&mut self,
event: &ConnectionManagerEvent,
) -> Result<(), ConnectivityError> {
use ConnectionManagerEvent::{PeerConnectFailed, PeerConnected, PeerDisconnected};
use ConnectionManagerEvent::*;
match event {
PeerConnected(new_conn) => {
match self.on_new_connection(new_conn).await {
Expand Down Expand Up @@ -573,6 +574,16 @@ impl ConnectivityManagerActor {
}
}
},
PeerViolation { peer_node_id, details } => {
self.ban_peer(
peer_node_id,
// 1 day
Duration::from_secs(24 * 60 * 60),
format!("Peer violation: {details}"),
)
.await?;
return Ok(());
},
_ => {},
}

Expand Down
16 changes: 16 additions & 0 deletions comms/core/src/protocol/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,19 @@ pub enum ProtocolError {
#[error("Failed to send notification because notification sender disconnected")]
NotificationSenderDisconnected,
}

impl ProtocolError {
pub fn is_ban_offence(&self) -> bool {
match self {
ProtocolError::IoError(_) |
ProtocolError::ProtocolNegotiationTerminatedByPeer |
ProtocolError::ProtocolOutboundNegotiationFailed { .. } |
ProtocolError::ProtocolNotRegistered |
ProtocolError::ProtocolInboundNegotiationFailed |
ProtocolError::ProtocolOptimisticNegotiationFailed |
ProtocolError::NotificationSenderDisconnected => false,

ProtocolError::ProtocolIdTooLong => true,
}
}
}
8 changes: 8 additions & 0 deletions comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,14 @@ fn connection_manager_logger(
node_name
);
},
PeerViolation { peer_node_id, details } => {
println!(
"'{}' violated protocol with '{}' because '{}'",
node_name,
get_name(peer_node_id),
details
);
},
}
event
}
Expand Down

0 comments on commit c36556d

Please sign in to comment.