diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 95fc11d3..63c34f35 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -212,12 +212,13 @@ impl Content { #[instrument(skip_all)] pub async fn read_task_by_range(&self, task_id: &str, range: Range) -> Result { let task_path = self.get_task_path(task_id); - let mut from_f = File::open(task_path.as_path()).await.map_err(|err| { + let from_f = File::open(task_path.as_path()).await.map_err(|err| { error!("open {:?} failed: {}", task_path, err); err })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, from_f); - from_f + f_reader .seek(SeekFrom::Start(range.start)) .await .map_err(|err| { @@ -225,7 +226,7 @@ impl Content { err })?; - let range_reader = from_f.take(range.length); + let range_reader = f_reader.take(range.length); Ok(range_reader) } @@ -251,10 +252,11 @@ impl Content { range: Option, ) -> Result { let task_path = self.get_task_path(task_id); - let mut f = File::open(task_path.as_path()).await.map_err(|err| { + let f = File::open(task_path.as_path()).await.map_err(|err| { error!("open {:?} failed: {}", task_path, err); err })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); // Calculate the target offset and length based on the range. let (target_offset, target_length) = if let Some(range) = range { @@ -266,14 +268,71 @@ impl Content { (offset, length) }; - f.seek(SeekFrom::Start(target_offset)) + f_reader + .seek(SeekFrom::Start(target_offset)) + .await + .map_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + err + })?; + + Ok(f_reader.take(target_length)) + } + + /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the + /// full reader of the piece. It is used for cache the piece content to the proxy cache. + #[instrument(skip_all)] + pub async fn read_piece_with_dual_read( + &self, + task_id: &str, + offset: u64, + length: u64, + range: Option, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + let task_path = self.get_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = if let Some(range) = range { + let target_offset = max(offset, range.start); + let target_length = + min(offset + length - 1, range.start + range.length - 1) - target_offset + 1; + (target_offset, target_length) + } else { + (offset, length) + }; + + let f = File::open(task_path.as_path()).await.map_err(|err| { + error!("open {:?} failed: {}", task_path, err); + err + })?; + let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_range_reader + .seek(SeekFrom::Start(target_offset)) + .await + .map_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + err + })?; + let range_reader = f_range_reader.take(target_length); + + // Create full reader of the piece. + let f = File::open(task_path.as_path()).await.map_err(|err| { + error!("open {:?} failed: {}", task_path, err); + err + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(offset)) .await .map_err(|err| { error!("seek {:?} failed: {}", task_path, err); err })?; + let reader = f_reader.take(length); - Ok(f.take(target_length)) + Ok((range_reader, reader)) } /// write_piece_with_crc32_castagnoli writes the piece to the content with crc32 castagnoli. diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index b546294f..b1aecc49 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -423,6 +423,54 @@ impl Storage { } } + /// upload_piece_with_dual_read returns the dual reader of the piece, one is the range reader, and the other is the + /// full reader of the piece. It is used for cache the piece content to the proxy cache. + #[instrument(skip_all)] + pub async fn upload_piece_with_dual_read( + &self, + piece_id: &str, + task_id: &str, + range: Option, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + // Wait for the piece to be finished. + self.wait_for_piece_finished(piece_id).await?; + + // Start uploading the task. + self.metadata.upload_task_started(task_id)?; + + // Get the piece metadata and return the content of the piece. + match self.metadata.get_piece(piece_id) { + Ok(Some(piece)) => { + match self + .content + .read_piece_with_dual_read(task_id, piece.offset, piece.length, range) + .await + { + Ok(dual_reader) => { + // Finish uploading the task. + self.metadata.upload_task_finished(task_id)?; + Ok(dual_reader) + } + Err(err) => { + // Failed uploading the task. + self.metadata.upload_task_failed(task_id)?; + Err(err) + } + } + } + Ok(None) => { + // Failed uploading the task. + self.metadata.upload_task_failed(task_id)?; + Err(Error::PieceNotFound(piece_id.to_string())) + } + Err(err) => { + // Failed uploading the task. + self.metadata.upload_task_failed(task_id)?; + Err(err) + } + } + } + /// get_piece returns the piece metadata. #[instrument(skip_all)] pub fn get_piece(&self, piece_id: &str) -> Result> { diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index 73b049aa..1e0abce9 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -812,7 +812,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let mut reader = self .task .piece - .upload_from_local_peer_into_async_read( + .upload_from_local_into_async_read( piece_id.as_str(), task_id.as_str(), piece.length, diff --git a/dragonfly-client/src/proxy/cache.rs b/dragonfly-client/src/proxy/cache.rs index 6b13c977..6f0e4455 100644 --- a/dragonfly-client/src/proxy/cache.rs +++ b/dragonfly-client/src/proxy/cache.rs @@ -15,12 +15,15 @@ */ use crate::resource::task::Task; +use dragonfly_api::common::v2::Range; use dragonfly_api::dfdaemon::v2::DownloadTaskRequest; use dragonfly_client_core::{Error, Result}; +use dragonfly_client_util::http::{get_range, hashmap_to_headermap}; use lru::LruCache; use std::cmp::{max, min}; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; +use tracing::error; /// Cache is the cache for storing http response by LRU algorithm. #[derive(Clone)] @@ -67,7 +70,14 @@ impl Cache { return Ok(None); }; - let range = download.range; + let Ok(request_header) = hashmap_to_headermap(&download.request_header) else { + return Ok(None); + }; + + let Ok(range) = get_range(&request_header, content_length) else { + return Ok(None); + }; + let interested_pieces = self.task .piece @@ -84,38 +94,69 @@ impl Cache { }; // Calculate the target offset and length based on the range. - let (target_offset, target_length) = if let Some(range) = range { - let target_offset = - max(interested_piece.offset, range.start) - interested_piece.offset; - let target_length = min( - interested_piece.offset + interested_piece.length - 1, - range.start + range.length - 1, - ) - target_offset - + 1; - (target_offset as usize, target_length as usize) - } else { - (0, interested_piece.length as usize) - }; + let (target_offset, target_length) = + self.calculate_piece_range(interested_piece.offset, interested_piece.length, range); + + let begin = target_offset; + let end = target_offset + target_length; + if begin >= piece_content.len() || end > piece_content.len() { + error!( + "invalid piece content, piece_id: {}, begin: {}, end: {}", + piece_id, begin, end + ); + + return Err(Error::InvalidParameter); + } - let piece_content = piece_content.slice(target_offset..target_offset + target_length); + let piece_content = piece_content.slice(begin..end); content.extend_from_slice(&piece_content); } Ok(Some(content.freeze())) } - /// get gets the piece content from the cache. + /// calculate_piece_range calculates the target offset and length based on the piece range and + /// request range. + fn calculate_piece_range( + &self, + piece_offset: u64, + piece_length: u64, + range: Option, + ) -> (usize, usize) { + if let Some(range) = range { + let target_offset = max(piece_offset, range.start) - piece_offset; + + let interested_piece_end = piece_offset + piece_length - 1; + let range_end = range.start + range.length - 1; + let target_length = + min(interested_piece_end, range_end) - target_offset - piece_offset + 1; + + (target_offset as usize, target_length as usize) + } else { + (0, piece_length as usize) + } + } + + /// get_piece gets the piece content from the cache. pub fn get_piece(&self, id: &str) -> Option { let mut pieces = self.pieces.lock().unwrap(); pieces.get(id).cloned() } - /// add create the piece content into the cache, if the key already exists, no operation will + /// add_piece create the piece content into the cache, if the key already exists, no operation will /// be performed. pub fn add_piece(&self, id: &str, content: bytes::Bytes) { let mut pieces = self.pieces.lock().unwrap(); - if !pieces.contains(id) { - pieces.put(id.to_string(), content); + if pieces.contains(id) { + return; } + + pieces.put(id.to_string(), content); + } + + /// contains_piece checks whether the piece exists in the cache. + pub fn contains_piece(&self, id: &str) -> bool { + let pieces = self.pieces.lock().unwrap(); + pieces.contains(id) } } diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 9e932f2d..04e70562 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -55,13 +55,13 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use tokio::io::{AsyncWriteExt, BufReader}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::{mpsc, Barrier}; use tokio::time::sleep; use tokio_rustls::TlsAcceptor; -use tokio_util::io::{InspectReader, ReaderStream}; +use tokio_util::io::ReaderStream; use tracing::{debug, error, info, instrument, Instrument, Span}; pub mod cache; @@ -821,9 +821,9 @@ async fn proxy_via_dfdaemon( return; }; - let piece_reader = match task + let (piece_range_reader, piece_reader) = match task .piece - .download_from_local_peer_into_async_read( + .download_from_local_into_dual_async_read( task.piece .id(message.task_id.as_str(), piece.number) .as_str(), @@ -835,7 +835,7 @@ async fn proxy_via_dfdaemon( ) .await { - Ok(piece_reader) => piece_reader, + Ok(dual_reader) => dual_reader, Err(err) => { error!("download piece reader error: {}", err); if let Err(err) = writer.shutdown().await { @@ -847,24 +847,22 @@ async fn proxy_via_dfdaemon( }; // Use a buffer to read the piece. - let piece_reader = - BufReader::with_capacity(read_buffer_size, piece_reader); + let piece_range_reader = + BufReader::with_capacity(read_buffer_size, piece_range_reader); // Write the piece data to the pipe in order and store the piece reader // in the cache. - finished_piece_readers.insert(piece.number, piece_reader); - while let Some(piece_reader) = - finished_piece_readers.get_mut(&need_piece_number) + finished_piece_readers + .insert(piece.number, (piece_range_reader, piece_reader)); + while let Some((mut piece_range_reader, piece_reader)) = + finished_piece_readers + .get_mut(&need_piece_number) + .map(|(range_reader, reader)| (range_reader, reader)) { debug!("copy piece {} to stream", need_piece_number); - let mut content = - bytes::BytesMut::with_capacity(piece.length as usize); - - let mut tee = InspectReader::new(piece_reader, |bytes| { - content.extend_from_slice(bytes); - }); - - if let Err(err) = tokio::io::copy(&mut tee, &mut writer).await { + if let Err(err) = + tokio::io::copy(&mut piece_range_reader, &mut writer).await + { error!("download piece reader error: {}", err); if let Err(err) = writer.shutdown().await { error!("writer shutdown error: {}", err); @@ -873,10 +871,28 @@ async fn proxy_via_dfdaemon( return; } - cache.add_piece( - &task.piece.id(&message.task_id, need_piece_number), - content.freeze(), - ); + // If the piece is not in the cache, add it to the cache. + let piece_id = + task.piece.id(message.task_id.as_str(), need_piece_number); + + if !cache.contains_piece(&piece_id) { + let mut content = + bytes::BytesMut::with_capacity(piece.length as usize); + loop { + let n = match piece_reader.read_buf(&mut content).await { + Ok(n) => n, + Err(err) => { + error!("read piece reader error: {}", err); + break; + } + }; + + if n == 0 { + cache.add_piece(&piece_id, content.freeze()); + break; + } + } + } need_piece_number += 1; } diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index e6c43027..3a22634c 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -384,10 +384,10 @@ impl PersistentCacheTask { err })?; - // Download the pieces from the local peer. - debug!("download the pieces from local peer"); + // Download the pieces from the local. + debug!("download the pieces from local"); let finished_pieces = match self - .download_partial_from_local_peer( + .download_partial_from_local( task, host_id, peer_id, @@ -398,7 +398,7 @@ impl PersistentCacheTask { { Ok(finished_pieces) => finished_pieces, Err(err) => { - error!("download from local peer error: {:?}", err); + error!("download from local error: {:?}", err); download_progress_tx .send_timeout(Err(Status::internal(err.to_string())), REQUEST_TIMEOUT) .await @@ -422,7 +422,7 @@ impl PersistentCacheTask { // Check if all pieces are downloaded. if interested_pieces.is_empty() { - info!("all pieces are downloaded from local peer"); + info!("all pieces are downloaded from local"); return Ok(()); }; debug!("download the pieces with scheduler"); @@ -1015,10 +1015,10 @@ impl PersistentCacheTask { Ok(finished_pieces) } - /// download_partial_from_local_peer downloads a partial persistent cache task from a local peer. + /// download_partial_from_local downloads a partial persistent cache task from a local. #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] - async fn download_partial_from_local_peer( + async fn download_partial_from_local( &self, task: &metadata::PersistentCacheTask, host_id: &str, @@ -1029,7 +1029,7 @@ impl PersistentCacheTask { // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); - // Download the piece from the local peer. + // Download the piece from the local. for interested_piece in interested_pieces { let piece_id = self .storage @@ -1048,10 +1048,10 @@ impl PersistentCacheTask { } }; - // Fake the download from the local peer. + // Fake the download from the local. self.piece - .download_from_local_peer(task.id.as_str(), piece.length); - info!("finished piece {} from local peer", piece_id); + .download_from_local(task.id.as_str(), piece.length); + info!("finished piece {} from local", piece_id); // Construct the piece. let piece = Piece { diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 440a5f2c..9a908115 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -319,9 +319,9 @@ impl Piece { (content_length as f64 / piece_length as f64).ceil() as u32 } - /// upload_from_local_peer_into_async_read uploads a single piece from a local peer. + /// upload_from_local_into_async_read uploads a single piece from local cache. #[instrument(skip_all, fields(piece_id))] - pub async fn upload_from_local_peer_into_async_read( + pub async fn upload_from_local_into_async_read( &self, piece_id: &str, task_id: &str, @@ -349,9 +349,40 @@ impl Piece { }) } - /// download_from_local_peer_into_async_read downloads a single piece from a local peer. + /// upload_from_local_into_async_read. It will return two readers, one is the range reader, and the other is the + /// full reader of the piece. #[instrument(skip_all, fields(piece_id))] - pub async fn download_from_local_peer_into_async_read( + pub async fn upload_from_local_into_dual_async_read( + &self, + piece_id: &str, + task_id: &str, + length: u64, + range: Option, + disable_rate_limit: bool, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + // Span record the piece_id. + Span::current().record("piece_id", piece_id); + + // Acquire the upload rate limiter. + if !disable_rate_limit { + self.upload_rate_limiter.acquire(length as usize).await; + } + + // Upload the piece content. + self.storage + .upload_piece_with_dual_read(piece_id, task_id, range) + .await + .inspect(|_reader| { + collect_upload_piece_traffic_metrics( + self.id_generator.task_type(task_id) as i32, + length, + ); + }) + } + + /// download_from_local_into_async_read downloads a single piece from local cache. + #[instrument(skip_all, fields(piece_id))] + pub async fn download_from_local_into_async_read( &self, piece_id: &str, task_id: &str, @@ -378,10 +409,42 @@ impl Piece { self.storage.upload_piece(piece_id, task_id, range).await } - /// download_from_local_peer downloads a single piece from a local peer. Fake the download piece - /// from the local peer, just collect the metrics. + /// download_from_local_into_dual_async_read returns two readers, one is the range reader, and + /// the other is the full reader of the piece. + #[instrument(skip_all, fields(piece_id))] + pub async fn download_from_local_into_dual_async_read( + &self, + piece_id: &str, + task_id: &str, + length: u64, + range: Option, + disable_rate_limit: bool, + is_prefetch: bool, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + // Span record the piece_id. + Span::current().record("piece_id", piece_id); + + // Acquire the download rate limiter. + if !disable_rate_limit { + if is_prefetch { + // Acquire the prefetch rate limiter. + self.prefetch_rate_limiter.acquire(length as usize).await; + } else { + // Acquire the download rate limiter. + self.download_rate_limiter.acquire(length as usize).await; + } + } + + // Upload the piece content. + self.storage + .upload_piece_with_dual_read(piece_id, task_id, range) + .await + } + + /// download_from_local downloads a single piece from local cache. Fake the download piece + /// from the local cache, just collect the metrics. #[instrument(skip_all)] - pub fn download_from_local_peer(&self, task_id: &str, length: u64) { + pub fn download_from_local(&self, task_id: &str, length: u64) { collect_download_piece_traffic_metrics( &TrafficType::LocalPeer, self.id_generator.task_type(task_id) as i32, diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 0e5dadec..da126ea3 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -342,10 +342,10 @@ impl Task { err })?; - // Download the pieces from the local peer. - debug!("download the pieces from local peer"); + // Download the pieces from the local. + debug!("download the pieces from local"); let finished_pieces = match self - .download_partial_from_local_peer( + .download_partial_from_local( task, host_id, peer_id, @@ -356,7 +356,7 @@ impl Task { { Ok(finished_pieces) => finished_pieces, Err(err) => { - error!("download from local peer error: {:?}", err); + error!("download from local error: {:?}", err); return Err(err); } }; @@ -375,7 +375,7 @@ impl Task { // Check if all pieces are downloaded. if interested_pieces.is_empty() { - info!("all pieces are downloaded from local peer"); + info!("all pieces are downloaded from local"); return Ok(()); }; debug!("download the pieces with scheduler"); @@ -1202,7 +1202,7 @@ impl Task { // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); - // Download the piece from the local peer. + // Download the piece from the local. let mut join_set = JoinSet::new(); let semaphore = Arc::new(Semaphore::new( self.config.download.concurrent_piece_count as usize, @@ -1436,10 +1436,10 @@ impl Task { Ok(finished_pieces) } - /// download_partial_from_local_peer downloads a partial task from a local peer. + /// download_partial_from_local downloads a partial task from a local. #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] - async fn download_partial_from_local_peer( + async fn download_partial_from_local( &self, task: &metadata::Task, host_id: &str, @@ -1453,7 +1453,7 @@ impl Task { // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); - // Download the piece from the local peer. + // Download the piece from the local. for interested_piece in interested_pieces { let piece_id = self.storage.piece_id(task_id, interested_piece.number); @@ -1470,9 +1470,9 @@ impl Task { } }; - // Fake the download from the local peer. - self.piece.download_from_local_peer(task_id, piece.length); - info!("finished piece {} from local peer", piece_id,); + // Fake the download from the local. + self.piece.download_from_local(task_id, piece.length); + info!("finished piece {} from local", piece_id,); // Construct the piece. let piece = Piece {