Skip to content

Commit

Permalink
feat: filter query params (#667)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 13, 2024
1 parent 19831e3 commit 4ac259a
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 39 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.98"
version = "0.1.99"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.98" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.98" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.98" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.98" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.98" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.98" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.98" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.99" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.99" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.99" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.99" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.99" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.99" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.99" }
thiserror = "1.0"
dragonfly-api = "2.0.147"
dragonfly-api = "2.0.148"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.4", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client-config/src/dfdaemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ fn cos_filtered_query_params() -> Vec<String> {

// default_proxy_rule_filtered_query_params is the default filtered query params to generate the task id.
#[inline]
fn default_proxy_rule_filtered_query_params() -> Vec<String> {
pub fn default_proxy_rule_filtered_query_params() -> Vec<String> {
let mut visited = HashSet::new();
for query_param in s3_filtered_query_params() {
visited.insert(query_param);
Expand Down
3 changes: 2 additions & 1 deletion dragonfly-client-util/src/id_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl IDGenerator {
let url = Url::parse(url).or_err(ErrorType::ParseError)?;
let query = url
.query_pairs()
.filter(|(k, _)| filtered_query_params.contains(&k.to_string()));
.filter(|(k, _)| !filtered_query_params.contains(&k.to_string()));

let mut artifact_url = url.clone();
artifact_url.query_pairs_mut().clear().extend_pairs(query);

Expand Down
8 changes: 7 additions & 1 deletion dragonfly-client/src/bin/dfget/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,12 @@ async fn download(
});
}

// If the `filtered_query_params` is not provided, then use the default value.
let filtered_query_params = match args.filtered_query_params {
Some(params) => params,
None => dfdaemon::default_proxy_rule_filtered_query_params(),
};

// Create dfdaemon client.
let response = download_client
.download_task(DownloadTaskRequest {
Expand All @@ -695,7 +701,7 @@ async fn download(
tag: Some(args.tag),
application: Some(args.application),
priority: args.priority,
filtered_query_params: args.filtered_query_params.unwrap_or_default(),
filtered_query_params,
request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
piece_length: None,
output_path: Some(args.output.to_string_lossy().to_string()),
Expand Down
15 changes: 9 additions & 6 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,16 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
collect_delete_task_started_metrics(TaskType::Dfdaemon as i32);

// Delete the task from the scheduler.
self.task.delete(task_id.as_str()).await.map_err(|err| {
// Collect the delete task failure metrics.
collect_delete_task_failure_metrics(TaskType::Dfdaemon as i32);
self.task
.delete(task_id.as_str(), host_id.as_str())
.await
.map_err(|err| {
// Collect the delete task failure metrics.
collect_delete_task_failure_metrics(TaskType::Dfdaemon as i32);

error!("delete task: {}", err);
Status::internal(err.to_string())
})?;
error!("delete task: {}", err);
Status::internal(err.to_string())
})?;

Ok(Response::new(()))
}
Expand Down
78 changes: 75 additions & 3 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use crate::metrics::{
};
use crate::resource::{cache_task, task};
use crate::shutdown;
use dragonfly_api::common::v2::{CacheTask, Piece, Priority, TaskType};
use dragonfly_api::common::v2::{CacheTask, Piece, Priority, Task, TaskType};
use dragonfly_api::dfdaemon::v2::{
dfdaemon_upload_client::DfdaemonUploadClient as DfdaemonUploadGRPCClient,
dfdaemon_upload_server::{DfdaemonUpload, DfdaemonUploadServer as DfdaemonUploadGRPCServer},
DeleteCacheTaskRequest, DownloadCacheTaskRequest, DownloadCacheTaskResponse,
DeleteCacheTaskRequest, DeleteTaskRequest, DownloadCacheTaskRequest, DownloadCacheTaskResponse,
DownloadPieceRequest, DownloadPieceResponse, DownloadTaskRequest, DownloadTaskResponse,
StatCacheTaskRequest, SyncPiecesRequest, SyncPiecesResponse,
StatCacheTaskRequest, StatTaskRequest, SyncPiecesRequest, SyncPiecesResponse,
};
use dragonfly_api::errordetails::v2::Backend;
use dragonfly_client_config::dfdaemon::Config;
Expand Down Expand Up @@ -528,6 +528,78 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
Ok(Response::new(ReceiverStream::new(out_stream_rx)))
}

// stat_task stats the task.
#[instrument(skip_all, fields(host_id, task_id))]
async fn stat_task(&self, request: Request<StatTaskRequest>) -> Result<Response<Task>, Status> {
    // Unwrap the gRPC request into its inner message.
    let inner = request.into_inner();

    // Generate the host id and pull the task id out of the request.
    let host_id = self.task.id_generator.host_id();
    let task_id = inner.task_id;

    // Record both ids on the current tracing span.
    let span = Span::current();
    span.record("host_id", host_id.as_str());
    span.record("task_id", task_id.as_str());

    // Collect the stat task started metrics.
    collect_stat_task_started_metrics(TaskType::Dfdaemon as i32);

    // Query the task; on failure collect the failure metrics and
    // surface the error as an internal gRPC status.
    match self.task.stat(task_id.as_str(), host_id.as_str()).await {
        Ok(task) => Ok(Response::new(task)),
        Err(err) => {
            // Collect the stat task failure metrics.
            collect_stat_task_failure_metrics(TaskType::Dfdaemon as i32);

            error!("stat task: {}", err);
            Err(Status::internal(err.to_string()))
        }
    }
}

// delete_task deletes the task.
#[instrument(skip_all, fields(host_id, task_id))]
async fn delete_task(
    &self,
    request: Request<DeleteTaskRequest>,
) -> Result<Response<()>, Status> {
    // Unwrap the gRPC request into its inner message.
    let inner = request.into_inner();

    // Generate the host id and pull the task id out of the request.
    let host_id = self.task.id_generator.host_id();
    let task_id = inner.task_id;

    // Record both ids on the current tracing span.
    let span = Span::current();
    span.record("host_id", host_id.as_str());
    span.record("task_id", task_id.as_str());

    // Collect the delete task started metrics.
    collect_delete_task_started_metrics(TaskType::Dfdaemon as i32);

    // Delete the task; on failure collect the failure metrics and
    // surface the error as an internal gRPC status.
    match self.task.delete(task_id.as_str(), host_id.as_str()).await {
        Ok(()) => Ok(Response::new(())),
        Err(err) => {
            // Collect the delete task failure metrics.
            collect_delete_task_failure_metrics(TaskType::Dfdaemon as i32);

            error!("delete task: {}", err);
            Err(Status::internal(err.to_string()))
        }
    }
}

// SyncPiecesStream is the stream of the sync pieces response.
type SyncPiecesStream = ReceiverStream<Result<SyncPiecesResponse, Status>>;

Expand Down
22 changes: 14 additions & 8 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use dragonfly_api::dfdaemon::{
use dragonfly_api::errordetails::v2::Backend;
use dragonfly_api::scheduler::v2::{
announce_peer_request, announce_peer_response, download_piece_back_to_source_failed_request,
AnnouncePeerRequest, DownloadPeerBackToSourceFailedRequest,
AnnouncePeerRequest, DeleteTaskRequest, DownloadPeerBackToSourceFailedRequest,
DownloadPeerBackToSourceFinishedRequest, DownloadPeerBackToSourceStartedRequest,
DownloadPeerFailedRequest, DownloadPeerFinishedRequest, DownloadPeerStartedRequest,
DownloadPieceBackToSourceFailedRequest, DownloadPieceBackToSourceFinishedRequest,
Expand Down Expand Up @@ -1611,22 +1611,28 @@ impl Task {
}

// Delete a task and reclaim local storage.
pub async fn delete(&self, task_id: &str) -> ClientResult<()> {
pub async fn delete(&self, task_id: &str, host_id: &str) -> ClientResult<()> {
let task = self.storage.get_task(task_id).map_err(|err| {
error!("get task {} from local storage error: {:?}", task_id, err);
err
})?;

match task {
Some(task) => {
// Check current task is valid to be deleted.
if task.is_uploading() {
return Err(Error::InvalidState("current task is uploading".to_string()));
}

self.storage.delete_task(task.id.as_str()).await;
info!("delete task {} from local storage", task.id);

self.scheduler_client
.delete_task(DeleteTaskRequest {
host_id: host_id.to_string(),
task_id: task_id.to_string(),
})
.await
.map_err(|err| {
error!("delete task {} failed from scheudler: {:?}", task_id, err);
err
})?;

info!("delete task {} from local storage", task.id);
Ok(())
}
None => {
Expand Down

0 comments on commit 4ac259a

Please sign in to comment.