Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: calculate piece range and store the actual piece reader #906

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 65 additions & 6 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,21 @@ impl Content {
#[instrument(skip_all)]
pub async fn read_task_by_range(&self, task_id: &str, range: Range) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
let mut from_f = File::open(task_path.as_path()).await.map_err(|err| {
let from_f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, from_f);

from_f
f_reader
.seek(SeekFrom::Start(range.start))
.await
.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;

let range_reader = from_f.take(range.length);
let range_reader = f_reader.take(range.length);
Ok(range_reader)
}

Expand All @@ -251,10 +252,11 @@ impl Content {
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
let mut f = File::open(task_path.as_path()).await.map_err(|err| {
let f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = if let Some(range) = range {
Expand All @@ -266,14 +268,71 @@ impl Content {
(offset, length)
};

f.seek(SeekFrom::Start(target_offset))
f_reader
.seek(SeekFrom::Start(target_offset))
.await
.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;

Ok(f_reader.take(target_length))
}

/// read_piece_with_dual_read returns two readers over the same piece: the
/// first is limited to the requested range (used to serve the response), the
/// second covers the full piece (used to cache the piece content in the
/// proxy cache). The file is opened twice so the two readers have independent
/// positions.
#[instrument(skip_all)]
pub async fn read_piece_with_dual_read(
    &self,
    task_id: &str,
    offset: u64,
    length: u64,
    range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
    let task_path = self.get_task_path(task_id);

    // Calculate the target offset and length based on the range: the
    // intersection of [offset, offset+length) with the requested range, or
    // the whole piece when no range is given.
    let (target_offset, target_length) = if let Some(range) = range {
        let target_offset = max(offset, range.start);
        let target_length =
            min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
        (target_offset, target_length)
    } else {
        (offset, length)
    };

    // Create the range reader of the piece.
    let f = File::open(task_path.as_path()).await.map_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
        err
    })?;
    let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

    f_range_reader
        .seek(SeekFrom::Start(target_offset))
        .await
        .map_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
            err
        })?;
    let range_reader = f_range_reader.take(target_length);

    // Create full reader of the piece.
    let f = File::open(task_path.as_path()).await.map_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
        err
    })?;
    let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

    f_reader
        .seek(SeekFrom::Start(offset))
        .await
        .map_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
            err
        })?;
    let reader = f_reader.take(length);

    Ok((range_reader, reader))
}

/// write_piece_with_crc32_castagnoli writes the piece to the content with crc32 castagnoli.
Expand Down
48 changes: 48 additions & 0 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,54 @@ impl Storage {
}
}

/// upload_piece_with_dual_read returns the dual reader of the piece, one is the range reader, and the other is the
/// full reader of the piece. It is used for cache the piece content to the proxy cache.
#[instrument(skip_all)]
pub async fn upload_piece_with_dual_read(
&self,
piece_id: &str,
task_id: &str,
range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
// Wait for the piece to be finished.
self.wait_for_piece_finished(piece_id).await?;

// Start uploading the task.
self.metadata.upload_task_started(task_id)?;

// Get the piece metadata and return the content of the piece.
match self.metadata.get_piece(piece_id) {
Ok(Some(piece)) => {
match self
.content
.read_piece_with_dual_read(task_id, piece.offset, piece.length, range)
.await
{
Ok(dual_reader) => {
// Finish uploading the task.
self.metadata.upload_task_finished(task_id)?;
Ok(dual_reader)
}
Err(err) => {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;
Err(err)
}
}
}
Ok(None) => {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;
Err(Error::PieceNotFound(piece_id.to_string()))
}
Err(err) => {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;
Err(err)
}
}
}

/// get_piece returns the piece metadata.
#[instrument(skip_all)]
pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let mut reader = self
.task
.piece
.upload_from_local_peer_into_async_read(
.upload_from_local_into_async_read(
piece_id.as_str(),
task_id.as_str(),
piece.length,
Expand Down
71 changes: 53 additions & 18 deletions dragonfly-client/src/proxy/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/

use crate::resource::task::Task;
use dragonfly_api::common::v2::Range;
use dragonfly_api::dfdaemon::v2::DownloadTaskRequest;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::http::{get_range, hashmap_to_headermap};
use lru::LruCache;
use std::cmp::{max, min};
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -67,7 +69,14 @@ impl Cache {
return Ok(None);
};

let range = download.range;
let Ok(request_header) = hashmap_to_headermap(&download.request_header) else {
return Ok(None);
};

let Ok(range) = get_range(&request_header, content_length) else {
return Ok(None);
};

let interested_pieces =
self.task
.piece
Expand All @@ -84,38 +93,64 @@ impl Cache {
};

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = if let Some(range) = range {
let target_offset =
max(interested_piece.offset, range.start) - interested_piece.offset;
let target_length = min(
interested_piece.offset + interested_piece.length - 1,
range.start + range.length - 1,
) - target_offset
+ 1;
(target_offset as usize, target_length as usize)
} else {
(0, interested_piece.length as usize)
};
let (target_offset, target_length) =
self.calculate_piece_range(interested_piece.offset, interested_piece.length, range);

let piece_content = piece_content.slice(target_offset..target_offset + target_length);
let begin = target_offset;
let end = target_offset + target_length;
if begin >= piece_content.len() || end > piece_content.len() {
return Err(Error::InvalidParameter);
}

let piece_content = piece_content.slice(begin..end);
content.extend_from_slice(&piece_content);
}

Ok(Some(content.freeze()))
}

/// get gets the piece content from the cache.
/// calculate_piece_range calculates the target offset and length to slice out
/// of a piece, given the piece's absolute offset/length and an optional
/// request range. The returned offset is relative to the start of the piece;
/// with no range, the whole piece is selected.
fn calculate_piece_range(
    &self,
    piece_offset: u64,
    piece_length: u64,
    range: Option<Range>,
) -> (usize, usize) {
    match range {
        Some(range) => {
            // Piece-relative offset where the requested range begins.
            let target_offset = max(piece_offset, range.start) - piece_offset;

            // Absolute index of the last byte of the piece and of the range.
            let piece_end = piece_offset + piece_length - 1;
            let range_end = range.start + range.length - 1;
            let target_length = min(piece_end, range_end) - target_offset - piece_offset + 1;

            (target_offset as usize, target_length as usize)
        }
        None => (0, piece_length as usize),
    }
}

/// get_piece gets the piece content from the cache, returning a clone of the
/// stored bytes on a hit. The lock is taken mutably because the LRU lookup
/// updates recency order.
pub fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
    self.pieces.lock().unwrap().get(id).cloned()
}

/// add create the piece content into the cache, if the key already exists, no operation will
/// add_piece create the piece content into the cache, if the key already exists, no operation will
/// be performed.
pub fn add_piece(&self, id: &str, content: bytes::Bytes) {
let mut pieces = self.pieces.lock().unwrap();
if !pieces.contains(id) {
pieces.put(id.to_string(), content);
if pieces.contains(id) {
return;
}

pieces.put(id.to_string(), content);
}

/// contains_piece reports whether the given piece id is present in the cache
/// without touching its contents.
pub fn contains_piece(&self, id: &str) -> bool {
    self.pieces.lock().unwrap().contains(id)
}
}
60 changes: 38 additions & 22 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Barrier};
use tokio::time::sleep;
use tokio_rustls::TlsAcceptor;
use tokio_util::io::{InspectReader, ReaderStream};
use tokio_util::io::ReaderStream;
use tracing::{debug, error, info, instrument, Instrument, Span};

pub mod cache;
Expand Down Expand Up @@ -821,9 +821,9 @@ async fn proxy_via_dfdaemon(
return;
};

let piece_reader = match task
let (piece_range_reader, piece_reader) = match task
.piece
.download_from_local_peer_into_async_read(
.download_from_local_into_dual_async_read(
task.piece
.id(message.task_id.as_str(), piece.number)
.as_str(),
Expand All @@ -835,7 +835,7 @@ async fn proxy_via_dfdaemon(
)
.await
{
Ok(piece_reader) => piece_reader,
Ok(dual_reader) => dual_reader,
Err(err) => {
error!("download piece reader error: {}", err);
if let Err(err) = writer.shutdown().await {
Expand All @@ -847,24 +847,22 @@ async fn proxy_via_dfdaemon(
};

// Use a buffer to read the piece.
let piece_reader =
BufReader::with_capacity(read_buffer_size, piece_reader);
let piece_range_reader =
BufReader::with_capacity(read_buffer_size, piece_range_reader);

// Write the piece data to the pipe in order and store the piece reader
// in the cache.
finished_piece_readers.insert(piece.number, piece_reader);
while let Some(piece_reader) =
finished_piece_readers.get_mut(&need_piece_number)
finished_piece_readers
.insert(piece.number, (piece_range_reader, piece_reader));
while let Some((mut piece_range_reader, piece_reader)) =
finished_piece_readers
.get_mut(&need_piece_number)
.map(|(range_reader, reader)| (range_reader, reader))
{
debug!("copy piece {} to stream", need_piece_number);
let mut content =
bytes::BytesMut::with_capacity(piece.length as usize);

let mut tee = InspectReader::new(piece_reader, |bytes| {
content.extend_from_slice(bytes);
});

if let Err(err) = tokio::io::copy(&mut tee, &mut writer).await {
if let Err(err) =
tokio::io::copy(&mut piece_range_reader, &mut writer).await
{
error!("download piece reader error: {}", err);
if let Err(err) = writer.shutdown().await {
error!("writer shutdown error: {}", err);
Expand All @@ -873,10 +871,28 @@ async fn proxy_via_dfdaemon(
return;
}

cache.add_piece(
&task.piece.id(&message.task_id, need_piece_number),
content.freeze(),
);
// If the piece is not in the cache, add it to the cache.
let piece_id =
task.piece.id(message.task_id.as_str(), need_piece_number);

if !cache.contains_piece(&piece_id) {
let mut content =
bytes::BytesMut::with_capacity(piece.length as usize);
loop {
let n = match piece_reader.read_buf(&mut content).await {
Ok(n) => n,
Err(err) => {
error!("read piece reader error: {}", err);
break;
}
};

if n == 0 {
cache.add_piece(&piece_id, content.freeze());
break;
}
}
}

need_piece_number += 1;
}
Expand Down
Loading
Loading