Skip to content
This repository was archived by the owner on Oct 19, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ethers-core/src/types/trace/geth/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct CallFrame {
pub output: Option<Bytes>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "revertReason")]
pub revert_reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub calls: Option<Vec<CallFrame>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down
6 changes: 6 additions & 0 deletions ethers-providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ serde_json = { workspace = true, features = ["raw_value"] }

http = "0.2"
reqwest = { workspace = true, features = ["json"] }
reqwest-middleware = "0.2.4"
url.workspace = true
base64 = "0.21"

Expand All @@ -55,6 +56,8 @@ enr = { version = "0.9.0", default-features = false, features = ["k256", "serde"
# tracing
tracing = { workspace = true, features = ["attributes"] }
tracing-futures = { workspace = true, features = ["std-future"] }
anyhow = "1.0.71"


[target.'cfg(target_family = "windows")'.dependencies]
winapi = { version = "0.3", optional = true }
Expand All @@ -63,6 +66,7 @@ winapi = { version = "0.3", optional = true }
# tokio
tokio = { workspace = true, features = ["time"] }
tokio-tungstenite = { workspace = true, features = ["connect"], optional = true }
reqwest-chain = "0.1.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
ws_stream_wasm = "0.7"
Expand All @@ -74,6 +78,7 @@ web-sys = { version = "0.3", features = ["console"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time"] }
tempfile = "3.5.0"


[features]
default = ["ws", "rustls"]
celo = ["ethers-core/celo"]
Expand All @@ -91,3 +96,4 @@ dev-rpc = []

[dev-dependencies]
tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
hex = "0.4.3"
66 changes: 60 additions & 6 deletions ethers-providers/src/rpc/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use super::common::{Authorization, JsonRpcError, Request, Response};
use crate::{errors::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use reqwest::{header::HeaderValue, Client, Error as ReqwestError};

use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, Error as MiddlewareError};
use serde::{de::DeserializeOwned, Serialize};
use std::{
str::FromStr,
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use thiserror::Error;
use url::Url;
Expand All @@ -30,8 +33,9 @@ use url::Url;
#[derive(Debug)]
pub struct Provider {
id: AtomicU64,
client: Client,
client: ClientWithMiddleware,
url: Url,
retry_urls: Vec<Url>,
}

#[derive(Error, Debug)]
Expand All @@ -52,6 +56,9 @@ pub enum ClientError {
/// The contents of the HTTP response that could not be deserialized
text: String,
},
/// Reqwest Middleware Error
#[error(transparent)]
MiddlewareError(#[from] MiddlewareError),
}

impl From<ClientError> for ProviderError {
Expand Down Expand Up @@ -133,7 +140,7 @@ impl Provider {
/// let url = Url::parse("http://localhost:8545").unwrap();
/// let provider = Http::new(url);
/// ```
pub fn new(url: impl Into<Url>) -> Self {
pub fn new(url: impl Into<Url> + Clone) -> Self {
Self::new_with_client(url, Client::new())
}

Expand All @@ -159,7 +166,7 @@ impl Provider {
/// let provider = Http::new_with_auth(url, Authorization::basic("admin", "good_password"));
/// ```
pub fn new_with_auth(
url: impl Into<Url>,
url: impl Into<Url> + Clone,
auth: Authorization,
) -> Result<Self, HttpClientError> {
let mut auth_value = HeaderValue::from_str(&auth.to_string())?;
Expand All @@ -185,8 +192,50 @@ impl Provider {
/// let client = reqwest::Client::builder().build().unwrap();
/// let provider = Http::new_with_client(url, client);
/// ```
pub fn new_with_client(url: impl Into<Url>, client: reqwest::Client) -> Self {
Self { id: AtomicU64::new(1), client, url: url.into() }
pub fn new_with_client(url: impl Into<Url> + Clone, client: reqwest::Client) -> Self {
let retry_urls = vec![url.clone().into()];
let client_with_middleware = ClientBuilder::new(client).build();
Self { id: AtomicU64::new(1), client: client_with_middleware, url: url.into(), retry_urls }
}

#[cfg(not(target_arch = "wasm32"))]
/// Allows to customize the provider by providing multiple rpc urls
/// Useable only in std mode
/// # Example
///
/// ```
/// use ethers_providers::Http;
/// use url::Url;
///
/// let url = Url::parse("http://localhost:8545").unwrap();
/// let provider = Http::new_client_with_chain_middleware(vec![url]);
/// ```
pub fn new_client_with_chain_middleware(
urls: Vec<Url>,
request_timeout: Option<Duration>,
) -> Self {
use crate::rpc::transports::middleware::SwitchProviderMiddleware;
use reqwest_chain::ChainMiddleware;
let client = reqwest::ClientBuilder::new()
.timeout(request_timeout.unwrap_or(Duration::from_secs(180)))
.build()
.expect("Client build panicked");

let client_with_middleware = ClientBuilder::new(client)
.with(ChainMiddleware::new(SwitchProviderMiddleware::_new(urls.clone())))
.build();
let url = urls.get(0).expect("Needs at least a url");
Self {
id: AtomicU64::new(1),
client: client_with_middleware,
url: url.clone(),
retry_urls: urls,
}
}

/// Returns the client middleware
pub fn client(self) -> ClientWithMiddleware {
self.client
}
}

Expand All @@ -201,7 +250,12 @@ impl FromStr for Provider {

impl Clone for Provider {
fn clone(&self) -> Self {
Self { id: AtomicU64::new(1), client: self.client.clone(), url: self.url.clone() }
Self {
id: AtomicU64::new(1),
client: self.client.clone(),
url: self.url.clone(),
retry_urls: self.retry_urls.clone(),
}
}
}

Expand Down
148 changes: 148 additions & 0 deletions ethers-providers/src/rpc/transports/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#![cfg(not(target_arch = "wasm32"))]
use crate::rpc::transports::http::ClientError;
use anyhow::anyhow;
use http::response::Builder;
use reqwest_chain::Chainer;
use reqwest_middleware::Error;
use tracing::trace;
use url::Url;

/// Middleware for switching between providers on failures
pub struct SwitchProviderMiddleware {
/// Rpc providers to be used for retries of failed requests
pub providers: Vec<Url>,
}

#[derive(Default, Debug)]
pub struct LocalState {
pub active_provider_index: usize,
}

impl SwitchProviderMiddleware {
pub fn _new(providers: Vec<Url>) -> Self {
Self { providers }
}
}

#[async_trait::async_trait]
impl Chainer for SwitchProviderMiddleware {
type State = LocalState;

async fn chain(
&self,
result: Result<reqwest::Response, Error>,
_state: &mut Self::State,
request: &mut reqwest::Request,
) -> Result<Option<reqwest::Response>, Error> {
let mut next_state = |client_error: ClientError| {
let next_index = _state.active_provider_index + 1;
if next_index >= self.providers.len() {
trace!(target:"ethers-providers", "Providers have been exhausted");

Err(anyhow!(client_error))?;
}
_state.active_provider_index = next_index;
let next_provider = self.providers[next_index].clone();
let url_ref = request.url_mut();

*url_ref = next_provider;
trace!(target:"ethers-providers", "Retrying request with new provider {url_ref:?}");
Ok::<_, anyhow::Error>(())
};

match result {
Ok(response) => {
let body = response.bytes().await?;

match serde_json::from_slice(&body) {
Ok(crate::rpc::common::Response::Success { result: _, .. }) => {
let http_response = Builder::new()
.status(200)
.body(body.clone())
.map_err(|err| Error::Middleware(anyhow!("Error {err:?}")))?;
return Ok(Some(reqwest::Response::from(http_response)));
}
Ok(crate::rpc::common::Response::Error { error, .. }) => {
let _ = next_state(ClientError::JsonRpcError(error))?;
}
Ok(_) => {
let err = ClientError::SerdeJson {
err: serde::de::Error::custom(
"unexpected notification over HTTP transport",
),
text: String::from_utf8_lossy(&body).to_string(),
};
let _ = next_state(err)?;
}
Err(err) => {
let error = ClientError::SerdeJson {
err,
text: String::from_utf8_lossy(&body).to_string(),
};

let _ = next_state(error)?;
}
};
}
Err(e) => {
trace!(target:"ethers-providers", "Possibly encountered an os error submitting request, switching provider {e:?}");
let _ = next_state(ClientError::MiddlewareError(e))?;
}
}

Ok(None)
}

fn max_chain_length(&self) -> u32 {
self.providers.len() as u32
}
}

#[cfg(test)]
mod test {
use crate::{Http, Middleware, Provider};
use ethers_core::types::{Block, EIP1186ProofResponse, H160, H256};
use reqwest::Url;

#[tokio::test]
async fn test_switch_provider_middleware_for_json_get_block_by_number() {
let providers = vec![
Url::parse("http://localhost:3500").unwrap(),
Url::parse("https://www.noderpc.xyz/rpc-mainnet/public").unwrap(),
];

let http_provider = Http::new_client_with_chain_middleware(providers, None);

let block_num = "latest";
let txn_details = false;
let params = (block_num, txn_details);

let provider = Provider::<Http>::new(http_provider.clone());

let block: Block<H256> = provider.request("eth_getBlockByNumber", params).await.unwrap();
assert!(block.hash.is_some())
}

#[tokio::test]
async fn test_switch_provider_middleware_for_json_rpc_get_proof() {
let providers = vec![
Url::parse("http://localhost:3500").unwrap(),
Url::parse("https://docs-demo.quiknode.pro").unwrap(),
];

let http_provider = Http::new_client_with_chain_middleware(providers, None);

let from =
H160::from_slice(&hex::decode("7F0d15C7FAae65896648C8273B6d7E43f58Fa842").unwrap());
let locations = vec![H256::from_slice(
&hex::decode("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
.unwrap(),
)];

let provider = Provider::<Http>::new(http_provider.clone());

let proof: EIP1186ProofResponse = provider.get_proof(from, locations, None).await.unwrap();

assert_eq!(proof.address, from);
}
}
3 changes: 3 additions & 0 deletions ethers-providers/src/rpc/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ pub mod legacy_ws;
#[cfg(feature = "legacy-ws")]
pub use legacy_ws::{ClientError as WsClientError, Ws};

#[cfg(not(target_arch = "wasm32"))]
pub mod middleware;
mod mock;

pub use mock::{MockError, MockProvider, MockResponse};
1 change: 1 addition & 0 deletions ethers-providers/src/rpc/transports/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
}
false
}
ClientError::MiddlewareError(_) => false,
}
}

Expand Down
1 change: 1 addition & 0 deletions examples/wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ default = ["console_error_panic_hook"]

[dependencies]
ethers = { workspace = true, features = ["abigen", "ws"] }
getrandom = { version = "0.2", default-features = false, features = ["js"] }

serde.workspace = true
serde_json.workspace = true
Expand Down
Loading