Skip to content

Commit

Permalink
feat(core): abstract HttpFetch trait for raw http client (#5184)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Oct 16, 2024
1 parent 9f5f3ce commit b692223
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 22 deletions.
79 changes: 64 additions & 15 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,36 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;

use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
use once_cell::sync::Lazy;
use raw::oio::Read;

use super::parse_content_encoding;
use super::parse_content_length;
use super::HttpBody;
use crate::raw::*;
use crate::*;

/// Http client used across opendal for loading credentials.
/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
/// We will remove it after the next major version of reqsign, which will enable users to provide their own client.
#[allow(dead_code)]
pub(crate) static GLOBAL_REQWEST_CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);

/// HttpFetcher is a type erased [`HttpFetch`].
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;

/// HttpClient that used across opendal.
#[derive(Clone)]
pub struct HttpClient {
client: reqwest::Client,
fetcher: HttpFetcher,
}

/// We don't want users to know details about our clients.
Expand All @@ -47,26 +61,24 @@ impl Debug for HttpClient {
impl HttpClient {
/// Create a new http client in async context.
pub fn new() -> Result<Self> {
Self::build(reqwest::ClientBuilder::new())
let fetcher = Arc::new(reqwest::Client::new());
Ok(Self { fetcher })
}

/// Construct `Self` with given [`reqwest::Client`]
pub fn with(client: reqwest::Client) -> Self {
Self { client }
pub fn with(client: impl HttpFetch) -> Self {
let fetcher = Arc::new(client);
Self { fetcher }
}

/// Build a new http client in async context.
#[deprecated]
pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
Ok(Self {
client: builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?,
})
}

/// Get the async client from http client.
pub fn client(&self) -> reqwest::Client {
self.client.clone()
let client = builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?;
let fetcher = Arc::new(client);
Ok(Self { fetcher })
}

/// Send a request in async way.
Expand All @@ -78,6 +90,44 @@ impl HttpClient {

/// Fetch a request in async way.
pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
self.fetcher.fetch(req).await
}
}

/// HttpFetch is the trait to fetch a request in async way.
/// User should implement this trait to provide their own http client.
pub trait HttpFetch: Send + Sync + Unpin + 'static {
/// Fetch a request in async way.
fn fetch(
&self,
req: Request<Buffer>,
) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}

/// HttpFetchDyn is the dyn version of [`HttpFetch`]
/// which make it possible to use as `Arc<dyn HttpFetchDyn>`.
/// User should never implement this trait, but use `HttpFetch` instead.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
/// The dyn version of [`HttpFetch::fetch`].
///
/// This function returns a boxed future to make it object safe.
fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
}

impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
Box::pin(self.fetch(req))
}
}

impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
self.deref().fetch_dyn(req).await
}
}

impl HttpFetch for reqwest::Client {
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
// Uri stores all string alike data in `Bytes` which means
// the clone here is cheap.
let uri = req.uri().clone();
Expand All @@ -86,7 +136,6 @@ impl HttpClient {
let (parts, body) = req.into_parts();

let mut req_builder = self
.client
.request(
parts.method,
reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
Expand Down
5 changes: 5 additions & 0 deletions core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@

mod client;
pub use client::HttpClient;
pub use client::HttpFetch;

/// temporary client used by several features
#[allow(unused_imports)]
pub(crate) use client::GLOBAL_REQWEST_CLIENT;

mod body;
pub use body::HttpBody;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl Builder for CosBuilder {
cfg.secret_key = Some(v);
}

let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);
let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);

let signer = TencentCosSigner::new();

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Builder for GcsBuilder {
DEFAULT_GCS_SCOPE
};

let mut token_loader = GoogleTokenLoader::new(scope, client.client());
let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
if let Some(account) = &self.config.service_account {
token_loader = token_loader.with_service_account(account);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl Builder for OssBuilder {
})?
};

let loader = AliyunLoader::new(client.client(), cfg);
let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);

let signer = AliyunOssSigner::new(bucket);

Expand Down
8 changes: 5 additions & 3 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,8 @@ impl Builder for S3Builder {
// If role_arn is set, we must use AssumeRoleLoad.
if let Some(role_arn) = self.config.role_arn {
// use current env as source credential loader.
let default_loader = AwsDefaultLoader::new(client.client(), cfg.clone());
let default_loader =
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());

// Build the config for assume role.
let mut assume_role_cfg = AwsConfig {
Expand All @@ -817,7 +818,7 @@ impl Builder for S3Builder {
}

let assume_role_loader = AwsAssumeRoleLoader::new(
client.client(),
GLOBAL_REQWEST_CLIENT.clone().clone(),
assume_role_cfg,
Box::new(default_loader),
)
Expand All @@ -835,7 +836,8 @@ impl Builder for S3Builder {
let loader = match loader {
Some(v) => v,
None => {
let mut default_loader = AwsDefaultLoader::new(client.client(), cfg);
let mut default_loader =
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
if self.config.disable_ec2_metadata {
default_loader = default_loader.with_disable_ec2_metadata();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl S3Core {
async fn load_credential(&self) -> Result<Option<AwsCredential>> {
let cred = self
.loader
.load_credential(self.client.client())
.load_credential(GLOBAL_REQWEST_CLIENT.clone())
.await
.map_err(new_request_credential_error)?;

Expand Down

0 comments on commit b692223

Please sign in to comment.