Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ libp2p-rendezvous = { version = "0.17.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.29.0", path = "protocols/request-response" }
libp2p-server = { version = "0.12.7", path = "misc/server" }
libp2p-stream = { version = "0.4.0-alpha", path = "protocols/stream" }
libp2p-swarm = { version = "0.47.0", path = "swarm" }
libp2p-swarm = { version = "0.47.1", path = "swarm" }
libp2p-swarm-derive = { version = "=0.35.1", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm-test = { version = "0.6.0", path = "swarm-test" }
libp2p-tcp = { version = "0.44.0", path = "transports/tcp" }
Expand Down
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.47.1

- Add smart dialing support.

## 0.47.0

- Remove `async-std` support.
Expand Down
2 changes: 1 addition & 1 deletion swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-swarm"
edition.workspace = true
rust-version = { workspace = true }
description = "The libp2p swarm"
version = "0.47.0"
version = "0.47.1"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
38 changes: 20 additions & 18 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{
collections::HashMap,
convert::Infallible,
fmt,
num::{NonZeroU8, NonZeroUsize},
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
};

use concurrent_dial::ConcurrentDial;
use concurrent_dial::{ConcurrentDial, Dial};
use dial_ranker::DialRanker;
use fnv::FnvHashMap;
use futures::{
channel::{mpsc, oneshot},
future::{poll_fn, BoxFuture, Either},
future::{poll_fn, Either},
prelude::*,
ready,
stream::{FuturesUnordered, SelectAll},
Expand All @@ -44,16 +47,16 @@ use libp2p_core::{
use tracing::Instrument;
use web_time::{Duration, Instant};

use super::{
Connected, Connection, ConnectionError, ConnectionId, IncomingInfo,
PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint,
};
use crate::{
connection::{
Connected, Connection, ConnectionError, ConnectionId, IncomingInfo,
PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint,
},
transport::TransportError,
ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
transport::TransportError, ConnectedPoint, ConnectionHandler, Executor, Multiaddr, PeerId,
};

mod concurrent_dial;
pub(crate) mod dial_ranker;
mod task;

enum ExecSwitch {
Expand Down Expand Up @@ -142,6 +145,9 @@ where

/// How long a connection should be kept alive once it starts idling.
idle_connection_timeout: Duration,

/// Ranker that determines the ranking of outgoing connection attempts.
dial_ranker: Option<Arc<DialRanker>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -333,6 +339,7 @@ where
no_established_connections_waker: None,
established_connection_events: Default::default(),
new_connection_dropped_listeners: Default::default(),
dial_ranker: config.dial_ranker,
}
}

Expand Down Expand Up @@ -413,15 +420,7 @@ where
/// that establishes and negotiates the connection.
pub(crate) fn add_outgoing(
&mut self,
dials: Vec<
BoxFuture<
'static,
(
Multiaddr,
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
),
>,
>,
dials: Vec<(Multiaddr, Dial)>,
peer: Option<PeerId>,
role_override: Endpoint,
port_use: PortUse,
Expand All @@ -438,7 +437,7 @@ where
self.executor.spawn(
task::new_for_pending_outgoing_connection(
connection_id,
ConcurrentDial::new(dials, concurrency_factor),
ConcurrentDial::new(dials, concurrency_factor, self.dial_ranker.clone()),
abort_receiver,
self.pending_connection_events_tx.clone(),
)
Expand Down Expand Up @@ -979,6 +978,8 @@ pub(crate) struct PoolConfig {
pub(crate) per_connection_event_buffer_size: usize,
/// Number of addresses concurrently dialed for a single outbound connection attempt.
pub(crate) dial_concurrency_factor: NonZeroU8,
/// Ranker that determines the ranking of outgoing connection attempts.
pub(crate) dial_ranker: Option<Arc<DialRanker>>,
/// How long a connection should be kept alive once it is idling.
pub(crate) idle_connection_timeout: Duration,
/// The configured override for substream protocol upgrades, if any.
Expand All @@ -1000,6 +1001,7 @@ impl PoolConfig {
idle_connection_timeout: Duration::from_secs(10),
substream_upgrade_protocol_override: None,
max_negotiating_inbound_streams: 128,
dial_ranker: None,
}
}

Expand Down
71 changes: 54 additions & 17 deletions swarm/src/connection/pool/concurrent_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,28 @@
// DEALINGS IN THE SOFTWARE.

use std::{
collections::HashMap,
num::NonZeroU8,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use futures::{
future::{BoxFuture, Future},
ready,
stream::{FuturesUnordered, StreamExt},
FutureExt,
};
use futures_timer::Delay;
use libp2p_core::muxing::StreamMuxerBox;
use libp2p_identity::PeerId;

use super::DialRanker;
use crate::{transport::TransportError, Multiaddr};

type Dial = BoxFuture<
pub(crate) type Dial = BoxFuture<
'static,
(
Multiaddr,
Expand All @@ -43,29 +49,56 @@ type Dial = BoxFuture<
>;

pub(crate) struct ConcurrentDial {
concurrency_factor: NonZeroU8,
dials: FuturesUnordered<Dial>,
pending_dials: Box<dyn Iterator<Item = Dial> + Send>,
pending_dials: Box<dyn Iterator<Item = (Multiaddr, Option<Duration>, Dial)> + Send>,
errors: Vec<(Multiaddr, TransportError<std::io::Error>)>,
}

impl Unpin for ConcurrentDial {}

impl ConcurrentDial {
pub(crate) fn new(pending_dials: Vec<Dial>, concurrency_factor: NonZeroU8) -> Self {
let mut pending_dials = pending_dials.into_iter();

pub(crate) fn new(
pending_dials: Vec<(Multiaddr, Dial)>,
concurrency_factor: NonZeroU8,
dial_ranker: Option<Arc<DialRanker>>,
) -> Self {
let dials = FuturesUnordered::new();
for dial in pending_dials.by_ref() {
dials.push(dial);
if dials.len() == concurrency_factor.get() as usize {
break;
}
}

let pending_dials: Vec<_> = if let Some(dial_ranker) = dial_ranker {
let addresses = pending_dials.iter().map(|(k, _)| k.clone()).collect();
let mut dials: HashMap<Multiaddr, Dial> = HashMap::from_iter(pending_dials);
dial_ranker(addresses)
.into_iter()
.filter_map(|(addr, delay)| dials.remove(&addr).map(|dial| (addr, delay, dial)))
.collect()
} else {
pending_dials
.into_iter()
.map(|(addr, dial)| (addr, None, dial))
.collect()
};
Self {
concurrency_factor,
dials,
errors: Default::default(),
pending_dials: Box::new(pending_dials),
pending_dials: Box::new(pending_dials.into_iter()),
}
}

fn dial_pending(&mut self) -> bool {
if let Some((_, delay, dial)) = self.pending_dials.next() {
self.dials.push(
async move {
if let Some(delay) = delay {
Delay::new(delay).await;
}
dial.await
}
.boxed(),
);
true
} else {
false
}
}
}
Expand All @@ -92,12 +125,16 @@ impl Future for ConcurrentDial {
}
Some((addr, Err(e))) => {
self.errors.push((addr, e));
if let Some(dial) = self.pending_dials.next() {
self.dials.push(dial)
}
self.dial_pending();
}
None => {
return Poll::Ready(Err(std::mem::take(&mut self.errors)));
while self.dials.len() < self.concurrency_factor.get() as usize
&& self.dial_pending()
{}

if self.dials.is_empty() {
return Poll::Ready(Err(std::mem::take(&mut self.errors)));
}
}
}
}
Expand Down
Loading