Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit b8cd730

Browse files
Freyskeydhadjiszs
andauthored
fix(p2p): accept listener connection during bootstrap (#484)
Signed-off-by: Simon Paitrault <[email protected]> Signed-off-by: Monir Hadji <[email protected]> Co-authored-by: Monir Hadji <[email protected]>
1 parent 5b6ddb8 commit b8cd730

File tree

12 files changed

+216
-63
lines changed

12 files changed

+216
-63
lines changed

crates/topos-p2p/src/behaviour/grpc/event.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub enum Event {
1414
OutboundSuccess {
1515
peer_id: PeerId,
1616
request_id: RequestId,
17+
#[allow(unused)]
1718
channel: Channel,
1819
},
1920

crates/topos-p2p/src/network.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
};
1515
use futures::Stream;
1616
use libp2p::{
17-
core::upgrade,
17+
core::{transport::MemoryTransport, upgrade},
1818
dns,
1919
identity::Keypair,
2020
kad::store::MemoryStore,
@@ -30,6 +30,7 @@ use std::{
3030
};
3131
use tokio::sync::{mpsc, oneshot};
3232
use tokio_stream::wrappers::ReceiverStream;
33+
use tracing::debug;
3334

3435
pub fn builder<'a>() -> NetworkBuilder<'a> {
3536
NetworkBuilder::default()
@@ -48,9 +49,16 @@ pub struct NetworkBuilder<'a> {
4849
local_port: Option<u8>,
4950
config: NetworkConfig,
5051
grpc_context: GrpcContext,
52+
memory_transport: bool,
5153
}
5254

5355
impl<'a> NetworkBuilder<'a> {
56+
#[cfg(test)]
57+
pub(crate) fn memory(mut self) -> Self {
58+
self.memory_transport = true;
59+
60+
self
61+
}
5462
pub fn grpc_context(mut self, grpc_context: GrpcContext) -> Self {
5563
self.grpc_context = grpc_context;
5664

@@ -131,6 +139,7 @@ impl<'a> NetworkBuilder<'a> {
131139

132140
let grpc = grpc::Behaviour::new(self.grpc_context);
133141

142+
debug!("Known peers: {:?}", self.known_peers);
134143
let behaviour = Behaviour {
135144
gossipsub,
136145
peer_info: PeerInfoBehaviour::new(PEER_INFO_PROTOCOL, &peer_key),
@@ -148,23 +157,29 @@ impl<'a> NetworkBuilder<'a> {
148157
grpc,
149158
};
150159

151-
let transport = {
160+
let multiplex_config = libp2p::yamux::Config::default();
161+
162+
let transport = if self.memory_transport {
163+
MemoryTransport::new()
164+
.upgrade(upgrade::Version::V1)
165+
.authenticate(noise::Config::new(&peer_key)?)
166+
.multiplex(multiplex_config)
167+
.timeout(TWO_HOURS)
168+
.boxed()
169+
} else {
152170
let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
153171
let dns_tcp = dns::tokio::Transport::system(tcp).unwrap();
154172

155173
let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
156-
dns_tcp.or_transport(tcp)
174+
dns_tcp
175+
.or_transport(tcp)
176+
.upgrade(upgrade::Version::V1)
177+
.authenticate(noise::Config::new(&peer_key)?)
178+
.multiplex(multiplex_config)
179+
.timeout(TWO_HOURS)
180+
.boxed()
157181
};
158182

159-
let multiplex_config = libp2p::yamux::Config::default();
160-
161-
let transport = transport
162-
.upgrade(upgrade::Version::V1)
163-
.authenticate(noise::Config::new(&peer_key)?)
164-
.multiplex(multiplex_config)
165-
.timeout(TWO_HOURS)
166-
.boxed();
167-
168183
let swarm = Swarm::new(
169184
transport,
170185
behaviour,
@@ -216,8 +231,8 @@ impl<'a> NetworkBuilder<'a> {
216231
pending_record_requests: HashMap::new(),
217232
shutdown,
218233
health_state: crate::runtime::HealthState {
219-
bootpeer_connection_retries: 3,
220-
successfully_connected_to_bootpeer: if self.known_peers.is_empty() {
234+
bootnode_connection_retries: 3,
235+
successfully_connected_to_bootnode: if self.known_peers.is_empty() {
221236
// Node seems to be a boot node
222237
Some(ConnectionId::new_unchecked(0))
223238
} else {

crates/topos-p2p/src/runtime/handle_event.rs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use libp2p::{multiaddr::Protocol, swarm::SwarmEvent};
1+
use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent};
22
use tracing::{debug, error, info, warn};
33

44
use crate::{error::P2PError, event::ComposedEvent, Event, Runtime};
@@ -62,13 +62,13 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
6262
error,
6363
} if self
6464
.health_state
65-
.successfully_connected_to_bootpeer
65+
.successfully_connected_to_bootnode
6666
.is_none()
67-
&& self.health_state.dialed_bootpeer.contains(&connection_id) =>
67+
&& self.health_state.dialed_bootnode.contains(&connection_id) =>
6868
{
69-
warn!("Unable to connect to bootpeer {peer_id}: {error:?}");
70-
self.health_state.dialed_bootpeer.remove(&connection_id);
71-
if self.health_state.dialed_bootpeer.is_empty() {
69+
warn!("Unable to connect to bootnode {peer_id}: {error:?}");
70+
self.health_state.dialed_bootnode.remove(&connection_id);
71+
if self.health_state.dialed_bootnode.is_empty() {
7272
// We tried to connect to all bootnode without success
7373
error!("Unable to connect to any bootnode");
7474
}
@@ -100,25 +100,49 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
100100
num_established,
101101
concurrent_dial_errors,
102102
established_in,
103-
} if self.health_state.dialed_bootpeer.contains(&connection_id) => {
104-
info!("Successfully connected to bootpeer {peer_id}");
103+
} if self.health_state.dialed_bootnode.contains(&connection_id) => {
104+
info!("Successfully connected to bootnode {peer_id}");
105105
if self
106106
.health_state
107-
.successfully_connected_to_bootpeer
107+
.successfully_connected_to_bootnode
108108
.is_none()
109109
{
110-
self.health_state.successfully_connected_to_bootpeer = Some(connection_id);
111-
_ = self.health_state.dialed_bootpeer.remove(&connection_id);
110+
self.health_state.successfully_connected_to_bootnode = Some(connection_id);
111+
_ = self.health_state.dialed_bootnode.remove(&connection_id);
112112
}
113113
}
114114

115115
SwarmEvent::ConnectionEstablished {
116-
peer_id, endpoint, ..
116+
peer_id,
117+
endpoint,
118+
connection_id,
119+
..
117120
} => {
118-
info!(
119-
"Connection established with peer {peer_id} as {:?}",
120-
endpoint.to_endpoint()
121-
);
121+
if self
122+
.health_state
123+
.successfully_connected_to_bootnode
124+
.is_none()
125+
&& self.boot_peers.contains(&peer_id)
126+
{
127+
info!(
128+
"Connection established with bootnode {peer_id} as {:?}",
129+
endpoint.to_endpoint()
130+
);
131+
132+
if endpoint.to_endpoint() == Endpoint::Listener {
133+
if let Err(error) = self.swarm.dial(peer_id) {
134+
error!(
135+
"Unable to dial bootnode {peer_id} after incoming connection: \
136+
{error}"
137+
);
138+
}
139+
}
140+
} else {
141+
info!(
142+
"Connection established with peer {peer_id} as {:?}",
143+
endpoint.to_endpoint()
144+
);
145+
}
122146

123147
if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
124148
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
@@ -164,8 +188,8 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
164188
peer_id: Some(ref peer_id),
165189
connection_id,
166190
} if self.boot_peers.contains(peer_id) => {
167-
info!("Dialing bootpeer {peer_id} on connection: {connection_id}");
168-
self.health_state.dialed_bootpeer.insert(connection_id);
191+
info!("Dialing bootnode {peer_id} on connection: {connection_id}");
192+
self.health_state.dialed_bootnode.insert(connection_id);
169193
}
170194

171195
SwarmEvent::Dialing {
@@ -185,6 +209,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
185209
SwarmEvent::ListenerError { listener_id, error } => {
186210
error!("Unhandled ListenerError {listener_id:?} | {error}")
187211
}
212+
188213
event => {
189214
warn!("Unhandled SwarmEvent: {:?}", event);
190215
}

crates/topos-p2p/src/runtime/handle_event/discovery.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl EventHandler<Box<Event>> for Runtime {
4545
{
4646
if self
4747
.health_state
48-
.successfully_connected_to_bootpeer
48+
.successfully_connected_to_bootnode
4949
.is_none()
5050
{
5151
warn!(
@@ -85,11 +85,11 @@ impl EventHandler<Box<Event>> for Runtime {
8585
} if num_remaining == 0
8686
&& self
8787
.health_state
88-
.successfully_connected_to_bootpeer
88+
.successfully_connected_to_bootnode
8989
.is_none()
9090
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
9191
{
92-
match self.health_state.bootpeer_connection_retries.checked_sub(1) {
92+
match self.health_state.bootnode_connection_retries.checked_sub(1) {
9393
None => {
9494
error!(
9595
"Bootstrap query finished but unable to connect to bootnode, stopping"
@@ -103,7 +103,7 @@ impl EventHandler<Box<Event>> for Runtime {
103103
{} more times",
104104
new
105105
);
106-
self.health_state.bootpeer_connection_retries = new;
106+
self.health_state.bootnode_connection_retries = new;
107107
}
108108
}
109109
}
@@ -119,7 +119,7 @@ impl EventHandler<Box<Event>> for Runtime {
119119
} if num_remaining == 0
120120
&& self
121121
.health_state
122-
.successfully_connected_to_bootpeer
122+
.successfully_connected_to_bootnode
123123
.is_some()
124124
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
125125
{
Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,52 @@
1+
use tracing::debug;
2+
13
use crate::{behaviour::grpc, Runtime};
24

35
use super::{EventHandler, EventResult};
46

57
#[async_trait::async_trait]
68
impl EventHandler<grpc::Event> for Runtime {
7-
async fn handle(&mut self, _event: grpc::Event) -> EventResult {
9+
async fn handle(&mut self, event: grpc::Event) -> EventResult {
10+
match event {
11+
grpc::Event::OutboundFailure {
12+
peer_id,
13+
request_id,
14+
error,
15+
} => {
16+
debug!(
17+
"Outbound connection failure to peer {} for request {}: {}",
18+
peer_id, request_id, error
19+
);
20+
}
21+
grpc::Event::OutboundSuccess {
22+
peer_id,
23+
request_id,
24+
..
25+
} => {
26+
debug!(
27+
"Outbound connection success to peer {} for request {}",
28+
peer_id, request_id
29+
);
30+
}
31+
grpc::Event::InboundNegotiatedConnection {
32+
request_id,
33+
connection_id,
34+
} => {
35+
debug!(
36+
"Inbound connection negotiated for request {} with connection {}",
37+
request_id, connection_id
38+
);
39+
}
40+
grpc::Event::OutboundNegotiatedConnection {
41+
peer_id,
42+
request_id,
43+
} => {
44+
debug!(
45+
"Outbound connection negotiated to peer {} for request {}",
46+
peer_id, request_id
47+
);
48+
}
49+
}
850
Ok(())
951
}
1052
}

crates/topos-p2p/src/runtime/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ pub(crate) struct HealthState {
6262
/// Indicates if the node is listening on any address
6363
pub(crate) is_listening: bool,
6464
/// List the bootnodes that the node has tried to connect to
65-
pub(crate) dialed_bootpeer: HashSet<ConnectionId>,
65+
pub(crate) dialed_bootnode: HashSet<ConnectionId>,
6666
/// Indicates if the node has successfully connected to a bootnode
67-
pub(crate) successfully_connected_to_bootpeer: Option<ConnectionId>,
67+
pub(crate) successfully_connected_to_bootnode: Option<ConnectionId>,
6868
/// Track the number of remaining retries to connect to any bootnode
69-
pub(crate) bootpeer_connection_retries: usize,
69+
pub(crate) bootnode_connection_retries: usize,
7070
}
7171

7272
impl Runtime {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::time::Duration;
2+
3+
use futures::{future::join_all, FutureExt};
4+
use rstest::rstest;
5+
use test_log::test;
6+
use topos_test_sdk::tce::NodeConfig;
7+
use tracing::Instrument;
8+
9+
#[rstest]
10+
#[test(tokio::test)]
11+
#[timeout(Duration::from_secs(5))]
12+
async fn two_bootnode_communicating() {
13+
let bootnode = NodeConfig::memory(2);
14+
let local = NodeConfig::memory(1);
15+
let bootnode_known_peers = vec![(local.peer_id(), local.addr.clone())];
16+
let local_known_peers = vec![(bootnode.peer_id(), bootnode.addr.clone())];
17+
18+
let mut handlers = Vec::new();
19+
20+
let context_local = tracing::info_span!("start_node", "peer_id" = local.peer_id().to_string());
21+
22+
let context_bootnode =
23+
tracing::info_span!("start_node", "peer_id" = bootnode.peer_id().to_string());
24+
handlers.push(
25+
async move {
26+
let (client, mut stream, runtime) = crate::network::builder()
27+
.minimum_cluster_size(1)
28+
.peer_key(local.keypair.clone())
29+
.listen_addresses(&[local.addr.clone()])
30+
.known_peers(&local_known_peers)
31+
.memory()
32+
.build()
33+
.await
34+
.expect("Unable to create p2p network");
35+
36+
runtime.bootstrap(&mut stream).await
37+
}
38+
.instrument(context_local)
39+
.boxed(),
40+
);
41+
42+
handlers.push(
43+
async move {
44+
let (client, mut stream, runtime) = crate::network::builder()
45+
.minimum_cluster_size(1)
46+
.peer_key(bootnode.keypair.clone())
47+
.listen_addresses(&[bootnode.addr.clone()])
48+
.known_peers(&bootnode_known_peers)
49+
.memory()
50+
.build()
51+
.await
52+
.expect("Unable to create p2p network");
53+
54+
runtime.bootstrap(&mut stream).await
55+
}
56+
.instrument(context_bootnode)
57+
.boxed(),
58+
);
59+
assert!(join_all(handlers).await.iter().all(Result::is_ok));
60+
}

crates/topos-p2p/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
mod behaviour;
2+
mod bootstrap;
23
mod command;
34
mod support;

crates/topos-test-sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ ethers.workspace = true
2424
async-trait.workspace = true
2525
futures.workspace = true
2626
lazy_static = { version = "1.4.0" }
27-
libp2p.workspace = true
27+
libp2p = { workspace = true, features = ["macros"] }
2828
proc_macro_sdk = { path = "./proc_macro_sdk/" }
2929
rand.workspace = true
3030
rstest.workspace = true

0 commit comments

Comments
 (0)