Skip to content

Commit

Permalink
feat: add request timeout for connection (#627)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jul 25, 2024
1 parent bac3474 commit 97791be
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 55 deletions.
32 changes: 20 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.89"
version = "0.1.90"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.89" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.89" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.89" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.89" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.89" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.89" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.89" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.90" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.90" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.90" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.90" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.90" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.90" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.90" }
thiserror = "1.0"
dragonfly-api = "2.0.141"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
17 changes: 3 additions & 14 deletions dragonfly-client/src/bin/dfcache/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ const DEFAULT_PROGRESS_BAR_STEADY_TICK_INTERVAL: Duration = Duration::from_milli
pub struct RemoveCommand {
#[arg(help = "Specify the cache task ID to remove")]
id: String,

#[arg(
long = "timeout",
value_parser= humantime::parse_duration,
default_value = "30m",
help = "Specify the timeout for removing a file"
)]
timeout: Duration,
}

// Implement the execute for RemoveCommand.
Expand Down Expand Up @@ -198,12 +190,9 @@ impl RemoveCommand {
pb.set_message("Removing...");

dfdaemon_download_client
.delete_cache_task(
DeleteCacheTaskRequest {
task_id: self.id.clone(),
},
self.timeout,
)
.delete_cache_task(DeleteCacheTaskRequest {
task_id: self.id.clone(),
})
.await?;

pb.finish_with_message("Done");
Expand Down
10 changes: 2 additions & 8 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,14 +1102,8 @@ impl DfdaemonDownloadClient {

// delete_cache_task deletes the cache task.
#[instrument(skip_all)]
pub async fn delete_cache_task(
&self,
request: DeleteCacheTaskRequest,
timeout: Duration,
) -> ClientResult<()> {
let mut request = tonic::Request::new(request);
request.set_timeout(timeout);

pub async fn delete_cache_task(&self, request: DeleteCacheTaskRequest) -> ClientResult<()> {
let request = Self::make_request(request);
let _response = self.client.clone().delete_cache_task(request).await?;
Ok(())
}
Expand Down
15 changes: 4 additions & 11 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ impl DfdaemonUploadClient {
pub async fn new(addr: String) -> ClientResult<Self> {
let channel = Channel::from_static(Box::leak(addr.clone().into_boxed_str()))
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -1088,23 +1089,15 @@ impl DfdaemonUploadClient {
// stat_cache_task stats the cache task.
#[instrument(skip_all)]
pub async fn stat_cache_task(&self, request: StatCacheTaskRequest) -> ClientResult<CacheTask> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::CONNECT_TIMEOUT);

let request = Self::make_request(request);
let response = self.client.clone().stat_cache_task(request).await?;
Ok(response.into_inner())
}

// delete_cache_task deletes the cache task.
#[instrument(skip_all)]
pub async fn delete_cache_task(
&self,
request: DeleteCacheTaskRequest,
timeout: Duration,
) -> ClientResult<()> {
let mut request = tonic::Request::new(request);
request.set_timeout(timeout);

pub async fn delete_cache_task(&self, request: DeleteCacheTaskRequest) -> ClientResult<()> {
let request = Self::make_request(request);
let _response = self.client.clone().delete_cache_task(request).await?;
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl HealthClient {
let channel = Channel::from_shared(addr.to_string())
.map_err(|_| Error::InvalidURI(addr.into()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/grpc/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl ManagerClient {
let channel = Channel::from_shared(available_addr.clone())
.map_err(|_| Error::InvalidURI(available_addr.clone()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod security;
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);

// REQUEST_TIMEOUT is the timeout for GRPC requests.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

// CONCURRENCY_LIMIT_PER_CONNECTION is the limit of concurrency for each connection.
pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 4096;
Expand Down
4 changes: 4 additions & 0 deletions dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl SchedulerClient {
let channel = Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -225,6 +226,7 @@ impl SchedulerClient {
let channel = Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -282,6 +284,7 @@ impl SchedulerClient {
let channel = Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.map_err(|err| {
Expand Down Expand Up @@ -445,6 +448,7 @@ impl SchedulerClient {
let channel = match Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
{
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/grpc/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl CertificateClient {
pub async fn new(addr: String) -> Result<Self> {
let channel = Channel::from_static(Box::leak(addr.into_boxed_str()))
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.connect()
.await
.or_err(ErrorType::ConnectError)?;
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.79.0"
channel = "1.80.0"

0 comments on commit 97791be

Please sign in to comment.