From 17aa20af14f9bc871e5e360e1190a39d2d4757d3 Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 25 Oct 2024 15:02:08 -0500 Subject: [PATCH 1/5] feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}` --- CHANGELOG.md | 4 ++ examples/pubsub.rs | 12 +++-- src/lib.rs | 97 ++++++++++++++++++++++++++++++++++-- src/task.rs | 121 ++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 221 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff5180951..e5965003a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.12.2 +- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR XXX](https://github.com/dariusc93/rust-ipfs/pull/XXX) + + # 0.12.1 - fix: enable "wasm-bindgen" feature for `instant` when building wasm32 target. diff --git a/examples/pubsub.rs b/examples/pubsub.rs index fdbd71212..f6b8428b7 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -67,9 +67,8 @@ async fn main() -> anyhow::Result<()> { let ipfs: Ipfs = uninitialized.start().await?; - ipfs.default_bootstrap().await?; - if opt.bootstrap { + ipfs.default_bootstrap().await?; if let Err(_e) = ipfs.bootstrap().await {} } @@ -94,6 +93,8 @@ async fn main() -> anyhow::Result<()> { } } + let mut st = ipfs.connection_events().await?; + for addr in opt.connect { let Some(peer_id) = addr.peer_id() else { writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?; @@ -110,7 +111,7 @@ async fn main() -> anyhow::Result<()> { let mut event_stream = ipfs.pubsub_events(&topic).await?; - let stream = ipfs.pubsub_subscribe(topic.to_string()).await?; + let stream = ipfs.pubsub_subscribe(&topic).await?; pin_mut!(stream); @@ -125,6 +126,11 @@ async fn main() -> anyhow::Result<()> { writeln!(stdout, "{}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?; } } + conn_ev = st.next() => { + if let Some(ev) = conn_ev { + writeln!(stdout, "connection event: {ev:?}")?; + } + } Some(event) = event_stream.next() => { match event { PubsubEvent::Subscribe { peer_id } => writeln!(stdout, "{} subscribed", peer_id)?, diff --git a/src/lib.rs b/src/lib.rs index c66ba65d6..b64d7eb36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,6 +114,7 @@ pub use libp2p::{ }; use libp2p::swarm::dial_opts::PeerCondition; +use libp2p::swarm::ConnectionId; use libp2p::{ core::{muxing::StreamMuxerBox, transport::Boxed}, kad::{store::MemoryStoreConfig, Mode, Record}, @@ -122,7 +123,6 @@ use libp2p::{ swarm::dial_opts::DialOpts, StreamProtocol, }; - pub use libp2p_connection_limits::ConnectionLimits; use serde::Serialize; @@ -235,6 +235,9 @@ struct IpfsOptions { pub connection_limits: Option, + /// Channel capacity for emitting connection events over. + pub connection_event_cap: usize, + pub(crate) protocols: Libp2pProtocol, } @@ -293,6 +296,7 @@ impl Default for IpfsOptions { transport_configuration: TransportConfig::default(), pubsub_config: PubsubConfig::default(), swarm_configuration: SwarmConfig::default(), + connection_event_cap: 256, span: None, protocols: Default::default(), connection_limits: None, @@ -377,6 +381,11 @@ enum IpfsEvent { RemoveListeningAddress(Multiaddr, Channel<()>), AddExternalAddress(Multiaddr, Channel<()>), RemoveExternalAddress(Multiaddr, Channel<()>), + ConnectionEvents(Channel>), + PeerConnectionEvents( + PeerId, + Channel>, + ), Bootstrap(Channel>), AddPeer(AddPeerOpt, Channel<()>), RemovePeer(PeerId, Option, Channel), @@ -485,6 +494,39 @@ pub enum FDLimit { Custom(u64), } +#[derive(Debug, Clone)] +pub enum PeerConnectionEvents { + IncomingConnection { + connection_id: ConnectionId, + addr: Multiaddr, + }, + OutgoingConnection { + connection_id: ConnectionId, + addr: Multiaddr, + }, + ClosedConnection { + connection_id: ConnectionId, + }, +} + +#[derive(Debug, Clone)] +pub enum ConnectionEvents { + IncomingConnection { + peer_id: PeerId, + connection_id: ConnectionId, + addr: Multiaddr, + }, + OutgoingConnection { + peer_id: PeerId, + connection_id: ConnectionId, + addr: Multiaddr, + }, + ClosedConnection { + peer_id: PeerId, + connection_id: ConnectionId, + }, +} + /// Configured Ipfs which can only be started. #[allow(clippy::type_complexity)] pub struct UninitializedIpfs + Send> { @@ -557,6 +599,12 @@ impl + Send> UninitializedIpfs { self } + /// Set connection event capacity + pub fn set_connection_event_capacity(mut self, cap: usize) -> Self { + self.options.connection_event_cap = cap; + self + } + /// Adds a listening addresses pub fn add_listening_addrs(mut self, addrs: Vec) -> Self { self.options.listening_addrs.extend(addrs); @@ -1031,7 +1079,13 @@ impl + Send> UninitializedIpfs { }); } - let mut fut = task::IpfsTask::new(swarm, repo_events.fuse(), receiver.fuse(), &ipfs.repo); + let mut fut = task::IpfsTask::new( + swarm, + repo_events.fuse(), + receiver.fuse(), + &ipfs.repo, + options.connection_event_cap, + ); fut.swarm_event = swarm_event; fut.local_external_addr = local_external_addr; @@ -1503,8 +1557,8 @@ impl Ipfs { Ok(stream.boxed()) } - .instrument(self.span.clone()) - .await + .instrument(self.span.clone()) + .await } /// Publishes to the topic which may have been subscribed to earlier @@ -1765,6 +1819,41 @@ impl Ipfs { .await } + pub async fn connection_events(&self) -> Result, Error> { + async move { + let (tx, rx) = oneshot_channel(); + + self.to_task + .clone() + .send(IpfsEvent::ConnectionEvents(tx)) + .await?; + + let rx = rx.await??; + Ok(rx.boxed()) + } + .instrument(self.span.clone()) + .await + } + + pub async fn peer_connection_events( + &self, + peer_id: PeerId, + ) -> Result, Error> { + async move { + let (tx, rx) = oneshot_channel(); + + self.to_task + .clone() + .send(IpfsEvent::PeerConnectionEvents(peer_id, tx)) + .await?; + + let rx = rx.await??; + Ok(rx.boxed()) + } + .instrument(self.span.clone()) + .await + } + /// Obtain the addresses associated with the given `PeerId`; they are first searched for locally /// and the DHT is used as a fallback: a `Kademlia::get_closest_peers(peer_id)` query is run and /// when it's finished, the newly added DHT records are checked for the existence of the desired diff --git a/src/task.rs b/src/task.rs index 5c3c54c9f..5a0aeabd6 100644 --- a/src/task.rs +++ b/src/task.rs @@ -9,8 +9,8 @@ use futures::{ FutureExt, StreamExt, }; -use crate::TSwarmEvent; use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent}; +use crate::{ConnectionEvents, PeerConnectionEvents, TSwarmEvent}; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -35,6 +35,9 @@ pub use crate::{p2p::BehaviourEvent, p2p::KadResult}; pub use libp2p::{self, core::transport::ListenerId, swarm::NetworkBehaviour, Multiaddr, PeerId}; use multibase::Base; +use libp2p::core::ConnectedPoint; +#[cfg(not(target_arch = "wasm32"))] +use libp2p::mdns::Event as MdnsEvent; use libp2p::{ autonat, identify::{Event as IdentifyEvent, Info as IdentifyInfo}, @@ -46,9 +49,6 @@ use libp2p::{ rendezvous::{Cookie, Namespace}, swarm::{ConnectionId, SwarmEvent}, }; - -#[cfg(not(target_arch = "wasm32"))] -use libp2p::mdns::Event as MdnsEvent; use tokio::sync::Notify; /// Background task of `Ipfs` created when calling `UninitializedIpfs::start`. @@ -77,10 +77,16 @@ pub(crate) struct IpfsTask> { HashMap<(PeerId, Namespace), Vec>>>>, pub(crate) rzv_cookie: HashMap>, + pub(crate) peer_connection_events: + HashMap>>, + pub(crate) connection_events: Vec>, + pub(crate) pending_connection: HashMap>, pub(crate) pending_disconnection: HashMap>>, pub(crate) pending_add_listener: HashMap>, pub(crate) pending_remove_listener: HashMap>, + + pub(crate) event_capacity: usize, } impl> IpfsTask { @@ -89,11 +95,13 @@ impl> IpfsTask { repo_events: Fuse>, from_facade: Fuse>, repo: &Repo, + event_capacity: usize, ) -> Self { IpfsTask { repo_events, from_facade, swarm, + event_capacity, provider_stream: HashMap::new(), record_stream: HashMap::new(), dht_peer_lookup: Default::default(), @@ -110,6 +118,8 @@ impl> IpfsTask { rzv_discover_pending: Default::default(), rzv_cookie: Default::default(), listening_addresses: HashMap::new(), + peer_connection_events: HashMap::new(), + connection_events: Vec::new(), pending_disconnection: Default::default(), pending_connection: Default::default(), pending_add_listener: Default::default(), @@ -158,6 +168,11 @@ impl> futures::Future for IpfsTask if self.timer.event_cleanup.poll_unpin(cx).is_ready() { self.pubsub_event_stream.retain(|ch| !ch.is_closed()); + self.connection_events.retain(|ch| !ch.is_closed()); + self.peer_connection_events.retain(|_, ch_list| { + ch_list.retain(|ch| !ch.is_closed()); + !ch_list.is_empty() + }); self.timer.event_cleanup.reset(Duration::from_secs(60)); } @@ -186,6 +201,11 @@ impl> IpfsTask { }, _ = &mut event_cleanup => { self.pubsub_event_stream.retain(|ch| !ch.is_closed()); + self.connection_events.retain(|ch| !ch.is_closed()); + self.peer_connection_events.retain(|_, ch_list| { + ch_list.retain(|ch| !ch.is_closed()); + !ch_list.is_empty() + }); event_cleanup.reset(Duration::from_secs(60)); } } @@ -229,10 +249,69 @@ impl> IpfsTask { let _ = ret.send(Ok(address)); } } - SwarmEvent::ConnectionEstablished { connection_id, .. } => { + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + endpoint, + .. + } => { if let Some(ch) = self.pending_connection.remove(&connection_id) { _ = ch.send(Ok(())); } + + if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) { + let ev = match &endpoint { + ConnectedPoint::Dialer { address, .. } => { + PeerConnectionEvents::OutgoingConnection { + connection_id, + addr: address.clone(), + } + } + ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => { + PeerConnectionEvents::IncomingConnection { + connection_id, + addr: local_addr.clone(), + } + } + ConnectedPoint::Listener { send_back_addr, .. } => { + PeerConnectionEvents::IncomingConnection { + connection_id, + addr: send_back_addr.clone(), + } + } + }; + for ch in ch_list { + let _ = ch.try_send(ev.clone()); + } + } + + for ch in &mut self.connection_events { + let ev = match &endpoint { + ConnectedPoint::Dialer { address, .. } => { + ConnectionEvents::OutgoingConnection { + peer_id, + connection_id, + addr: address.clone(), + } + } + ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => { + ConnectionEvents::IncomingConnection { + peer_id, + connection_id, + addr: local_addr.clone(), + } + } + ConnectedPoint::Listener { send_back_addr, .. } => { + ConnectionEvents::IncomingConnection { + peer_id, + connection_id, + addr: send_back_addr.clone(), + } + } + }; + + let _ = ch.try_send(ev); + } } SwarmEvent::OutgoingConnectionError { connection_id, @@ -243,12 +322,29 @@ impl> IpfsTask { _ = ch.send(Err(anyhow::Error::from(error))); } } - SwarmEvent::ConnectionClosed { peer_id, .. } => { + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + .. + } => { if let Some(ch) = self.pending_disconnection.remove(&peer_id) { for ch in ch { let _ = ch.send(Ok(())); } } + + if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) { + for ch in ch_list { + _ = ch.try_send(PeerConnectionEvents::ClosedConnection { connection_id }); + } + } + + for ch in &mut self.connection_events { + _ = ch.try_send(ConnectionEvents::ClosedConnection { + peer_id, + connection_id, + }); + } } SwarmEvent::ExpiredListenAddr { listener_id, @@ -1008,6 +1104,19 @@ impl> IpfsTask { self.swarm.remove_external_address(&addr); _ = ret.send(Ok(())) } + IpfsEvent::ConnectionEvents(ret) => { + let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity); + self.connection_events.push(tx); + _ = ret.send(Ok(rx)); + } + IpfsEvent::PeerConnectionEvents(peer_id, ret) => { + let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity); + self.peer_connection_events + .entry(peer_id) + .or_default() + .push(tx); + _ = ret.send(Ok(rx)) + } IpfsEvent::Bootstrap(ret) => { let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else { let _ = ret.send(Err(anyhow!("kad protocol is disabled"))); From a2daed601c59dac575517a8504f2aa7b657fa893 Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 25 Oct 2024 15:45:34 -0500 Subject: [PATCH 2/5] chore: Remove peer id from address if exist --- src/task.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/task.rs b/src/task.rs index 5a0aeabd6..80dbe3be5 100644 --- a/src/task.rs +++ b/src/task.rs @@ -38,6 +38,7 @@ use multibase::Base; use libp2p::core::ConnectedPoint; #[cfg(not(target_arch = "wasm32"))] use libp2p::mdns::Event as MdnsEvent; +use libp2p::multiaddr::Protocol; use libp2p::{ autonat, identify::{Event as IdentifyEvent, Info as IdentifyInfo}, @@ -262,6 +263,10 @@ impl> IpfsTask { if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) { let ev = match &endpoint { ConnectedPoint::Dialer { address, .. } => { + let mut address = address.clone(); + if matches!(address.iter().last(), Some(Protocol::P2p(_))) { + address.pop(); + } PeerConnectionEvents::OutgoingConnection { connection_id, addr: address.clone(), @@ -288,6 +293,10 @@ impl> IpfsTask { for ch in &mut self.connection_events { let ev = match &endpoint { ConnectedPoint::Dialer { address, .. } => { + let mut address = address.clone(); + if matches!(address.iter().last(), Some(Protocol::P2p(_))) { + address.pop(); + } ConnectionEvents::OutgoingConnection { peer_id, connection_id, From 1fa3c46bd83783519e9c6c550354795eaf7012b9 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 25 Oct 2024 15:53:55 -0500 Subject: [PATCH 3/5] Update CHANGELOG.md --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5965003a..bded6120c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,5 @@ # 0.12.2 -- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR XXX](https://github.com/dariusc93/rust-ipfs/pull/XXX) - +- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR 320](https://github.com/dariusc93/rust-ipfs/pull/320) # 0.12.1 - fix: enable "wasm-bindgen" feature for `instant` when building wasm32 target. From 82980e663855b941ede4cf3014343e3b749887a8 Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 25 Oct 2024 16:58:14 -0500 Subject: [PATCH 4/5] chore: remove peerid from address and misc cleanup --- src/task.rs | 86 ++++++++++++++++++++++------------------------------- 1 file changed, 36 insertions(+), 50 deletions(-) diff --git a/src/task.rs b/src/task.rs index 80dbe3be5..63bf6da8d 100644 --- a/src/task.rs +++ b/src/task.rs @@ -35,7 +35,7 @@ pub use crate::{p2p::BehaviourEvent, p2p::KadResult}; pub use libp2p::{self, core::transport::ListenerId, swarm::NetworkBehaviour, Multiaddr, PeerId}; use multibase::Base; -use libp2p::core::ConnectedPoint; +use libp2p::core::{ConnectedPoint, Endpoint}; #[cfg(not(target_arch = "wasm32"))] use libp2p::mdns::Event as MdnsEvent; use libp2p::multiaddr::Protocol; @@ -260,63 +260,49 @@ impl> IpfsTask { _ = ch.send(Ok(())); } + let (ep, mut addr) = match &endpoint { + ConnectedPoint::Dialer { address, .. } => (Endpoint::Dialer, address.clone()), + ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => { + (Endpoint::Listener, local_addr.clone()) + } + ConnectedPoint::Listener { send_back_addr, .. } => { + (Endpoint::Listener, send_back_addr.clone()) + } + }; + + if matches!(addr.iter().last(), Some(Protocol::P2p(_))) { + addr.pop(); + } + if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) { - let ev = match &endpoint { - ConnectedPoint::Dialer { address, .. } => { - let mut address = address.clone(); - if matches!(address.iter().last(), Some(Protocol::P2p(_))) { - address.pop(); - } - PeerConnectionEvents::OutgoingConnection { - connection_id, - addr: address.clone(), - } - } - ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => { - PeerConnectionEvents::IncomingConnection { - connection_id, - addr: local_addr.clone(), - } - } - ConnectedPoint::Listener { send_back_addr, .. } => { - PeerConnectionEvents::IncomingConnection { - connection_id, - addr: send_back_addr.clone(), - } - } + let ev = match ep { + Endpoint::Dialer => PeerConnectionEvents::OutgoingConnection { + connection_id, + addr: addr.clone(), + }, + Endpoint::Listener => PeerConnectionEvents::IncomingConnection { + connection_id, + addr: addr.clone(), + }, }; + for ch in ch_list { let _ = ch.try_send(ev.clone()); } } for ch in &mut self.connection_events { - let ev = match &endpoint { - ConnectedPoint::Dialer { address, .. } => { - let mut address = address.clone(); - if matches!(address.iter().last(), Some(Protocol::P2p(_))) { - address.pop(); - } - ConnectionEvents::OutgoingConnection { - peer_id, - connection_id, - addr: address.clone(), - } - } - ConnectedPoint::Listener { local_addr, .. } if endpoint.is_relayed() => { - ConnectionEvents::IncomingConnection { - peer_id, - connection_id, - addr: local_addr.clone(), - } - } - ConnectedPoint::Listener { send_back_addr, .. } => { - ConnectionEvents::IncomingConnection { - peer_id, - connection_id, - addr: send_back_addr.clone(), - } - } + let ev = match ep { + Endpoint::Dialer => ConnectionEvents::OutgoingConnection { + peer_id, + connection_id, + addr: addr.clone(), + }, + Endpoint::Listener => ConnectionEvents::IncomingConnection { + peer_id, + connection_id, + addr: addr.clone(), + }, }; let _ = ch.try_send(ev); From 27a7f97497435f65dd00fbd672e58c1823152684 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 28 Oct 2024 15:51:08 -0400 Subject: [PATCH 5/5] chore: misc cleanup --- src/task.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/task.rs b/src/task.rs index 63bf6da8d..ccd31e6ce 100644 --- a/src/task.rs +++ b/src/task.rs @@ -257,7 +257,7 @@ impl> IpfsTask { .. } => { if let Some(ch) = self.pending_connection.remove(&connection_id) { - _ = ch.send(Ok(())); + let _ = ch.send(Ok(())); } let (ep, mut addr) = match &endpoint { @@ -314,7 +314,7 @@ impl> IpfsTask { .. } => { if let Some(ch) = self.pending_connection.remove(&connection_id) { - _ = ch.send(Err(anyhow::Error::from(error))); + let _ = ch.send(Err(anyhow::Error::from(error))); } } SwarmEvent::ConnectionClosed { @@ -330,12 +330,13 @@ impl> IpfsTask { if let Some(ch_list) = self.peer_connection_events.get_mut(&peer_id) { for ch in ch_list { - _ = ch.try_send(PeerConnectionEvents::ClosedConnection { connection_id }); + let _ = + ch.try_send(PeerConnectionEvents::ClosedConnection { connection_id }); } } for ch in &mut self.connection_events { - _ = ch.try_send(ConnectionEvents::ClosedConnection { + let _ = ch.try_send(ConnectionEvents::ClosedConnection { peer_id, connection_id, }); @@ -649,7 +650,7 @@ impl> IpfsTask { trace!("kad: pending routable peer {} ({})", peer, address); } KademliaEvent::ModeChanged { new_mode } => { - _ = new_mode; + let _ = new_mode; } } } @@ -909,7 +910,7 @@ impl> IpfsTask { let connection_id = target.connection_id(); if let Err(e) = self.swarm.dial(target) { - _ = ret.send(Err(anyhow::Error::from(e))); + let _ = ret.send(Err(anyhow::Error::from(e))); return; } self.pending_connection.insert(connection_id, ret); @@ -925,7 +926,7 @@ impl> IpfsTask { return; }; - _ = ret.send(Ok(stream.new_control())) + let _ = ret.send(Ok(stream.new_control())); } #[cfg(feature = "experimental_stream")] IpfsEvent::NewStream(protocol, ret) => { @@ -934,12 +935,12 @@ impl> IpfsTask { return; }; - _ = ret.send( + let _ = ret.send( stream .new_control() .accept(protocol) .map_err(anyhow::Error::from), - ) + ); } IpfsEvent::Addresses(ret) => { let addrs = self.swarm.behaviour_mut().addrs(); @@ -968,7 +969,7 @@ impl> IpfsTask { } IpfsEvent::Disconnect(peer, ret) => { if self.swarm.disconnect_peer_id(peer).is_err() { - _ = ret.send(Err(anyhow::anyhow!("Peer is not connected"))); + let _ = ret.send(Err(anyhow::anyhow!("Peer is not connected"))); return; } @@ -1093,16 +1094,16 @@ impl> IpfsTask { } IpfsEvent::AddExternalAddress(addr, ret) => { self.swarm.add_external_address(addr); - _ = ret.send(Ok(())) + let _ = ret.send(Ok(())); } IpfsEvent::RemoveExternalAddress(addr, ret) => { self.swarm.remove_external_address(&addr); - _ = ret.send(Ok(())) + let _ = ret.send(Ok(())); } IpfsEvent::ConnectionEvents(ret) => { let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity); self.connection_events.push(tx); - _ = ret.send(Ok(rx)); + let _ = ret.send(Ok(rx)); } IpfsEvent::PeerConnectionEvents(peer_id, ret) => { let (tx, rx) = futures::channel::mpsc::channel(self.event_capacity); @@ -1110,7 +1111,7 @@ impl> IpfsTask { .entry(peer_id) .or_default() .push(tx); - _ = ret.send(Ok(rx)) + let _ = ret.send(Ok(rx)); } IpfsEvent::Bootstrap(ret) => { let Some(kad) = self.swarm.behaviour_mut().kademlia.as_mut() else {