Skip to content

Commit

Permalink
feat: avoid the out stream is aborted in proxy and add calculate_inte…
Browse files Browse the repository at this point in the history
…rested test (#769)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Oct 11, 2024
1 parent c382f27 commit e54b93d
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 39 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.110"
version = "0.1.111"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.110" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.110" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.110" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.110" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.110" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.110" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.110" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.111" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.111" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.111" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.111" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.111" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.111" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.111" }
thiserror = "1.0"
dragonfly-api = "=2.0.164"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
6 changes: 6 additions & 0 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ use rustls_pki_types::CertificateDer;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_rustls::TlsAcceptor;
use tokio_util::io::ReaderStream;
use tracing::{error, info, instrument, Span};
Expand Down Expand Up @@ -709,6 +711,10 @@ async fn proxy_by_dfdaemon(
download_task_response,
)) = message.response
{
// Sleep for a while to avoid the out stream is aborted. If the task is small, proxy read the piece
// before the task download is finished. It will cause `user body write aborted` error.
sleep(Duration::from_millis(1)).await;

// Send the none response to the client, if the first piece is received.
if !initialized {
info!("first piece received, send response");
Expand Down
106 changes: 105 additions & 1 deletion dragonfly-client/src/resource/piece.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl Piece {
}

/// calculate_interested calculates the interested pieces by content_length and range.
#[instrument(skip_all)]
pub fn calculate_interested(
&self,
piece_length: u64,
Expand Down Expand Up @@ -145,6 +144,7 @@ impl Piece {
error!("piece not found");
Error::InvalidParameter
})?;

piece.length = piece_length + content_length - offset;
pieces.push(piece);
break;
Expand Down Expand Up @@ -197,6 +197,7 @@ impl Piece {
error!("piece not found");
Error::InvalidParameter
})?;

piece.length = piece_length + content_length - offset;
pieces.push(piece);
break;
Expand Down Expand Up @@ -638,3 +639,106 @@ impl Piece {
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;

#[tokio::test]
async fn should_calculate_interested() {
let temp_dir = tempdir().unwrap();

let config = Config::default();
let config = Arc::new(config);

let id_generator =
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false);
let id_generator = Arc::new(id_generator);

let storage = Storage::new(
config.clone(),
temp_dir.path(),
temp_dir.path().to_path_buf(),
)
.await
.unwrap();
let storage = Arc::new(storage);

let backend_factory = BackendFactory::new(None).unwrap();
let backend_factory = Arc::new(backend_factory);

let piece = Piece::new(
config.clone(),
id_generator.clone(),
storage.clone(),
backend_factory.clone(),
);

let test_cases = vec![
(1000, 1, None, 1, vec![0], 0, 1),
(1000, 5000, None, 5, vec![0, 1, 2, 3, 4], 4000, 1000),
(5000, 1000, None, 1, vec![0], 0, 1000),
(
10,
101,
None,
11,
vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
100,
1,
),
(
1000,
5000,
Some(Range {
start: 1500,
length: 2000,
}),
3,
vec![1, 2, 3],
3000,
1000,
),
(
1000,
5000,
Some(Range {
start: 0,
length: 1,
}),
1,
vec![0],
0,
1000,
),
];

for (
piece_length,
content_length,
range,
expected_len,
expected_numbers,
expected_last_piece_offset,
expected_last_piece_length,
) in test_cases
{
let pieces = piece
.calculate_interested(piece_length, content_length, range)
.unwrap();
assert_eq!(pieces.len(), expected_len);
assert_eq!(
pieces
.iter()
.map(|piece| piece.number)
.collect::<Vec<u32>>(),
expected_numbers
);

let last_piece = pieces.last().unwrap();
assert_eq!(last_piece.offset, expected_last_piece_offset);
assert_eq!(last_piece.length, expected_last_piece_length);
}
}
}
44 changes: 22 additions & 22 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,28 @@ impl Task {
created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
};

// Send the download piece finished request.
in_stream_tx
.send_timeout(
AnnouncePeerRequest {
host_id: host_id.to_string(),
task_id: task_id.clone(),
peer_id: peer_id.to_string(),
request: Some(
announce_peer_request::Request::DownloadPieceBackToSourceFinishedRequest(
DownloadPieceBackToSourceFinishedRequest {
piece: Some(piece.clone()),
},
),
),
},
REQUEST_TIMEOUT,
)
.await.map_err(|err| {
error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", storage.piece_id(task_id.as_str(), number), err);
err
})?;

// Send the download progress.
download_progress_tx
.send_timeout(
Expand Down Expand Up @@ -1263,28 +1285,6 @@ impl Task {
err
})?;

// Send the download piece finished request.
in_stream_tx
.send_timeout(
AnnouncePeerRequest {
host_id: host_id.to_string(),
task_id: task_id.clone(),
peer_id: peer_id.to_string(),
request: Some(
announce_peer_request::Request::DownloadPieceBackToSourceFinishedRequest(
DownloadPieceBackToSourceFinishedRequest {
piece: Some(piece.clone()),
},
),
),
},
REQUEST_TIMEOUT,
)
.await.map_err(|err| {
error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", storage.piece_id(task_id.as_str(), number), err);
err
})?;

info!(
"finished piece {} from source",
storage.piece_id(task_id.as_str(), piece.number)
Expand Down

0 comments on commit e54b93d

Please sign in to comment.