Skip to content

Commit

Permalink
feat: Reimplement ConnectionEvents and PeerConnectionEvents (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Oct 28, 2024
1 parent 34c1f7c commit bfc09a2
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.12.2
- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR 320](https://github.com/dariusc93/rust-ipfs/pull/320)

# 0.12.1
- fix: enable "wasm-bindgen" feature for `instant` when building wasm32 target.

Expand Down
12 changes: 9 additions & 3 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ async fn main() -> anyhow::Result<()> {

let ipfs: Ipfs = uninitialized.start().await?;

ipfs.default_bootstrap().await?;

if opt.bootstrap {
ipfs.default_bootstrap().await?;
if let Err(_e) = ipfs.bootstrap().await {}
}

Expand All @@ -94,6 +93,8 @@ async fn main() -> anyhow::Result<()> {
}
}

let mut st = ipfs.connection_events().await?;

for addr in opt.connect {
let Some(peer_id) = addr.peer_id() else {
writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?;
Expand All @@ -110,7 +111,7 @@ async fn main() -> anyhow::Result<()> {

let mut event_stream = ipfs.pubsub_events(&topic).await?;

let stream = ipfs.pubsub_subscribe(topic.to_string()).await?;
let stream = ipfs.pubsub_subscribe(&topic).await?;

pin_mut!(stream);

Expand All @@ -125,6 +126,11 @@ async fn main() -> anyhow::Result<()> {
writeln!(stdout, "{}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
}
}
conn_ev = st.next() => {
if let Some(ev) = conn_ev {
writeln!(stdout, "connection event: {ev:?}")?;
}
}
Some(event) = event_stream.next() => {
match event {
PubsubEvent::Subscribe { peer_id } => writeln!(stdout, "{} subscribed", peer_id)?,
Expand Down
97 changes: 93 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub use libp2p::{
};

use libp2p::swarm::dial_opts::PeerCondition;
use libp2p::swarm::ConnectionId;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
kad::{store::MemoryStoreConfig, Mode, Record},
Expand All @@ -122,7 +123,6 @@ use libp2p::{
swarm::dial_opts::DialOpts,
StreamProtocol,
};

pub use libp2p_connection_limits::ConnectionLimits;
use serde::Serialize;

Expand Down Expand Up @@ -235,6 +235,9 @@ struct IpfsOptions {

pub connection_limits: Option<ConnectionLimits>,

/// Channel capacity for emitting connection events over.
pub connection_event_cap: usize,

pub(crate) protocols: Libp2pProtocol,
}

Expand Down Expand Up @@ -293,6 +296,7 @@ impl Default for IpfsOptions {
transport_configuration: TransportConfig::default(),
pubsub_config: PubsubConfig::default(),
swarm_configuration: SwarmConfig::default(),
connection_event_cap: 256,
span: None,
protocols: Default::default(),
connection_limits: None,
Expand Down Expand Up @@ -377,6 +381,11 @@ enum IpfsEvent {
RemoveListeningAddress(Multiaddr, Channel<()>),
AddExternalAddress(Multiaddr, Channel<()>),
RemoveExternalAddress(Multiaddr, Channel<()>),
ConnectionEvents(Channel<futures::channel::mpsc::Receiver<ConnectionEvents>>),
PeerConnectionEvents(
PeerId,
Channel<futures::channel::mpsc::Receiver<PeerConnectionEvents>>,
),
Bootstrap(Channel<ReceiverChannel<KadResult>>),
AddPeer(AddPeerOpt, Channel<()>),
RemovePeer(PeerId, Option<Multiaddr>, Channel<bool>),
Expand Down Expand Up @@ -485,6 +494,39 @@ pub enum FDLimit {
Custom(u64),
}

#[derive(Debug, Clone)]
pub enum PeerConnectionEvents {
IncomingConnection {
connection_id: ConnectionId,
addr: Multiaddr,
},
OutgoingConnection {
connection_id: ConnectionId,
addr: Multiaddr,
},
ClosedConnection {
connection_id: ConnectionId,
},
}

#[derive(Debug, Clone)]
pub enum ConnectionEvents {
IncomingConnection {
peer_id: PeerId,
connection_id: ConnectionId,
addr: Multiaddr,
},
OutgoingConnection {
peer_id: PeerId,
connection_id: ConnectionId,
addr: Multiaddr,
},
ClosedConnection {
peer_id: PeerId,
connection_id: ConnectionId,
},
}

/// Configured Ipfs which can only be started.
#[allow(clippy::type_complexity)]
pub struct UninitializedIpfs<C: NetworkBehaviour<ToSwarm = void::Void> + Send> {
Expand Down Expand Up @@ -557,6 +599,12 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
self
}

/// Set connection event capacity
pub fn set_connection_event_capacity(mut self, cap: usize) -> Self {
self.options.connection_event_cap = cap;
self
}

/// Adds a listening addresses
pub fn add_listening_addrs(mut self, addrs: Vec<Multiaddr>) -> Self {
self.options.listening_addrs.extend(addrs);
Expand Down Expand Up @@ -1031,7 +1079,13 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
});
}

let mut fut = task::IpfsTask::new(swarm, repo_events.fuse(), receiver.fuse(), &ipfs.repo);
let mut fut = task::IpfsTask::new(
swarm,
repo_events.fuse(),
receiver.fuse(),
&ipfs.repo,
options.connection_event_cap,
);
fut.swarm_event = swarm_event;
fut.local_external_addr = local_external_addr;

Expand Down Expand Up @@ -1503,8 +1557,8 @@ impl Ipfs {

Ok(stream.boxed())
}
.instrument(self.span.clone())
.await
.instrument(self.span.clone())
.await
}

/// Publishes to the topic which may have been subscribed to earlier
Expand Down Expand Up @@ -1765,6 +1819,41 @@ impl Ipfs {
.await
}

pub async fn connection_events(&self) -> Result<BoxStream<'static, ConnectionEvents>, Error> {
async move {
let (tx, rx) = oneshot_channel();

self.to_task
.clone()
.send(IpfsEvent::ConnectionEvents(tx))
.await?;

let rx = rx.await??;
Ok(rx.boxed())
}
.instrument(self.span.clone())
.await
}

pub async fn peer_connection_events(
&self,
peer_id: PeerId,
) -> Result<BoxStream<'static, PeerConnectionEvents>, Error> {
async move {
let (tx, rx) = oneshot_channel();

self.to_task
.clone()
.send(IpfsEvent::PeerConnectionEvents(peer_id, tx))
.await?;

let rx = rx.await??;
Ok(rx.boxed())
}
.instrument(self.span.clone())
.await
}

/// Obtain the addresses associated with the given `PeerId`; they are first searched for locally
/// and the DHT is used as a fallback: a `Kademlia::get_closest_peers(peer_id)` query is run and
/// when it's finished, the newly added DHT records are checked for the existence of the desired
Expand Down
Loading

0 comments on commit bfc09a2

Please sign in to comment.