From 05fe8248adf4355003b710560ce6c6bc478c95f8 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 26 Nov 2024 18:49:34 +0200 Subject: [PATCH] feat(request-response): Add support for custom dial options when making request to disconnected peer --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../autonat/src/v1/behaviour/as_client.rs | 21 ++- protocols/request-response/CHANGELOG.md | 5 + protocols/request-response/Cargo.toml | 4 +- protocols/request-response/src/lib.rs | 125 +++++++++++------ .../request-response/tests/error_reporting.rs | 16 +-- protocols/request-response/tests/ping.rs | 127 +++++++++++++++++- swarm/CHANGELOG.md | 3 + swarm/src/dial_opts.rs | 6 + 10 files changed, 249 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d405464f58f..9593b696467 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3174,7 +3174,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.27.1" +version = "0.28.0" dependencies = [ "anyhow", "async-std", diff --git a/Cargo.toml b/Cargo.toml index b631b587dee..4271bb7f41b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ libp2p-pnet = { version = "0.25.0", path = "transports/pnet" } libp2p-quic = { version = "0.11.2", path = "transports/quic" } libp2p-relay = { version = "0.18.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" } -libp2p-request-response = { version = "0.27.1", path = "protocols/request-response" } +libp2p-request-response = { version = "0.28.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.8", path = "misc/server" } libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.45.2", path = "swarm" } diff --git a/protocols/autonat/src/v1/behaviour/as_client.rs b/protocols/autonat/src/v1/behaviour/as_client.rs index 385dee50ee1..33a87c50629 100644 --- a/protocols/autonat/src/v1/behaviour/as_client.rs +++ b/protocols/autonat/src/v1/behaviour/as_client.rs @@ -154,11 +154,20 @@ impl HandleInnerEvent for AsClient<'_> { error, request_id, } => { - tracing::debug!( - %peer, - "Outbound Failure {} when on dial-back request to peer.", - error, - ); + if let Some(peer) = peer { + tracing::debug!( + %peer, + %request_id, + "Outbound Failure {} when on dial-back request to peer.", + error, + ); + } else { + tracing::debug!( + %request_id, + "Outbound Failure {} when on dial-back request to peer.", + error, + ); + } let probe_id = self .ongoing_outbound .remove(&request_id) @@ -169,7 +178,7 @@ impl HandleInnerEvent for AsClient<'_> { VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe( OutboundProbeEvent::Error { probe_id, - peer: Some(peer), + peer, error: OutboundProbeError::OutboundRequest(error), }, ))]) diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 9ed658fc90f..c66830a07f8 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.28.0 + +- `send_request` Add support for custom dial options when making request to disconnected peer. + See [PR 5692](https://github.com/libp2p/rust-libp2p/pull/5692). + ## 0.27.1 - Deprecate `void` crate. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index b2e6fd0b0ac..07e9b7d1b72 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-request-response" edition = "2021" rust-version = { workspace = true } description = "Generic Request/Response Protocols" -version = "0.27.1" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -19,7 +19,7 @@ libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } rand = "0.8" -serde = { version = "1.0", optional = true} +serde = { version = "1.0", optional = true } serde_json = { version = "1.0.117", optional = true } smallvec = "1.13.2" tracing = { workspace = true } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index e627f5668ff..9436450136c 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -136,7 +136,7 @@ pub enum Event { /// An outbound request failed. OutboundFailure { /// The peer to whom the request was sent. - peer: PeerId, + peer: Option, /// The (local) ID of the failed request. request_id: OutboundRequestId, /// The error that occurred. @@ -333,6 +333,24 @@ impl Config { } } +#[derive(Debug, Eq, PartialEq, Hash)] +enum PendingOutgoingRequest { + PeerId(PeerId), + ConnectionId(ConnectionId), +} + +impl From for PendingOutgoingRequest { + fn from(peer_id: PeerId) -> Self { + Self::PeerId(peer_id) + } +} + +impl From for PendingOutgoingRequest { + fn from(connection_id: ConnectionId) -> Self { + Self::ConnectionId(connection_id) + } +} + /// A request/response protocol for some message codec. pub struct Behaviour where @@ -360,7 +378,8 @@ where addresses: PeerAddresses, /// Requests that have not yet been sent and are waiting for a connection /// to be established. - pending_outbound_requests: HashMap; 10]>>, + pending_outbound_requests: + HashMap; 10]>>, } impl Behaviour @@ -417,12 +436,16 @@ where /// connection is established. /// /// > **Note**: In order for such a dialing attempt to succeed, - /// > the `RequestResonse` protocol must either be embedded - /// > in another `NetworkBehaviour` that provides peer and - /// > address discovery, or known addresses of peers must be - /// > managed via [`Behaviour::add_address`] and + /// > the `peer` must be [`DialOpts`] with multiaddresses or + /// > in case of simple [`PeerId`] `RequestResponse` protocol + /// > must either be embedded in another `NetworkBehaviour` + /// > that provides peer and address discovery, or known addresses of + /// > peers must be managed via [`Behaviour::add_address`] and /// > [`Behaviour::remove_address`]. - pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { + pub fn send_request(&mut self, peer: Peer, request: TCodec::Request) -> OutboundRequestId + where + DialOpts: From, + { let request_id = self.next_outbound_request_id(); let request = OutboundMessage { request_id, @@ -430,15 +453,28 @@ where protocols: self.outbound_protocols.clone(), }; - if let Some(request) = self.try_send_request(peer, request) { - self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(*peer).build(), - }); - self.pending_outbound_requests - .entry(*peer) - .or_default() - .push(request); - } + let opts = DialOpts::from(peer); + let maybe_peer_id = opts.get_peer_id(); + let request = if let Some(peer_id) = &maybe_peer_id { + if let Some(request) = self.try_send_request(peer_id, request) { + request + } else { + // Sent successfully + return request_id; + } + } else { + request + }; + + self.pending_outbound_requests + .entry(if let Some(peer_id) = maybe_peer_id { + peer_id.into() + } else { + opts.connection_id().into() + }) + .or_default() + .push(request); + self.pending_events.push_back(ToSwarm::Dial { opts }); request_id } @@ -506,7 +542,7 @@ where // Check if request is still pending to be sent. let pen_conn = self .pending_outbound_requests - .get(peer) + .get(&PendingOutgoingRequest::from(*peer)) .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id)) .unwrap_or(false); @@ -665,30 +701,41 @@ where for request_id in connection.pending_outbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer: peer_id, + peer: Some(peer_id), request_id, error: OutboundFailure::ConnectionClosed, })); } } - fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) { - if let Some(peer) = peer_id { - // If there are pending outgoing requests when a dial failure occurs, - // it is implied that we are not connected to the peer, since pending - // outgoing requests are drained when a connection is established and - // only created when a peer is not connected when a request is made. - // Thus these requests must be considered failed, even if there is - // another, concurrent dialing attempt ongoing. - if let Some(pending) = self.pending_outbound_requests.remove(&peer) { - for request in pending { - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer, - request_id: request.request_id, - error: OutboundFailure::DialFailure, - })); - } + fn on_dial_failure( + &mut self, + DialFailure { + peer_id, + connection_id, + .. + }: DialFailure, + ) { + let key = if let Some(peer_id) = peer_id { + peer_id.into() + } else { + connection_id.into() + }; + + // If there are pending outgoing requests when a dial failure occurs, + // it is implied that we are not connected to the peer, since pending + // outgoing requests are drained when a connection is established and + // only created when a peer is not connected when a request is made. + // Thus, these requests must be considered failed, even if there is + // another, concurrent dialing attempt ongoing. + if let Some(pending) = self.pending_outbound_requests.remove(&key) { + for request in pending { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer: peer_id, + request_id: request.request_id, + error: OutboundFailure::DialFailure, + })); } } } @@ -703,7 +750,7 @@ where ) { let mut connection = Connection::new(connection_id, remote_address); - if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) { + if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer.into()) { for request in pending_requests { connection .pending_outbound_responses @@ -887,7 +934,7 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer, + peer: Some(peer), request_id, error: OutboundFailure::Timeout, })); @@ -901,7 +948,7 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer, + peer: Some(peer), request_id, error: OutboundFailure::UnsupportedProtocols, })); @@ -912,7 +959,7 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer, + peer: Some(peer), request_id, error: OutboundFailure::Io(error), })) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 19f323e169f..88e32e4db3a 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -51,7 +51,7 @@ async fn report_outbound_failure_on_read_response() { .send_request(&peer1_id, Action::FailOnReadResponse); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, req_id); let error = match error { @@ -94,7 +94,7 @@ async fn report_outbound_failure_on_write_request() { .send_request(&peer1_id, Action::FailOnWriteRequest); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, req_id); let error = match error { @@ -151,7 +151,7 @@ async fn report_outbound_timeout_on_read_response() { .send_request(&peer1_id, Action::TimeoutOnReadResponse); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, req_id); assert!(matches!(error, OutboundFailure::Timeout)); }; @@ -203,7 +203,7 @@ async fn report_outbound_failure_on_max_streams() { .send_request(&peer1_id, Action::FailOnMaxStreams); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, outbound_req_id); assert!(matches!(error, OutboundFailure::Io(_))); }; @@ -236,7 +236,7 @@ async fn report_inbound_failure_on_read_request() { .send_request(&peer1_id, Action::FailOnReadRequest); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, req_id); match error { @@ -295,7 +295,7 @@ async fn report_inbound_failure_on_write_response() { .send_request(&peer1_id, Action::FailOnWriteResponse); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, req_id); match error { @@ -352,7 +352,7 @@ async fn report_inbound_timeout_on_write_response() { .send_request(&peer1_id, Action::TimeoutOnWriteResponse); let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); + assert_eq!(peer, Some(peer1_id)); assert_eq!(req_id_done, req_id); match error { @@ -612,7 +612,7 @@ async fn wait_inbound_failure( async fn wait_outbound_failure( swarm: &mut Swarm>, -) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> { +) -> Result<(Option, OutboundRequestId, OutboundFailure)> { loop { match swarm.select_next_some().await.try_into_behaviour_event() { Ok(request_response::Event::OutboundFailure { diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 827afae249c..1873ca88fbe 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -24,6 +24,7 @@ use futures::prelude::*; use libp2p_identity::PeerId; use libp2p_request_response as request_response; use libp2p_request_response::ProtocolSupport; +use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use rand::Rng; @@ -42,10 +43,7 @@ async fn is_response_outbound() { let mut swarm1 = Swarm::new_ephemeral(|_| { request_response::cbor::Behaviour::::new( - [( - StreamProtocol::new("/ping/1"), - request_response::ProtocolSupport::Full, - )], + [(StreamProtocol::new("/ping/1"), ProtocolSupport::Full)], request_response::Config::default(), ) }); @@ -65,7 +63,7 @@ async fn is_response_outbound() { request_id: req_id, error: _error, } => { - assert_eq!(&offline_peer, &peer); + assert_eq!(&Some(offline_peer), &peer); assert_eq!(req_id, request_id1); } e => panic!("Peer: Unexpected event: {e:?}"), @@ -176,6 +174,125 @@ async fn ping_protocol() { peer2.await; } +/// Exercises a simple ping protocol where peers are not connected prior to request sending. +#[async_std::test] +#[cfg(feature = "cbor")] +async fn ping_protocol_explicit_address() { + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols, cfg) + }); + let peer2_id = *swarm2.local_peer_id(); + + let (peer1_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + let peer1 = async move { + loop { + match swarm1.next_swarm_event().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request, channel, .. + }, + }) => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + swarm1 + .behaviour_mut() + .send_response(channel, pong.clone()) + .unwrap(); + } + Ok(request_response::Event::ResponseSent { peer, .. }) => { + assert_eq!(&peer, &peer2_id); + } + Ok(e) => { + panic!("Peer1: Unexpected event: {e:?}") + } + Err(..) => {} + } + } + }; + + let peer2 = async { + let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); + + // Can't dial to unknown peer + match swarm2 + .next_swarm_event() + .await + .try_into_behaviour_event() + .unwrap() + { + request_response::Event::OutboundFailure { + peer, request_id, .. + } => { + assert_eq!(&peer, &Some(peer1_id)); + assert_eq!(req_id, request_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + + let req_id = swarm2.behaviour_mut().send_request( + DialOpts::peer_id(peer1_id) + .addresses(vec![peer1_listen_addr]) + .build(), + ping.clone(), + ); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); + + // Dial to peer with explicit address succeeds + match swarm2.select_next_some().await { + SwarmEvent::Dialing { peer_id, .. } => { + assert_eq!(&peer_id, &Some(peer1_id)); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + match swarm2.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + assert_eq!(&peer_id, &peer1_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + match swarm2 + .next_swarm_event() + .await + .try_into_behaviour_event() + .unwrap() + { + request_response::Event::Message { + peer, + message: + request_response::Message::Response { + request_id, + response, + }, + } => { + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + }; + + async_std::task::spawn(Box::pin(peer1)); + peer2.await; +} + #[async_std::test] #[cfg(feature = "cbor")] async fn emits_inbound_connection_closed_failure() { diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 0109a33747c..76b67eb4395 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,6 +6,9 @@ - Deprecate `void` crate. See [PR 5676](https://github.com/libp2p/rust-libp2p/pull/5676). +- Add `impl From<&PeerId> for DialOpts`. + See [PR 5692](https://github.com/libp2p/rust-libp2p/pull/5692). + ## 0.45.1 - Update `libp2p-swarm-derive` to version `0.35.0`, see [PR 5545] diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index 4f5b621327c..ca814cca146 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -172,6 +172,12 @@ impl From for DialOpts { } } +impl From<&PeerId> for DialOpts { + fn from(peer_id: &PeerId) -> Self { + DialOpts::peer_id(*peer_id).build() + } +} + #[derive(Debug)] pub struct WithPeerId { peer_id: PeerId,