Skip to content

Commit

Permalink
chore: Wait on identify before returning connection (#47)
Browse files Browse the repository at this point in the history
* chore: Wait on identify before returning connection

* chore: Log time it took to obtain identify

Note: Replace with metrics (related to #40)

* chore: relocate conditions

* chore: Cleanup logic

* chore: Update peerbook test

* chore: Update CHANGELOG.md
  • Loading branch information
dariusc93 authored Mar 20, 2023
1 parent b62ca96 commit 3ff6085
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# 0.3.8 [unreleased]
- chore: Wait on identify before returning connection [PR 47]
- feat(repo): Allow custom repo store [PR 46]

[PR 47]: https://github.com/dariusc93/rust-ipfs/pull/47
[PR 46]: https://github.com/dariusc93/rust-ipfs/pull/46

# 0.3.7
Expand Down
204 changes: 173 additions & 31 deletions src/p2p/peerbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use libp2p::swarm::{
};
use libp2p::PeerId;
use std::collections::hash_map::Entry;
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::log;
use wasm_timer::Interval;

use std::collections::{HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -127,14 +128,18 @@ pub struct Behaviour {

pending_connections: HashMap<ConnectionId, oneshot::Sender<anyhow::Result<()>>>,

pending_identify_timer: HashMap<PeerId, (Interval, Instant)>,

pending_identify: HashMap<PeerId, oneshot::Sender<anyhow::Result<()>>>,

peer_info: HashMap<PeerId, PeerInfo>,
peer_rtt: HashMap<PeerId, [Duration; 3]>,

whitelist: HashSet<PeerId>,

protocols: Vec<Vec<u8>>,

// For connection limits (took from )
// For connection limits (took from libp2p pr)
pending_inbound_connections: HashSet<ConnectionId>,
pending_outbound_connections: HashSet<ConnectionId>,
established_inbound_connections: HashSet<ConnectionId>,
Expand All @@ -152,6 +157,8 @@ impl Default for Behaviour {
Duration::from_secs(60),
),
pending_connections: Default::default(),
pending_identify_timer: Default::default(),
pending_identify: Default::default(),
peer_info: Default::default(),
peer_rtt: Default::default(),
whitelist: Default::default(),
Expand Down Expand Up @@ -195,7 +202,15 @@ impl Behaviour {

pub fn inject_peer_info<I: Into<PeerInfo>>(&mut self, info: I) {
let info: PeerInfo = info.into();
let peer_id = info.peer_id;
self.peer_info.insert(info.peer_id, info);
if let Some(ch) = self.pending_identify.remove(&peer_id) {
let _ = ch.send(Ok(()));
if let Some((_, instant)) = self.pending_identify_timer.remove(&peer_id) {
let elapse = instant.elapsed();
log::debug!("Took {:?} to obtain identify for {peer_id}", elapse);
}
}
}

pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
Expand Down Expand Up @@ -228,6 +243,7 @@ impl Behaviour {

pub fn remove_peer_info(&mut self, peer_id: PeerId) {
self.peer_info.remove(&peer_id);
self.peer_info.shrink_to_fit();
}

fn check_limit(&mut self, limit: Option<u32>, current: usize) -> Result<(), ConnectionDenied> {
Expand Down Expand Up @@ -372,9 +388,27 @@ impl NetworkBehaviour for Behaviour {
#[allow(clippy::single_match)]
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { connection_id, .. }) => {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
if let Some(ch) = self.pending_connections.remove(&connection_id) {
let _ = ch.send(Ok(()));
if self.get_peer_info(peer_id).is_none() {
self.pending_identify.insert(peer_id, ch);
self.pending_identify_timer.insert(
peer_id,
(
Interval::new_at(
std::time::Instant::now() + Duration::from_secs(5),
Duration::from_secs(5),
),
Instant::now(),
),
);
} else {
let _ = ch.send(Ok(()));
}
}
}
FromSwarm::DialFailure(DialFailure {
Expand Down Expand Up @@ -421,19 +455,46 @@ impl NetworkBehaviour for Behaviour {
return Poll::Ready(event);
}

let mut removal = vec![];
for (peer_id, (timer, _)) in self.pending_identify_timer.iter_mut() {
match timer.poll_next_unpin(cx) {
Poll::Ready(Some(_)) => {
removal.push(*peer_id);
continue;
}
Poll::Ready(None) => {
log::error!("timer for {} was not available", peer_id);
removal.push(*peer_id);
continue;
}
Poll::Pending => continue,
}
}

for peer_id in removal.iter() {
if let Some(ch) = self.pending_identify.remove(peer_id) {
let _ = ch.send(Ok(()));
}
if let Some((_, instant)) = self.pending_identify_timer.remove(peer_id) {
let elapse = instant.elapsed();
log::debug!("Took {:?} to complete", elapse);
}
}

// Used to cleanup any info that may be left behind after a peer is no longer connected while giving time to all
// Note: If a peer is whitelisted, this will retain the info as a cache, although this may change in the future
//
while let Poll::Ready(Some(_)) = self.cleanup_interval.poll_next_unpin(cx) {
let list = self.peer_info.keys().copied().collect::<Vec<_>>();
for peer_id in list {
if !self.established_per_peer.contains_key(&peer_id)
|| !self.whitelist.contains(&peer_id)
&& !self.whitelist.contains(&peer_id)
{
self.peer_info.remove(&peer_id);
self.peer_rtt.remove(&peer_id);
}
}
println!("PeerInfo Len: {}", self.peer_info.len());
}

Poll::Pending
Expand All @@ -442,30 +503,32 @@ impl NetworkBehaviour for Behaviour {

#[cfg(test)]
mod test {
use std::time::Duration;

use super::Behaviour as PeerBook;
use crate::p2p::{peerbook::ConnectionLimits, transport::build_transport};
use futures::StreamExt;
use libp2p::{
identify::{self, Config},
identity::Keypair,
swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent},
swarm::{
behaviour::toggle::Toggle, keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent,
},
Multiaddr, PeerId, Swarm,
};

#[derive(NetworkBehaviour)]
struct Behaviour {
peerbook: PeerBook,
identify: Toggle<identify::Behaviour>,
keep_alive: keep_alive::Behaviour,
}

//TODO: Expand test out
#[tokio::test]
async fn connection_limits() {
let (_, addr1, mut swarm1) = build_swarm().await;
let (peer2, _, mut swarm2) = build_swarm().await;
let (peer3, _, mut swarm3) = build_swarm().await;
let (peer4, _, mut swarm4) = build_swarm().await;
let (_, addr1, mut swarm1) = build_swarm(false).await;
let (peer2, _, mut swarm2) = build_swarm(false).await;
let (peer3, _, mut swarm3) = build_swarm(false).await;
let (peer4, _, mut swarm4) = build_swarm(false).await;

swarm1
.behaviour_mut()
Expand All @@ -475,37 +538,51 @@ mod test {
..Default::default()
});

swarm2.dial(addr1.clone()).unwrap();
let mut oneshot = swarm2.behaviour_mut().peerbook.connect(addr1.clone());

loop {
if let Some(SwarmEvent::ConnectionEstablished { .. }) = swarm1.next().await {
break;
tokio::select! {
biased;
_ = swarm1.next() => {},
_ = swarm2.next() => {},
conn_res = (&mut oneshot) => {
conn_res.unwrap().unwrap();
break;
}
}
}
swarm1.behaviour_mut().peerbook.add(peer3);
swarm3.dial(addr1.clone()).unwrap();
let mut oneshot = swarm3.behaviour_mut().peerbook.connect(addr1.clone());

tokio::time::timeout(Duration::from_secs(10), async {
loop {
if let Some(SwarmEvent::ConnectionEstablished { .. }) = swarm1.next().await {
loop {
tokio::select! {
biased;
_ = swarm1.next() => {},
_ = swarm3.next() => {},
conn_res = (&mut oneshot) => {
conn_res.unwrap().unwrap();
break;
}
}
})
.await
.unwrap();
}

swarm4.dial(addr1.clone()).unwrap();
let mut oneshot = swarm4.behaviour_mut().peerbook.connect(addr1.clone());

tokio::time::timeout(Duration::from_secs(10), async {
loop {
if let Some(SwarmEvent::IncomingConnectionError { .. }) = swarm1.next().await {
loop {
tokio::select! {
biased;
e = swarm1.select_next_some() => {
if matches!(e, SwarmEvent::IncomingConnectionError { .. }) {
break;
}
},
_ = swarm4.next() => {},
conn_res = (&mut oneshot) => {
assert!(conn_res.unwrap().is_err());
break;
}
}
})
.await
.unwrap();
}

let list = swarm1.connected_peers().copied().collect::<Vec<_>>();

Expand All @@ -514,13 +591,78 @@ mod test {
assert!(!list.contains(&peer4));
}

async fn build_swarm() -> (PeerId, Multiaddr, libp2p::swarm::Swarm<Behaviour>) {
#[tokio::test]
async fn connect_without_identify() {
let (_, addr1, mut swarm1) = build_swarm(false).await;
let (peer2, _, mut swarm2) = build_swarm(false).await;

let mut oneshot = swarm2.behaviour_mut().peerbook.connect(addr1.clone());

loop {
tokio::select! {
biased;
_ = swarm1.next() => {},
_ = swarm2.next() => {},
conn_res = (&mut oneshot) => {
conn_res.unwrap().unwrap();
break;
}
}
}

let list = swarm1.connected_peers().copied().collect::<Vec<_>>();

assert!(list.contains(&peer2));
}

#[tokio::test]
async fn connect_with_identify() {
let (_, addr1, mut swarm1) = build_swarm(true).await;
let (peer2, _, mut swarm2) = build_swarm(true).await;

let mut oneshot = swarm2.behaviour_mut().peerbook.connect(addr1.clone());
let mut peer_1_identify = false;
let mut peer_2_identify = false;
loop {
tokio::select! {
biased;
Some(e) = swarm1.next() => {
if let SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. })) = e {
peer_2_identify = true;
}
},
Some(e) = swarm2.next() => {
if let SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. })) = e {
peer_1_identify = true;
}
},
conn_res = (&mut oneshot) => {
conn_res.unwrap().unwrap();
}
}

if peer_1_identify && peer_2_identify {
break;
}
}

let list = swarm1.connected_peers().copied().collect::<Vec<_>>();

assert!(list.contains(&peer2));
}

async fn build_swarm(identify: bool) -> (PeerId, Multiaddr, libp2p::swarm::Swarm<Behaviour>) {
let key = Keypair::generate_ed25519();
let peer_id = key.public().to_peer_id();
let pubkey = key.public();
let peer_id = pubkey.to_peer_id();
let transport = build_transport(key, None, Default::default()).unwrap();

let behaviour = Behaviour {
peerbook: PeerBook::default(),
identify: Toggle::from(identify.then_some(identify::Behaviour::new(Config::new(
"/peerbook/0.1".into(),
pubkey,
)))),
keep_alive: keep_alive::Behaviour,
};

Expand Down

0 comments on commit 3ff6085

Please sign in to comment.