diff --git a/Cargo.lock b/Cargo.lock index ca3544ea..8451155e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,12 @@ dependencies = [ "equator", ] +[[package]] +name = "allocator-api2" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -865,7 +871,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.118" +version = "0.1.119" dependencies = [ "anyhow", "blake3", @@ -895,6 +901,7 @@ dependencies = [ "lazy_static", "leaky-bucket", "libc", + "lru", "openssl", "opentelemetry", "opentelemetry-jaeger", @@ -937,7 +944,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.118" +version = "0.1.119" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -965,7 +972,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.118" +version = "0.1.119" dependencies = [ "bytesize", "bytesize-serde", @@ -991,7 +998,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.118" +version = "0.1.119" dependencies = [ "headers 0.4.0", "hyper 1.5.1", @@ -1009,7 +1016,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.118" +version = "0.1.119" dependencies = [ "anyhow", "clap", @@ -1027,7 +1034,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.118" +version = "0.1.119" dependencies = [ "base16ct", "bincode", @@ -1053,7 +1060,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.118" +version = "0.1.119" dependencies = [ "base16ct", "base64 0.22.1", @@ -1191,6 +1198,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1415,6 +1428,17 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "hashring" version = "0.3.6" @@ -1426,7 +1450,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.118" +version = "0.1.119" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", @@ -2204,6 +2228,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "lz4-sys" version = "1.9.4" diff --git a/Cargo.toml b/Cargo.toml index 95e46bf5..a74dfa4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.118" +version = "0.1.119" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.118" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.118" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.118" } 
-dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.118" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.118" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.118" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.118" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.119" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.119" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.119" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.119" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.119" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.119" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.119" } thiserror = "1.0" dragonfly-api = "=2.0.173" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } @@ -92,6 +92,7 @@ percent-encoding = "2.3.1" tempfile = "3.14.0" tokio-rustls = "0.25.0-alpha.4" serde_json = "1.0.132" +lru = "0.12.5" [profile.release] opt-level = "z" diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 1de7b4f7..c876abd2 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -212,6 +212,13 @@ pub fn default_proxy_server_port() -> u16 { 4001 } +/// default_proxy_cache_capacity is the default cache capacity for the proxy server, default is +/// 150. +#[inline] +pub fn default_proxy_cache_capacity() -> usize { + 150 +} + /// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 32KB. #[inline] pub fn default_proxy_read_buffer_size() -> usize { @@ -1082,6 +1089,13 @@ pub struct Proxy { /// prefetch pre-downloads full of the task when download with range request. 
pub prefetch: bool, + /// cache_capacity is the capacity of the cache by LRU algorithm for HTTP proxy, default is 150. + /// The cache is used to store the hot piece content of the task, piece length is 4MB~16MB. + /// If the capacity is 150, the cache size is 600MB~2.4GB, need to adjust according to the + /// memory size of the host. + #[serde(default = "default_proxy_cache_capacity")] + pub cache_capacity: usize, + /// read_buffer_size is the buffer size for reading piece from disk, default is 1KB. #[serde(default = "default_proxy_read_buffer_size")] pub read_buffer_size: usize, @@ -1096,6 +1110,7 @@ impl Default for Proxy { registry_mirror: RegistryMirror::default(), disable_back_to_source: false, prefetch: false, + cache_capacity: default_proxy_cache_capacity(), read_buffer_size: default_proxy_read_buffer_size(), } } diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index 9efeb40d..f1d14052 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -126,6 +126,11 @@ pub enum DFError { #[error("invalid parameter")] InvalidParameter, + /// Infallible is the error for infallible. + #[error(transparent)] + Infallible(#[from] std::convert::Infallible), + + /// Utf8 is the error for utf8. #[error(transparent)] Utf8(#[from] std::str::Utf8Error), diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index a3278343..e73c5ac3 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -441,6 +441,7 @@ impl Storage { } /// piece_id returns the piece id. 
+ #[inline] #[instrument(skip_all)] pub fn piece_id(&self, task_id: &str, number: u32) -> String { self.metadata.piece_id(task_id, number) diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 90eb907e..cfdb1c9d 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -829,6 +829,7 @@ impl Metadata { } /// piece_id returns the piece id. + #[inline] #[instrument(skip_all)] pub fn piece_id(&self, task_id: &str, number: u32) -> String { format!("{}-{}", task_id, number) diff --git a/dragonfly-client-util/src/id_generator/mod.rs b/dragonfly-client-util/src/id_generator/mod.rs index 8b3ab1fe..cf4e5def 100644 --- a/dragonfly-client-util/src/id_generator/mod.rs +++ b/dragonfly-client-util/src/id_generator/mod.rs @@ -57,6 +57,7 @@ impl IDGenerator { } /// host_id generates the host id. + #[inline] #[instrument(skip_all)] pub fn host_id(&self) -> String { if self.is_seed_peer { @@ -67,6 +68,7 @@ impl IDGenerator { } /// task_id generates the task id. + #[inline] #[instrument(skip_all)] pub fn task_id( &self, @@ -116,6 +118,7 @@ impl IDGenerator { } /// persistent_cache_task_id generates the persistent cache task id. + #[inline] #[instrument(skip_all)] pub fn persistent_cache_task_id( &self, @@ -145,6 +148,7 @@ impl IDGenerator { } /// peer_id generates the peer id. 
+ #[inline] #[instrument(skip_all)] pub fn peer_id(&self) -> String { if self.is_seed_peer { diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index 5dc7e659..3248acbc 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -63,6 +63,7 @@ uuid.workspace = true percent-encoding.workspace = true tokio-rustls.workspace = true serde_json.workspace = true +lru.workspace = true lazy_static = "1.5" tracing-log = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] } diff --git a/dragonfly-client/src/metrics/mod.rs b/dragonfly-client/src/metrics/mod.rs index 00a37cec..d3087a2e 100644 --- a/dragonfly-client/src/metrics/mod.rs +++ b/dragonfly-client/src/metrics/mod.rs @@ -178,6 +178,21 @@ lazy_static! { &[] ).expect("metric can be created"); + /// PROXY_REQUEST_VIA_DFDAEMON_COUNT is used to count the number of proxy request via dfdaemon. + pub static ref PROXY_REQUEST_VIA_DFDAEMON_COUNT: IntCounterVec = + IntCounterVec::new( + Opts::new("proxy_request_by_dfdaemon_total", "Counter of the number of the proxy request by dfdaemon.").namespace(dragonfly_client_config::SERVICE_NAME).subsystem(dragonfly_client_config::NAME), + &[] + ).expect("metric can be created"); + + /// PROXY_REQUEST_VIA_DFDAEMON_AND_CACHE_HITS_COUNT is used to count the number of proxy request via + /// dfdaemon and cache hits. + pub static ref PROXY_REQUEST_VIA_DFDAEMON_AND_CACHE_HITS_COUNT: IntCounterVec = + IntCounterVec::new( + Opts::new("proxy_request_via_dfdaemon_and_cache_hits_total", "Counter of the number of cache hits of the proxy request via dfdaemon.").namespace(dragonfly_client_config::SERVICE_NAME).subsystem(dragonfly_client_config::NAME), + &[] + ).expect("metric can be created"); + + /// STAT_TASK_COUNT is used to count the number of stat tasks. 
pub static ref STAT_TASK_COUNT: IntCounterVec = IntCounterVec::new( @@ -312,6 +327,16 @@ fn register_custom_metrics() { .register(Box::new(PROXY_REQUEST_FAILURE_COUNT.clone())) .expect("metric can be registered"); + REGISTRY + .register(Box::new(PROXY_REQUEST_VIA_DFDAEMON_COUNT.clone())) + .expect("metric can be registered"); + + REGISTRY + .register(Box::new( + PROXY_REQUEST_VIA_DFDAEMON_AND_CACHE_HITS_COUNT.clone(), + )) + .expect("metric can be registered"); + REGISTRY .register(Box::new(STAT_TASK_COUNT.clone())) .expect("metric can be registered"); @@ -371,6 +396,8 @@ fn reset_custom_metrics() { BACKEND_REQUEST_DURATION.reset(); PROXY_REQUEST_COUNT.reset(); PROXY_REQUEST_FAILURE_COUNT.reset(); + PROXY_REQUEST_VIA_DFDAEMON_COUNT.reset(); + PROXY_REQUEST_VIA_DFDAEMON_AND_CACHE_HITS_COUNT.reset(); STAT_TASK_COUNT.reset(); STAT_TASK_FAILURE_COUNT.reset(); DELETE_TASK_COUNT.reset(); @@ -704,6 +731,21 @@ pub fn collect_proxy_request_failure_metrics() { PROXY_REQUEST_FAILURE_COUNT.with_label_values(&[]).inc(); } +/// collect_proxy_request_via_dfdaemon_metrics collects the proxy request via dfdaemon metrics. +pub fn collect_proxy_request_via_dfdaemon_metrics() { + PROXY_REQUEST_VIA_DFDAEMON_COUNT + .with_label_values(&[]) + .inc(); +} + +/// collect_proxy_request_via_dfdaemon_and_cache_hits_metrics collects the proxy request via +/// dfdaemon and cache hits metrics. +pub fn collect_proxy_request_via_dfdaemon_and_cache_hits_metrics() { + PROXY_REQUEST_VIA_DFDAEMON_AND_CACHE_HITS_COUNT + .with_label_values(&[]) + .inc(); +} + /// collect_stat_task_started_metrics collects the stat task started metrics. 
pub fn collect_stat_task_started_metrics(typ: i32) { STAT_TASK_COUNT diff --git a/dragonfly-client/src/proxy/cache.rs b/dragonfly-client/src/proxy/cache.rs new file mode 100644 index 00000000..6b13c977 --- /dev/null +++ b/dragonfly-client/src/proxy/cache.rs @@ -0,0 +1,121 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::resource::task::Task; +use dragonfly_api::dfdaemon::v2::DownloadTaskRequest; +use dragonfly_client_core::{Error, Result}; +use lru::LruCache; +use std::cmp::{max, min}; +use std::num::NonZeroUsize; +use std::sync::{Arc, Mutex}; + +/// Cache is the cache for storing http response by LRU algorithm. +#[derive(Clone)] +pub struct Cache { + /// pieces stores the piece cache data with piece id and value. + pieces: Arc>>, + + /// task is the task manager. + task: Arc, +} + +/// Cache implements the cache for storing http response by LRU algorithm. +impl Cache { + /// new creates a new cache with the specified capacity. + pub fn new(capacity: usize, task: Arc) -> Result { + let capacity = NonZeroUsize::new(capacity).ok_or(Error::InvalidParameter)?; + let pieces = Arc::new(Mutex::new(LruCache::new(capacity))); + Ok(Cache { pieces, task }) + } + + /// get_by_request gets the content from the cache by the request. 
+ pub async fn get_by_request( + &self, + request: &DownloadTaskRequest, + ) -> Result> { + let Some(download) = &request.download else { + return Err(Error::InvalidParameter); + }; + + let task_id = self.task.id_generator.task_id( + &download.url, + download.tag.as_deref(), + download.application.as_deref(), + download.filtered_query_params.clone(), + )?; + + let Some(task) = self.task.get(&task_id).await? else { + return Ok(None); + }; + + let (Some(content_length), Some(piece_length)) = + (task.content_length(), task.piece_length()) + else { + return Ok(None); + }; + + let range = download.range; + let interested_pieces = + self.task + .piece + .calculate_interested(piece_length, content_length, range)?; + + // Calculate the content capacity based on the interested pieces and push the content into + // the bytes. + let content_capacity = interested_pieces.len() * piece_length as usize; + let mut content = bytes::BytesMut::with_capacity(content_capacity); + for interested_piece in interested_pieces { + let piece_id = self.task.piece.id(&task_id, interested_piece.number); + let Some(piece_content) = self.get_piece(&piece_id) else { + return Ok(None); + }; + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = if let Some(range) = range { + let target_offset = + max(interested_piece.offset, range.start) - interested_piece.offset; + let target_length = min( + interested_piece.offset + interested_piece.length - 1, + range.start + range.length - 1, + ) - target_offset + + 1; + (target_offset as usize, target_length as usize) + } else { + (0, interested_piece.length as usize) + }; + + let piece_content = piece_content.slice(target_offset..target_offset + target_length); + content.extend_from_slice(&piece_content); + } + + Ok(Some(content.freeze())) + } + + /// get gets the piece content from the cache. 
+ pub fn get_piece(&self, id: &str) -> Option { + let mut pieces = self.pieces.lock().unwrap(); + pieces.get(id).cloned() + } + + /// add_piece adds the piece content into the cache; if the key already exists, no operation will + /// be performed. + pub fn add_piece(&self, id: &str, content: bytes::Bytes) { + let mut pieces = self.pieces.lock().unwrap(); + if !pieces.contains(id) { + pieces.put(id.to_string(), content); + } + } +} diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index b77d1379..595dff1b 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -16,12 +16,15 @@ use crate::grpc::dfdaemon_download::DfdaemonDownloadClient; use crate::metrics::{ - collect_proxy_request_failure_metrics, collect_proxy_request_started_metrics, + collect_download_piece_traffic_metrics, collect_proxy_request_failure_metrics, + collect_proxy_request_started_metrics, + collect_proxy_request_via_dfdaemon_and_cache_hits_metrics, + collect_proxy_request_via_dfdaemon_metrics, }; use crate::resource::task::Task; use crate::shutdown; use bytes::Bytes; -use dragonfly_api::common::v2::{Download, TaskType}; +use dragonfly_api::common::v2::{Download, TaskType, TrafficType}; use dragonfly_api::dfdaemon::v2::{ download_task_response, DownloadTaskRequest, DownloadTaskStartedResponse, }; @@ -34,7 +37,7 @@ use dragonfly_client_util::{ tls::{generate_self_signed_certs_by_ca_cert, generate_simple_self_signed_certs, NoVerifier}, }; use futures_util::TryStreamExt; -use http_body_util::{combinators::BoxBody, BodyExt, Empty, StreamBody}; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody}; use hyper::body::Frame; use hyper::client::conn::http1::Builder as ClientBuilder; use hyper::server::conn::http1::Builder as ServerBuilder; @@ -58,9 +61,10 @@ use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::time::sleep; use tokio_rustls::TlsAcceptor; -use tokio_util::io::ReaderStream; +use 
tokio_util::io::{InspectReader, ReaderStream}; use tracing::{debug, error, info, instrument, Instrument, Span}; +pub mod cache; pub mod header; /// Response is the response of the proxy server. @@ -71,6 +75,9 @@ pub struct Proxy { /// config is the configuration of the dfdaemon. config: Arc, + /// cache is the cache manager for storing the piece content. + cache: Arc, + /// task is the task manager. task: Arc, @@ -103,6 +110,7 @@ impl Proxy { ) -> Self { let mut proxy = Self { config: config.clone(), + cache: Arc::new(cache::Cache::new(config.proxy.cache_capacity, task.clone()).unwrap()), task: task.clone(), addr: SocketAddr::new(config.proxy.server.ip.unwrap(), config.proxy.server.port), registry_cert: Arc::new(None), @@ -163,6 +171,7 @@ impl Proxy { debug!("accepted connection from {}", remote_address); let config = self.config.clone(); + let cache = self.cache.clone(); let task = self.task.clone(); let dfdaemon_download_client = dfdaemon_download_client.clone(); @@ -175,7 +184,7 @@ impl Proxy { .title_case_headers(true) .serve_connection( io, - service_fn(move |request| handler(config.clone(), task.clone(), request, dfdaemon_download_client.clone(), registry_cert.clone(), server_ca_cert.clone())), + service_fn(move |request| handler(config.clone(), cache.clone(), task.clone(), request, dfdaemon_download_client.clone(), registry_cert.clone(), server_ca_cert.clone())), ) .with_upgrades() .await @@ -199,6 +208,7 @@ impl Proxy { #[instrument(skip_all, fields(uri, method))] pub async fn handler( config: Arc, + cache: Arc, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -215,6 +225,7 @@ pub async fn handler( if Method::CONNECT == request.method() { return registry_mirror_https_handler( config, + cache, task, request, dfdaemon_download_client, @@ -226,6 +237,7 @@ pub async fn handler( return registry_mirror_http_handler( config, + cache, task, request, dfdaemon_download_client, @@ -242,6 +254,7 @@ pub async fn handler( if 
Method::CONNECT == request.method() { return https_handler( config, + cache, task, request, dfdaemon_download_client, @@ -253,6 +266,7 @@ pub async fn handler( http_handler( config, + cache, task, request, dfdaemon_download_client, @@ -265,6 +279,7 @@ pub async fn handler( #[instrument(skip_all)] pub async fn registry_mirror_http_handler( config: Arc, + cache: Arc, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -273,6 +288,7 @@ pub async fn registry_mirror_http_handler( let request = make_registry_mirror_request(config.clone(), request)?; return http_handler( config, + cache, task, request, dfdaemon_download_client, @@ -285,6 +301,7 @@ pub async fn registry_mirror_http_handler( #[instrument(skip_all)] pub async fn registry_mirror_https_handler( config: Arc, + cache: Arc, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -294,6 +311,7 @@ pub async fn registry_mirror_https_handler( let request = make_registry_mirror_request(config.clone(), request)?; return https_handler( config, + cache, task, request, dfdaemon_download_client, @@ -307,6 +325,7 @@ pub async fn registry_mirror_https_handler( #[instrument(skip_all)] pub async fn http_handler( config: Arc, + cache: Arc, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -340,7 +359,15 @@ pub async fn http_handler( request.method(), request_uri ); - return proxy_by_dfdaemon(config, task, &rule, request, dfdaemon_download_client).await; + return proxy_via_dfdaemon( + config, + cache, + task, + &rule, + request, + dfdaemon_download_client, + ) + .await; } // If the request header contains the X-Dragonfly-Use-P2P header, proxy the request via the @@ -351,8 +378,9 @@ pub async fn http_handler( request.method(), request_uri ); - return proxy_by_dfdaemon( + return proxy_via_dfdaemon( config, + cache, task, &Rule::default(), request, @@ -367,7 +395,7 @@ pub async fn http_handler( request.method(), request.uri() ); - return 
proxy_https(request, registry_cert).await; + return proxy_via_https(request, registry_cert).await; } info!( @@ -375,13 +403,14 @@ pub async fn http_handler( request.method(), request.uri() ); - return proxy_http(request).await; + return proxy_via_http(request).await; } /// https_handler handles the https request by client. #[instrument(skip_all)] pub async fn https_handler( config: Arc, + cache: Arc, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -398,6 +427,7 @@ pub async fn https_handler( Ok(upgraded) => { if let Err(e) = upgraded_tunnel( config, + cache, task, upgraded, host, @@ -423,9 +453,11 @@ pub async fn https_handler( /// upgraded_tunnel handles the upgraded connection. If the ca_cert is not set, use the /// self-signed certificate. Otherwise, use the CA certificate to sign the /// self-signed certificate. +#[allow(clippy::too_many_arguments)] #[instrument(skip_all)] async fn upgraded_tunnel( config: Arc, + cache: Arc, task: Arc, upgraded: Upgraded, host: String, @@ -473,6 +505,7 @@ async fn upgraded_tunnel( service_fn(move |request| { upgraded_handler( config.clone(), + cache.clone(), task.clone(), host.clone(), request, @@ -494,6 +527,7 @@ async fn upgraded_tunnel( #[instrument(skip_all, fields(uri, method))] pub async fn upgraded_handler( config: Arc, + cache: Arc, task: Arc, host: String, mut request: Request, @@ -536,7 +570,15 @@ pub async fn upgraded_handler( request.method(), request_uri ); - return proxy_by_dfdaemon(config, task, &rule, request, dfdaemon_download_client).await; + return proxy_via_dfdaemon( + config, + cache, + task, + &rule, + request, + dfdaemon_download_client, + ) + .await; } // If the request header contains the X-Dragonfly-Use-P2P header, proxy the request via the @@ -547,8 +589,9 @@ pub async fn upgraded_handler( request.method(), request_uri ); - return proxy_by_dfdaemon( + return proxy_via_dfdaemon( config, + cache, task, &Rule::default(), request, @@ -563,7 +606,7 @@ pub async fn 
upgraded_handler( request.method(), request.uri() ); - return proxy_https(request, registry_cert).await; + return proxy_via_https(request, registry_cert).await; } info!( @@ -571,18 +614,22 @@ pub async fn upgraded_handler( request.method(), request.uri() ); - return proxy_http(request).await; + return proxy_via_http(request).await; } -/// proxy_by_dfdaemon proxies the request via the dfdaemon. +/// proxy_via_dfdaemon proxies the request via the dfdaemon. #[instrument(skip_all, fields(host_id, task_id, peer_id))] -async fn proxy_by_dfdaemon( +async fn proxy_via_dfdaemon( config: Arc, + cache: Arc, task: Arc, rule: &Rule, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, ) -> ClientResult { + // Collect the metrics for the proxy request via dfdaemon. + collect_proxy_request_via_dfdaemon_metrics(); + // Make the download task request. let download_task_request = match make_download_task_request(config.clone(), rule, request) { Ok(download_task_request) => download_task_request, @@ -595,6 +642,35 @@ async fn proxy_by_dfdaemon( } }; + // Get the content from the cache by the request. + match cache.get_by_request(&download_task_request).await { + Ok(None) => { + debug!("cache miss"); + } + Ok(Some(content)) => { + debug!("cache hit"); + + // Collect the download piece traffic metrics and the proxy request via dfdaemon and + // cache hits metrics. + collect_proxy_request_via_dfdaemon_and_cache_hits_metrics(); + collect_download_piece_traffic_metrics( + &TrafficType::LocalPeer, + TaskType::Standard as i32, + content.len() as u64, + ); + + let body_boxed = Full::new(content).map_err(ClientError::from).boxed(); + return Ok(Response::new(body_boxed)); + } + Err(err) => { + error!("get content from cache failed: {}", err); + return Ok(make_error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + None, + )); + } + } + // Download the task by the dfdaemon download client. 
let response = match dfdaemon_download_client .download_task(download_task_request) @@ -760,13 +836,21 @@ async fn proxy_by_dfdaemon( let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader); - // Write the piece data to the pipe in order. + // Write the piece data to the pipe in order and store the piece reader + // in the cache. finished_piece_readers.insert(piece.number, piece_reader); while let Some(piece_reader) = finished_piece_readers.get_mut(&need_piece_number) { debug!("copy piece {} to stream", need_piece_number); - if let Err(err) = tokio::io::copy(piece_reader, &mut writer).await { + let mut content = + bytes::BytesMut::with_capacity(piece.length as usize); + + let mut tee = InspectReader::new(piece_reader, |bytes| { + content.extend_from_slice(bytes); + }); + + if let Err(err) = tokio::io::copy(&mut tee, &mut writer).await { error!("download piece reader error: {}", err); if let Err(err) = writer.shutdown().await { error!("writer shutdown error: {}", err); @@ -775,6 +859,11 @@ async fn proxy_by_dfdaemon( return; } + cache.add_piece( + &task.piece.id(&message.task_id, need_piece_number), + content.freeze(), + ); + need_piece_number += 1; } } else { @@ -851,9 +940,9 @@ async fn proxy_by_dfdaemon( } } -/// proxy_http proxies the HTTP request directly to the remote server. +/// proxy_via_http proxies the HTTP request directly to the remote server. 
#[instrument(skip_all)] -async fn proxy_http(request: Request) -> ClientResult { +async fn proxy_via_http(request: Request) -> ClientResult { let Some(host) = request.uri().host() else { error!("CONNECT host is not socket addr: {:?}", request.uri()); return Ok(make_error_response(http::StatusCode::BAD_REQUEST, None)); @@ -878,9 +967,9 @@ async fn proxy_http(request: Request) -> ClientResult, registry_cert: Arc>>>, ) -> ClientResult { @@ -935,7 +1024,6 @@ fn make_registry_mirror_request( .parse::() .or_err(ErrorType::ParseError)?, }; - header::get_registry(&header); *request.uri_mut() = registry_mirror_uri.clone(); diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index f1789620..4699ae7b 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -110,6 +110,7 @@ impl Piece { } /// id generates a new piece id. + #[inline] #[instrument(skip_all)] pub fn id(&self, task_id: &str, number: u32) -> String { self.storage.piece_id(task_id, number) diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 02c18620..988617af 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -116,6 +116,11 @@ impl Task { } } + /// get gets the metadata of the task. + pub async fn get(&self, id: &str) -> ClientResult> { + self.storage.get_task(id) + } + /// download_started updates the metadata of the task when the task downloads started. #[instrument(skip_all)] pub async fn download_started(