Skip to content

Commit be8c268

Browse files
committed
Pass optional timeout alongside headers
1 parent 236a438 commit be8c268

File tree

8 files changed

+95
-26
lines changed

8 files changed

+95
-26
lines changed

src/api/eth.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ impl<T: Transport> Eth<T> {
4040

4141
/// Get current block number
4242
pub fn block_number(&self, headers: Option<HeaderMap>) -> CallFuture<U64, T::Out> {
43-
CallFuture::new(self.transport.execute_with_headers("eth_blockNumber", vec![], headers))
43+
CallFuture::new(
44+
self.transport
45+
.execute_with_headers("eth_blockNumber", vec![], headers, None),
46+
)
4447
}
4548

4649
/// Call a constant method of contract without changing the state of the blockchain.
@@ -122,7 +125,7 @@ impl<T: Transport> Eth<T> {
122125
let filter = helpers::serialize(&filter);
123126
CallFuture::new(
124127
self.transport
125-
.execute_with_headers("eth_getLogs", vec![filter], headers),
128+
.execute_with_headers("eth_getLogs", vec![filter], headers, None),
126129
)
127130
}
128131

@@ -134,12 +137,12 @@ impl<T: Transport> Eth<T> {
134137
BlockId::Hash(hash) => {
135138
let hash = helpers::serialize(&hash);
136139
self.transport
137-
.execute_with_headers("eth_getBlockByHash", vec![hash, include_txs], headers)
140+
.execute_with_headers("eth_getBlockByHash", vec![hash, include_txs], headers, None)
138141
}
139142
BlockId::Number(num) => {
140143
let num = helpers::serialize(&num);
141144
self.transport
142-
.execute_with_headers("eth_getBlockByNumber", vec![num, include_txs], headers)
145+
.execute_with_headers("eth_getBlockByNumber", vec![num, include_txs], headers, None)
143146
}
144147
};
145148

src/lib.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// select! in WS transport
1212
#![recursion_limit = "256"]
1313

14+
use std::time::Duration;
15+
1416
use headers::HeaderMap;
1517
use jsonrpc_core as rpc;
1618

@@ -53,18 +55,30 @@ pub trait Transport: std::fmt::Debug + Clone {
5355
fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call);
5456

5557
/// Execute prepared RPC call.
56-
fn send(&self, id: RequestId, request: rpc::Call, headers: Option<HeaderMap>) -> Self::Out;
58+
fn send(
59+
&self,
60+
id: RequestId,
61+
request: rpc::Call,
62+
headers: Option<HeaderMap>,
63+
timeout: Option<Duration>,
64+
) -> Self::Out;
5765

5866
/// Execute remote method with given parameters.
5967
fn execute(&self, method: &str, params: Vec<rpc::Value>) -> Self::Out {
6068
let (id, request) = self.prepare(method, params);
61-
self.send(id, request, None)
69+
self.send(id, request, None, None)
6270
}
6371

6472
/// Execute remote method with given parameters and headers.
65-
fn execute_with_headers(&self, method: &str, params: Vec<rpc::Value>, headers: Option<HeaderMap>) -> Self::Out {
73+
fn execute_with_headers(
74+
&self,
75+
method: &str,
76+
params: Vec<rpc::Value>,
77+
headers: Option<HeaderMap>,
78+
timeout: Option<Duration>,
79+
) -> Self::Out {
6680
let (id, request) = self.prepare(method, params);
67-
self.send(id, request, headers)
81+
self.send(id, request, headers, timeout)
6882
}
6983
}
7084

@@ -104,8 +118,14 @@ where
104118
(**self).prepare(method, params)
105119
}
106120

107-
fn send(&self, id: RequestId, request: rpc::Call, headers: Option<HeaderMap>) -> Self::Out {
108-
(**self).send(id, request, headers)
121+
fn send(
122+
&self,
123+
id: RequestId,
124+
request: rpc::Call,
125+
headers: Option<HeaderMap>,
126+
timeout: Option<Duration>,
127+
) -> Self::Out {
128+
(**self).send(id, request, headers, timeout)
109129
}
110130
}
111131

@@ -151,7 +171,7 @@ mod tests {
151171
use crate::api::Web3;
152172
use futures::future::BoxFuture;
153173
use headers::HeaderMap;
154-
use std::sync::Arc;
174+
use std::{sync::Arc, time::Duration};
155175

156176
#[derive(Debug, Clone)]
157177
struct FakeTransport;
@@ -163,7 +183,13 @@ mod tests {
163183
unimplemented!()
164184
}
165185

166-
fn send(&self, _id: RequestId, _request: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
186+
fn send(
187+
&self,
188+
_id: RequestId,
189+
_request: rpc::Call,
190+
_headers: Option<HeaderMap>,
191+
_timeout: Option<Duration>,
192+
) -> Self::Out {
167193
unimplemented!()
168194
}
169195
}

src/transports/batch.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use futures::{
1111
};
1212
use headers::HeaderMap;
1313
use parking_lot::Mutex;
14-
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
14+
use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
1515

1616
type Pending = oneshot::Sender<error::Result<rpc::Value>>;
1717
type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;
@@ -76,7 +76,13 @@ where
7676
self.transport.prepare(method, params)
7777
}
7878

79-
fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
79+
fn send(
80+
&self,
81+
id: RequestId,
82+
request: rpc::Call,
83+
_headers: Option<HeaderMap>,
84+
_timeout: Option<Duration>,
85+
) -> Self::Out {
8086
let (tx, rx) = oneshot::channel();
8187
self.pending.lock().insert(id, tx);
8288
self.batch.lock().push((id, request));

src/transports/either.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! A strongly-typed transport alternative.
22
3+
use std::time::Duration;
4+
35
use crate::{api, error, rpc, BatchTransport, DuplexTransport, RequestId, Transport};
46
use futures::{
57
future::{BoxFuture, FutureExt},
@@ -37,10 +39,16 @@ where
3739
}
3840
}
3941

40-
fn send(&self, id: RequestId, request: rpc::Call, headers: Option<HeaderMap>) -> Self::Out {
42+
fn send(
43+
&self,
44+
id: RequestId,
45+
request: rpc::Call,
46+
headers: Option<HeaderMap>,
47+
timeout: Option<Duration>,
48+
) -> Self::Out {
4149
match *self {
42-
Self::Left(ref a) => a.send(id, request, headers).boxed(),
43-
Self::Right(ref b) => b.send(id, request, headers).boxed(),
50+
Self::Left(ref a) => a.send(id, request, headers, timeout).boxed(),
51+
Self::Right(ref b) => b.send(id, request, headers, timeout).boxed(),
4452
}
4553
}
4654
}

src/transports/http.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::{
1818
atomic::{AtomicUsize, Ordering},
1919
Arc,
2020
},
21+
time::Duration,
2122
};
2223

2324
/// HTTP Transport
@@ -80,6 +81,7 @@ async fn execute_rpc<T: DeserializeOwned>(
8081
request: &Request,
8182
id: RequestId,
8283
headers: Option<HeaderMap>,
84+
timeout: Option<Duration>,
8385
) -> Result<T> {
8486
log::debug!("[id:{}] sending request: {:?}", id, serde_json::to_string(&request)?);
8587
let mut request_builder = client.post(url).json(request);
@@ -88,6 +90,10 @@ async fn execute_rpc<T: DeserializeOwned>(
8890
request_builder = request_builder.headers(headers);
8991
}
9092

93+
if let Some(timeout) = timeout {
94+
request_builder = request_builder.timeout(timeout);
95+
}
96+
9197
let response = request_builder
9298
.send()
9399
.await
@@ -127,10 +133,10 @@ impl Transport for Http {
127133
(id, request)
128134
}
129135

130-
fn send(&self, id: RequestId, call: Call, headers: Option<HeaderMap>) -> Self::Out {
136+
fn send(&self, id: RequestId, call: Call, headers: Option<HeaderMap>, timeout: Option<Duration>) -> Self::Out {
131137
let (client, url) = self.new_request();
132138
Box::pin(async move {
133-
let output: Output = execute_rpc(&client, url, &Request::Single(call), id, headers).await?;
139+
let output: Output = execute_rpc(&client, url, &Request::Single(call), id, headers, timeout).await?;
134140
helpers::to_result_from_output(output)
135141
})
136142
}
@@ -148,7 +154,7 @@ impl BatchTransport for Http {
148154
let (client, url) = self.new_request();
149155
let (ids, calls): (Vec<_>, Vec<_>) = requests.into_iter().unzip();
150156
Box::pin(async move {
151-
let value = execute_rpc(&client, url, &Request::Batch(calls), id, None).await?;
157+
let value = execute_rpc(&client, url, &Request::Batch(calls), id, None, None).await?;
152158
let outputs = handle_possible_error_object_for_batched_request(value)?;
153159
handle_batch_response(&ids, outputs)
154160
})

src/transports/ipc.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::{
1616
pin::Pin,
1717
sync::{atomic::AtomicUsize, Arc},
1818
task::{Context, Poll},
19+
time::Duration,
1920
};
2021
use tokio::{
2122
io::AsyncWriteExt,
@@ -64,7 +65,13 @@ impl Transport for Ipc {
6465
(id, request)
6566
}
6667

67-
fn send(&self, id: RequestId, call: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
68+
fn send(
69+
&self,
70+
id: RequestId,
71+
call: rpc::Call,
72+
_headers: Option<HeaderMap>,
73+
_timeout: Option<Duration>,
74+
) -> Self::Out {
6875
let (response_tx, response_rx) = oneshot::channel();
6976
let message = TransportMessage::Single((id, call, response_tx));
7077

@@ -357,7 +364,7 @@ mod test {
357364
"test": -1,
358365
})],
359366
);
360-
let response = ipc.send(req_id, request, None).await;
367+
let response = ipc.send(req_id, request, None, None).await;
361368
let expected_response_json: serde_json::Value = json!({
362369
"test": 1,
363370
});
@@ -369,7 +376,7 @@ mod test {
369376
"test": 3,
370377
})],
371378
);
372-
let response = ipc.send(req_id, request, None).await;
379+
let response = ipc.send(req_id, request, None, None).await;
373380
let expected_response_json: serde_json::Value = json!({
374381
"test": "string1",
375382
});

src/transports/test.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
};
77
use futures::future::{self, BoxFuture, FutureExt};
88
use headers::HeaderMap;
9-
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
9+
use std::{cell::RefCell, collections::VecDeque, rc::Rc, time::Duration};
1010

1111
type Result<T> = BoxFuture<'static, error::Result<T>>;
1212

@@ -27,7 +27,13 @@ impl Transport for TestTransport {
2727
(self.requests.borrow().len(), request)
2828
}
2929

30-
fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>) -> Result<rpc::Value> {
30+
fn send(
31+
&self,
32+
id: RequestId,
33+
request: rpc::Call,
34+
_headers: Option<HeaderMap>,
35+
_timeout: Option<Duration>,
36+
) -> Result<rpc::Value> {
3137
future::ready(match self.responses.borrow_mut().pop_front() {
3238
Some(response) => Ok(response),
3339
None => {

src/transports/ws.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::{
2222
marker::Unpin,
2323
pin::Pin,
2424
sync::{atomic, Arc},
25+
time::Duration,
2526
};
2627
use url::Url;
2728

@@ -474,7 +475,13 @@ impl Transport for WebSocket {
474475
(id, request)
475476
}
476477

477-
fn send(&self, id: RequestId, request: rpc::Call, _headers: Option<HeaderMap>) -> Self::Out {
478+
fn send(
479+
&self,
480+
id: RequestId,
481+
request: rpc::Call,
482+
_headers: Option<HeaderMap>,
483+
_timeout: Option<Duration>,
484+
) -> Self::Out {
478485
let response = self.send_request(id, rpc::Request::Single(request));
479486
Response::new(response, batch_to_single)
480487
}

0 commit comments

Comments
 (0)