From e54b93dc7d73159e70b60245eefde7590f596949 Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 11 Oct 2024 17:39:00 +0800 Subject: [PATCH] feat: avoid the out stream is aborted in proxy and add calculate_interested test (#769) Signed-off-by: Gaius --- Cargo.lock | 16 ++-- Cargo.toml | 16 ++-- dragonfly-client/src/proxy/mod.rs | 6 ++ dragonfly-client/src/resource/piece.rs | 106 ++++++++++++++++++++++++- dragonfly-client/src/resource/task.rs | 44 +++++----- 5 files changed, 149 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1663096..3c72fc49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -856,7 +856,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.110" +version = "0.1.111" dependencies = [ "anyhow", "blake3", @@ -928,7 +928,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.110" +version = "0.1.111" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -951,7 +951,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.110" +version = "0.1.111" dependencies = [ "bytesize", "bytesize-serde", @@ -974,7 +974,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.110" +version = "0.1.111" dependencies = [ "hyper 1.4.1", "hyper-util", @@ -989,7 +989,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.110" +version = "0.1.111" dependencies = [ "anyhow", "clap", @@ -1005,7 +1005,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.110" +version = "0.1.111" dependencies = [ "base16ct", "bincode", @@ -1030,7 +1030,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.110" +version = "0.1.111" dependencies = [ "base16ct", "blake3", @@ -1380,7 +1380,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.110" +version = "0.1.111" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 3e15e855..5d2ad76a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.110" +version = "0.1.111" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.110" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.110" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.110" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.110" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.110" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.110" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.110" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.111" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.111" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.111" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.111" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.111" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.111" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.111" } thiserror = "1.0" dragonfly-api = "=2.0.164" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 4a0ae98f..f4d67e83 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -57,10 +57,12 @@ use rustls_pki_types::CertificateDer; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncWriteExt, BufReader}; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::mpsc; +use tokio::time::sleep; use tokio_rustls::TlsAcceptor; use tokio_util::io::ReaderStream; use tracing::{error, info, instrument, Span}; @@ -709,6 +711,10 @@ async fn proxy_by_dfdaemon( download_task_response, )) = message.response { + // Sleep for a while to avoid the out stream is aborted. If the task is small, proxy read the piece + // before the task download is finished. It will cause `user body write aborted` error. + sleep(Duration::from_millis(1)).await; + // Send the none response to the client, if the first piece is received. if !initialized { info!("first piece received, send response"); diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 2c23b830..83832569 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -117,7 +117,6 @@ impl Piece { } /// calculate_interested calculates the interested pieces by content_length and range. - #[instrument(skip_all)] pub fn calculate_interested( &self, piece_length: u64, @@ -145,6 +144,7 @@ impl Piece { error!("piece not found"); Error::InvalidParameter })?; + piece.length = piece_length + content_length - offset; pieces.push(piece); break; @@ -197,6 +197,7 @@ impl Piece { error!("piece not found"); Error::InvalidParameter })?; + piece.length = piece_length + content_length - offset; pieces.push(piece); break; @@ -638,3 +639,106 @@ impl Piece { }) } } + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[tokio::test] + async fn should_calculate_interested() { + let temp_dir = tempdir().unwrap(); + + let config = Config::default(); + let config = Arc::new(config); + + let id_generator = + IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false); + let id_generator = Arc::new(id_generator); + + let storage = Storage::new( + config.clone(), + temp_dir.path(), + temp_dir.path().to_path_buf(), + ) + .await + .unwrap(); + let storage = Arc::new(storage); + + let backend_factory = BackendFactory::new(None).unwrap(); + let backend_factory = Arc::new(backend_factory); + + let piece = Piece::new( + config.clone(), + id_generator.clone(), + storage.clone(), + backend_factory.clone(), + ); + + let test_cases = vec![ + (1000, 1, None, 1, vec![0], 0, 1), + (1000, 5000, None, 5, vec![0, 1, 2, 3, 4], 4000, 1000), + (5000, 1000, None, 1, vec![0], 0, 1000), + ( + 10, + 101, + None, + 11, + vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + 100, + 1, + ), + ( + 1000, + 5000, + Some(Range { + start: 1500, + length: 2000, + }), + 3, + vec![1, 2, 3], + 3000, + 1000, + ), + ( + 1000, + 5000, + Some(Range { + start: 0, + length: 1, + }), + 1, + vec![0], + 0, + 1000, + ), + ]; + + for ( + piece_length, + content_length, + range, + expected_len, + expected_numbers, + expected_last_piece_offset, + expected_last_piece_length, + ) in test_cases + { + let pieces = piece + .calculate_interested(piece_length, content_length, range) + .unwrap(); + assert_eq!(pieces.len(), expected_len); + assert_eq!( + pieces + .iter() + .map(|piece| piece.number) + .collect::>(), + expected_numbers + ); + + let last_piece = pieces.last().unwrap(); + assert_eq!(last_piece.offset, expected_last_piece_offset); + assert_eq!(last_piece.length, expected_last_piece_length); + } + } +} diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 0b377d54..cf71f151 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -1236,6 +1236,28 @@ impl Task { created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)), }; + // Send the download piece finished request. + in_stream_tx + .send_timeout( + AnnouncePeerRequest { + host_id: host_id.to_string(), + task_id: task_id.clone(), + peer_id: peer_id.to_string(), + request: Some( + announce_peer_request::Request::DownloadPieceBackToSourceFinishedRequest( + DownloadPieceBackToSourceFinishedRequest { + piece: Some(piece.clone()), + }, + ), + ), + }, + REQUEST_TIMEOUT, + ) + .await.map_err(|err| { + error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", storage.piece_id(task_id.as_str(), number), err); + err + })?; + // Send the download progress. download_progress_tx .send_timeout( @@ -1263,28 +1285,6 @@ impl Task { err })?; - // Send the download piece finished request. - in_stream_tx - .send_timeout( - AnnouncePeerRequest { - host_id: host_id.to_string(), - task_id: task_id.clone(), - peer_id: peer_id.to_string(), - request: Some( - announce_peer_request::Request::DownloadPieceBackToSourceFinishedRequest( - DownloadPieceBackToSourceFinishedRequest { - piece: Some(piece.clone()), - }, - ), - ), - }, - REQUEST_TIMEOUT, - ) - .await.map_err(|err| { - error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", storage.piece_id(task_id.as_str(), number), err); - err - })?; - info!( "finished piece {} from source", storage.piece_id(task_id.as_str(), piece.number)