diff --git a/Cargo.lock b/Cargo.lock index fdeada37500..8c6d4445185 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1373,12 +1373,12 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.2.4" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91f328e7fb845fc832912fb6a34f40cf6d1888c92f974d1893a54e97b5ff542e" +checksum = "b604752cefc5aa3ab98992a107a8bd99465d2825c1584e0b60cb6957b21e19d7" dependencies = [ - "futures-timer", "futures-util", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9e65d1ab62f..ff845b1c98c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ libp2p-yamux = { version = "0.47.0", path = "muxers/yamux" } asynchronous-codec = { version = "0.7.0" } env_logger = "0.11" futures = "0.3.30" -futures-bounded = { version = "0.2.4" } +futures-bounded = { version = "0.3", features = ["tokio"]} futures-rustls = { version = "0.26.0", default-features = false } getrandom = "0.2" if-watch = "3.2.1" diff --git a/protocols/autonat/src/v2/client/handler/dial_back.rs b/protocols/autonat/src/v2/client/handler/dial_back.rs index 3fd3cf0b5ed..e208233cd6b 100644 --- a/protocols/autonat/src/v2/client/handler/dial_back.rs +++ b/protocols/autonat/src/v2/client/handler/dial_back.rs @@ -22,7 +22,7 @@ pub struct Handler { impl Handler { pub(crate) fn new() -> Self { Self { - inbound: StreamSet::new(Duration::from_secs(5), 2), + inbound: StreamSet::new(|| futures_bounded::Delay::tokio(Duration::from_secs(5)), 2), } } } diff --git a/protocols/autonat/src/v2/client/handler/dial_request.rs b/protocols/autonat/src/v2/client/handler/dial_request.rs index bafdd9d818c..909c07679ac 100644 --- a/protocols/autonat/src/v2/client/handler/dial_request.rs +++ b/protocols/autonat/src/v2/client/handler/dial_request.rs @@ -91,7 +91,10 @@ impl Handler { pub(crate) fn new() -> Self { Self { queued_events: VecDeque::new(), - outbound: FuturesMap::new(Duration::from_secs(10), 10), + outbound: FuturesMap::new( + || futures_bounded::Delay::tokio(Duration::from_secs(10)), + 10, + ), queued_streams: VecDeque::default(), } } diff --git a/protocols/autonat/src/v2/server/handler/dial_back.rs b/protocols/autonat/src/v2/server/handler/dial_back.rs index 836cb50b5c3..3cfca38dd0f 100644 --- a/protocols/autonat/src/v2/server/handler/dial_back.rs +++ b/protocols/autonat/src/v2/server/handler/dial_back.rs @@ -33,7 +33,7 @@ impl Handler { Self { pending_nonce: Some(cmd), requested_substream_nonce: None, - outbound: FuturesSet::new(Duration::from_secs(10), 5), + outbound: FuturesSet::new(|| futures_bounded::Delay::tokio(Duration::from_secs(10)), 5), } } } diff --git a/protocols/autonat/src/v2/server/handler/dial_request.rs b/protocols/autonat/src/v2/server/handler/dial_request.rs index 5b4318bd643..4074aabd4a5 100644 --- a/protocols/autonat/src/v2/server/handler/dial_request.rs +++ b/protocols/autonat/src/v2/server/handler/dial_request.rs @@ -64,7 +64,10 @@ where observed_multiaddr, dial_back_cmd_sender, dial_back_cmd_receiver, - inbound: FuturesSet::new(Duration::from_secs(10), 10), + inbound: FuturesSet::new( + || futures_bounded::Delay::tokio(Duration::from_secs(10)), + 10, + ), rng, } } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 43e433a5268..f5655d8dab5 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -87,8 +87,14 @@ impl Handler { Self { endpoint, queued_events: Default::default(), - inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), - outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), + inbound_stream: futures_bounded::FuturesSet::new( + || futures_bounded::Delay::tokio(Duration::from_secs(10)), + 1, + ), + outbound_stream: futures_bounded::FuturesSet::new( + || futures_bounded::Delay::tokio(Duration::from_secs(10)), + 1, + ), holepunch_candidates, attempts: 0, } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index b77450a617d..844c25886cd 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -139,7 +139,7 @@ impl Handler { remote_peer_id, events: SmallVec::new(), active_streams: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, + move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), trigger_next_identify: Delay::new(Duration::ZERO), diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 2c7b6c52257..dd809dec32b 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -463,7 +463,7 @@ impl Handler { next_connec_unique_id: UniqueConnecId(0), inbound_substreams: Default::default(), outbound_substreams: futures_bounded::FuturesTupleSet::new( - substreams_timeout, + move || futures_bounded::Delay::tokio(substreams_timeout), MAX_NUM_STREAMS, ), pending_streams: Default::default(), diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 2748a5b6ad9..d65b0d59978 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -49,7 +49,7 @@ impl Handler { pub fn new() -> Self { Self { inbound: futures_bounded::FuturesSet::new( - crate::RUN_TIMEOUT, + move || futures_bounded::Delay::tokio(crate::RUN_TIMEOUT), crate::MAX_PARALLEL_RUNS_PER_CONNECTION, ), } diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index af130c35516..514b15a13a2 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -391,11 +391,11 @@ impl Handler { pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler { Handler { inbound_workers: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, + move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), outbound_workers: futures_bounded::FuturesMap::new( - STREAM_TIMEOUT, + move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), endpoint, diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 5f46dbf4460..6a616a17af9 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -142,19 +142,19 @@ impl Handler { queued_events: Default::default(), pending_streams: Default::default(), inflight_reserve_requests: futures_bounded::FuturesTupleSet::new( - STREAM_TIMEOUT, + move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, + move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new( - STREAM_TIMEOUT, + move || futures_bounded::Delay::tokio(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new( - DENYING_CIRCUIT_TIMEOUT, + move || futures_bounded::Delay::tokio(DENYING_CIRCUIT_TIMEOUT), MAX_NUMBER_DENYING_CIRCUIT, ), reservation: Reservation::None, diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index abc994f160e..443904fa00c 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -111,7 +111,7 @@ where pending_events: VecDeque::new(), inbound_request_id, worker_streams: futures_bounded::FuturesMap::new( - substream_timeout, + move || futures_bounded::Delay::tokio(substream_timeout), max_concurrent_streams, ), }