Commit 164b6c1

feat: calculate piece range and store the actual piece reader

Signed-off-by: Gaius <[email protected]>
gaius-qi committed Dec 19, 2024
1 parent 64341d6 commit 164b6c1
Showing 8 changed files with 298 additions and 77 deletions.
71 changes: 65 additions & 6 deletions dragonfly-client-storage/src/content.rs
@@ -212,20 +212,21 @@ impl Content {
     #[instrument(skip_all)]
     pub async fn read_task_by_range(&self, task_id: &str, range: Range) -> Result<impl AsyncRead> {
         let task_path = self.get_task_path(task_id);
-        let mut from_f = File::open(task_path.as_path()).await.map_err(|err| {
+        let from_f = File::open(task_path.as_path()).await.map_err(|err| {
             error!("open {:?} failed: {}", task_path, err);
             err
         })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, from_f);
 
-        from_f
+        f_reader
             .seek(SeekFrom::Start(range.start))
             .await
             .map_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
                 err
             })?;
 
-        let range_reader = from_f.take(range.length);
+        let range_reader = f_reader.take(range.length);
         Ok(range_reader)
     }
@@ -251,10 +252,11 @@ impl Content {
         range: Option<Range>,
     ) -> Result<impl AsyncRead> {
         let task_path = self.get_task_path(task_id);
-        let mut f = File::open(task_path.as_path()).await.map_err(|err| {
+        let f = File::open(task_path.as_path()).await.map_err(|err| {
             error!("open {:?} failed: {}", task_path, err);
             err
         })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
 
         // Calculate the target offset and length based on the range.
         let (target_offset, target_length) = if let Some(range) = range {
@@ -266,14 +268,71 @@ impl Content {
             (offset, length)
         };
 
-        f.seek(SeekFrom::Start(target_offset))
+        f_reader
+            .seek(SeekFrom::Start(target_offset))
             .await
             .map_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
                 err
             })?;
 
+        Ok(f_reader.take(target_length))
+    }
+
+    /// read_piece_with_dual_read returns two readers: one is the range reader, and the other is
+    /// the full reader of the piece. It is used to cache the piece content in the proxy cache.
+    #[instrument(skip_all)]
+    pub async fn read_piece_with_dual_read(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
+        let task_path = self.get_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) = if let Some(range) = range {
+            let target_offset = max(offset, range.start);
+            let target_length =
+                min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
+            (target_offset, target_length)
+        } else {
+            (offset, length)
+        };
+
+        let f = File::open(task_path.as_path()).await.map_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+            err
+        })?;
+        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_range_reader
+            .seek(SeekFrom::Start(target_offset))
+            .await
+            .map_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+                err
+            })?;
+        let range_reader = f_range_reader.take(target_length);
+
+        // Create the full reader of the piece.
+        let f = File::open(task_path.as_path()).await.map_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+            err
+        })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_reader
+            .seek(SeekFrom::Start(offset))
+            .await
+            .map_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+                err
+            })?;
+        let reader = f_reader.take(length);
 
-        Ok(f.take(target_length))
+        Ok((range_reader, reader))
     }
 
     /// write_piece_with_crc32_castagnoli writes the piece to the content with crc32 castagnoli.
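The intersection arithmetic shared by read_piece and read_piece_with_dual_read clamps the request range against the piece's absolute byte span. Here is a minimal sketch of that calculation with a stand-in Range struct (not the dragonfly_api generated type); like the code above, it assumes the request range overlaps the piece, since a disjoint range would underflow the length subtraction.

```rust
/// Stand-in for dragonfly_api::common::v2::Range: a start offset plus a length.
struct Range {
    start: u64,
    length: u64,
}

/// Intersect a piece spanning [offset, offset + length) with a request range,
/// returning the absolute offset to seek to and the number of bytes to read.
fn intersect(offset: u64, length: u64, range: &Range) -> (u64, u64) {
    let target_offset = offset.max(range.start);
    // Work with inclusive interval ends, then convert back to a length.
    let piece_end = offset + length - 1;
    let range_end = range.start + range.length - 1;
    let target_length = piece_end.min(range_end) - target_offset + 1;
    (target_offset, target_length)
}

fn main() {
    // A piece covering bytes 100..200 and a request for bytes 150..250
    // overlap in bytes 150..200: seek to 150 and read 50 bytes.
    let range = Range { start: 150, length: 100 };
    assert_eq!(intersect(100, 100, &range), (150, 50));
}
```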
48 changes: 48 additions & 0 deletions dragonfly-client-storage/src/lib.rs
@@ -423,6 +423,54 @@ impl Storage {
         }
     }
 
+    /// upload_piece_with_dual_read returns two readers of the piece: one is the range reader, and
+    /// the other is the full reader of the piece. It is used to cache the piece content in the proxy cache.
+    #[instrument(skip_all)]
+    pub async fn upload_piece_with_dual_read(
+        &self,
+        piece_id: &str,
+        task_id: &str,
+        range: Option<Range>,
+    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
+        // Wait for the piece to be finished.
+        self.wait_for_piece_finished(piece_id).await?;
+
+        // Start uploading the task.
+        self.metadata.upload_task_started(task_id)?;
+
+        // Get the piece metadata and return the content of the piece.
+        match self.metadata.get_piece(piece_id) {
+            Ok(Some(piece)) => {
+                match self
+                    .content
+                    .read_piece_with_dual_read(task_id, piece.offset, piece.length, range)
+                    .await
+                {
+                    Ok(dual_reader) => {
+                        // Finish uploading the task.
+                        self.metadata.upload_task_finished(task_id)?;
+                        Ok(dual_reader)
+                    }
+                    Err(err) => {
+                        // Mark the task upload as failed.
+                        self.metadata.upload_task_failed(task_id)?;
+                        Err(err)
+                    }
+                }
+            }
+            Ok(None) => {
+                // Mark the task upload as failed.
+                self.metadata.upload_task_failed(task_id)?;
+                Err(Error::PieceNotFound(piece_id.to_string()))
+            }
+            Err(err) => {
+                // Mark the task upload as failed.
+                self.metadata.upload_task_failed(task_id)?;
+                Err(err)
+            }
+        }
+    }
+
     /// get_piece returns the piece metadata.
     #[instrument(skip_all)]
     pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
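A hypothetical caller of upload_piece_with_dual_read (not part of this commit) might stream the range reader to the client while draining the full reader into a buffer destined for the proxy cache. The sketch below assumes the readers are consumed on a tokio runtime, uses stdout as a stand-in client, and boxes errors on the assumption that dragonfly_client_core::Error implements std::error::Error.

```rust
use bytes::Bytes;
use dragonfly_api::common::v2::Range;
use dragonfly_client_storage::Storage;
use tokio::io::AsyncReadExt;

/// Hypothetical helper: serve the requested byte range and return the full
/// piece content so the caller can insert it into a cache.
async fn serve_and_cache(
    storage: &Storage,
    piece_id: &str,
    task_id: &str,
    range: Option<Range>,
) -> Result<Bytes, Box<dyn std::error::Error>> {
    let (range_reader, full_reader) = storage
        .upload_piece_with_dual_read(piece_id, task_id, range)
        .await?;
    // The readers are opaque `impl AsyncRead`, so pin them before use.
    tokio::pin!(range_reader, full_reader);

    // Stream only the requested bytes to the client (stdout as a stand-in).
    tokio::io::copy(&mut range_reader, &mut tokio::io::stdout()).await?;

    // Drain the whole piece so it can be cached.
    let mut content = Vec::new();
    full_reader.read_to_end(&mut content).await?;
    Ok(Bytes::from(content))
}
```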
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/dfdaemon_upload.rs
@@ -812,7 +812,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
             let mut reader = self
                 .task
                 .piece
-                .upload_from_local_peer_into_async_read(
+                .upload_from_local_into_async_read(
                     piece_id.as_str(),
                     task_id.as_str(),
                     piece.length,
71 changes: 53 additions & 18 deletions dragonfly-client/src/proxy/cache.rs
@@ -15,8 +15,10 @@
  */
 
 use crate::resource::task::Task;
+use dragonfly_api::common::v2::Range;
 use dragonfly_api::dfdaemon::v2::DownloadTaskRequest;
 use dragonfly_client_core::{Error, Result};
+use dragonfly_client_util::http::{get_range, hashmap_to_headermap};
 use lru::LruCache;
 use std::cmp::{max, min};
 use std::num::NonZeroUsize;
@@ -67,7 +69,14 @@ impl Cache {
             return Ok(None);
         };
 
-        let range = download.range;
+        let Ok(request_header) = hashmap_to_headermap(&download.request_header) else {
+            return Ok(None);
+        };
+
+        let Ok(range) = get_range(&request_header, content_length) else {
+            return Ok(None);
+        };
+
         let interested_pieces =
             self.task
                 .piece
@@ -84,38 +93,64 @@ impl Cache {
             };
 
             // Calculate the target offset and length based on the range.
-            let (target_offset, target_length) = if let Some(range) = range {
-                let target_offset =
-                    max(interested_piece.offset, range.start) - interested_piece.offset;
-                let target_length = min(
-                    interested_piece.offset + interested_piece.length - 1,
-                    range.start + range.length - 1,
-                ) - target_offset
-                    + 1;
-                (target_offset as usize, target_length as usize)
-            } else {
-                (0, interested_piece.length as usize)
-            };
+            let (target_offset, target_length) =
+                self.calculate_piece_range(interested_piece.offset, interested_piece.length, range);
 
-            let piece_content = piece_content.slice(target_offset..target_offset + target_length);
+            let begin = target_offset;
+            let end = target_offset + target_length;
+            if begin >= piece_content.len() || end > piece_content.len() {
+                return Err(Error::InvalidParameter);
+            }
+
+            let piece_content = piece_content.slice(begin..end);
             content.extend_from_slice(&piece_content);
         }
 
         Ok(Some(content.freeze()))
     }
 
-    /// get gets the piece content from the cache.
+    /// calculate_piece_range calculates the target offset and length based on the piece range and
+    /// request range.
+    fn calculate_piece_range(
+        &self,
+        piece_offset: u64,
+        piece_length: u64,
+        range: Option<Range>,
+    ) -> (usize, usize) {
+        if let Some(range) = range {
+            let target_offset = max(piece_offset, range.start) - piece_offset;
+
+            let interested_piece_end = piece_offset + piece_length - 1;
+            let range_end = range.start + range.length - 1;
+            let target_length =
+                min(interested_piece_end, range_end) - target_offset - piece_offset + 1;
+
+            (target_offset as usize, target_length as usize)
+        } else {
+            (0, piece_length as usize)
+        }
+    }
+
+    /// get_piece gets the piece content from the cache.
     pub fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
        let mut pieces = self.pieces.lock().unwrap();
        pieces.get(id).cloned()
    }
 
-    /// add create the piece content into the cache, if the key already exists, no operation will
+    /// add_piece adds the piece content to the cache; if the key already exists, no operation will
     /// be performed.
     pub fn add_piece(&self, id: &str, content: bytes::Bytes) {
         let mut pieces = self.pieces.lock().unwrap();
-        if !pieces.contains(id) {
-            pieces.put(id.to_string(), content);
+        if pieces.contains(id) {
+            return;
         }
+
+        pieces.put(id.to_string(), content);
     }
+
+    /// contains_piece checks whether the piece exists in the cache.
+    pub fn contains_piece(&self, id: &str) -> bool {
+        let pieces = self.pieces.lock().unwrap();
+        pieces.contains(id)
+    }
 }
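Unlike the absolute offsets computed in content.rs, calculate_piece_range returns an offset relative to the start of the piece, which is why get() can slice piece_content directly after the begin/end bounds check. The same arithmetic restated as a free function with spot checks; a (start, length) tuple stands in for common::v2::Range:

```rust
use std::cmp::{max, min};

/// Mirror of Cache::calculate_piece_range for checking in isolation; the
/// returned offset is relative to the start of the piece.
fn calculate_piece_range(
    piece_offset: u64,
    piece_length: u64,
    range: Option<(u64, u64)>, // (start, length)
) -> (usize, usize) {
    if let Some((start, length)) = range {
        let target_offset = max(piece_offset, start) - piece_offset;
        let piece_end = piece_offset + piece_length - 1;
        let range_end = start + length - 1;
        // target_offset is relative, so add piece_offset back to make the
        // subtraction line up with the absolute interval ends.
        let target_length = min(piece_end, range_end) - target_offset - piece_offset + 1;
        (target_offset as usize, target_length as usize)
    } else {
        (0, piece_length as usize)
    }
}

fn main() {
    // Piece bytes 100..200, request bytes 150..250: the overlap begins 50
    // bytes into the piece and runs for 50 bytes.
    assert_eq!(calculate_piece_range(100, 100, Some((150, 100))), (50, 50));
    // No request range: the whole piece.
    assert_eq!(calculate_piece_range(100, 100, None), (0, 100));
}
```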
60 changes: 38 additions & 22 deletions dragonfly-client/src/proxy/mod.rs
@@ -55,13 +55,13 @@ use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::io::{AsyncWriteExt, BufReader};
+use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
 use tokio::net::TcpListener;
 use tokio::net::TcpStream;
 use tokio::sync::{mpsc, Barrier};
 use tokio::time::sleep;
 use tokio_rustls::TlsAcceptor;
-use tokio_util::io::{InspectReader, ReaderStream};
+use tokio_util::io::ReaderStream;
 use tracing::{debug, error, info, instrument, Instrument, Span};
 
 pub mod cache;
@@ -821,9 +821,9 @@ async fn proxy_via_dfdaemon(
                         return;
                     };
 
-                    let piece_reader = match task
+                    let (piece_range_reader, piece_reader) = match task
                         .piece
-                        .download_from_local_peer_into_async_read(
+                        .download_from_local_into_dual_async_read(
                             task.piece
                                 .id(message.task_id.as_str(), piece.number)
                                 .as_str(),
@@ -835,7 +835,7 @@ async fn proxy_via_dfdaemon(
                        )
                        .await
                    {
-                        Ok(piece_reader) => piece_reader,
+                        Ok(dual_reader) => dual_reader,
                        Err(err) => {
                            error!("download piece reader error: {}", err);
                            if let Err(err) = writer.shutdown().await {
@@ -847,24 +847,22 @@ async fn proxy_via_dfdaemon(
                    };
 
                    // Use a buffer to read the piece.
-                    let piece_reader =
-                        BufReader::with_capacity(read_buffer_size, piece_reader);
+                    let piece_range_reader =
+                        BufReader::with_capacity(read_buffer_size, piece_range_reader);
 
                    // Write the piece data to the pipe in order and store the piece reader
                    // in the cache.
-                    finished_piece_readers.insert(piece.number, piece_reader);
-                    while let Some(piece_reader) =
-                        finished_piece_readers.get_mut(&need_piece_number)
+                    finished_piece_readers
+                        .insert(piece.number, (piece_range_reader, piece_reader));
+                    while let Some((mut piece_range_reader, piece_reader)) =
+                        finished_piece_readers
+                            .get_mut(&need_piece_number)
+                            .map(|(range_reader, reader)| (range_reader, reader))
                    {
                        debug!("copy piece {} to stream", need_piece_number);
-                        let mut content =
-                            bytes::BytesMut::with_capacity(piece.length as usize);
-
-                        let mut tee = InspectReader::new(piece_reader, |bytes| {
-                            content.extend_from_slice(bytes);
-                        });
-
-                        if let Err(err) = tokio::io::copy(&mut tee, &mut writer).await {
+                        if let Err(err) =
+                            tokio::io::copy(&mut piece_range_reader, &mut writer).await
+                        {
                            error!("download piece reader error: {}", err);
                            if let Err(err) = writer.shutdown().await {
                                error!("writer shutdown error: {}", err);
@@ -873,10 +871,28 @@ async fn proxy_via_dfdaemon(
                            return;
                        }
 
-                        cache.add_piece(
-                            &task.piece.id(&message.task_id, need_piece_number),
-                            content.freeze(),
-                        );
+                        // If the piece is not in the cache, add it to the cache.
+                        let piece_id =
+                            task.piece.id(message.task_id.as_str(), need_piece_number);
+
+                        if !cache.contains_piece(&piece_id) {
+                            let mut content =
+                                bytes::BytesMut::with_capacity(piece.length as usize);
+                            loop {
+                                let n = match piece_reader.read_buf(&mut content).await {
+                                    Ok(n) => n,
+                                    Err(err) => {
+                                        error!("read piece reader error: {}", err);
+                                        break;
+                                    }
+                                };
+
+                                if n == 0 {
+                                    cache.add_piece(&piece_id, content.freeze());
+                                    break;
+                                }
+                            }
+                        }
 
                        need_piece_number += 1;
                    }
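The commit replaces the InspectReader tee, which filled the cache buffer as a side effect of streaming, with two independent readers: the range reader feeds the client and the full piece reader is drained only when the piece is missing from the cache. Below is that drain loop in isolation, a sketch assuming a tokio runtime and the bytes crate; because read_buf grows the BytesMut as needed, Ok(0) can only mean end of stream.

```rust
use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt};

/// Read a piece reader to the end with read_buf and freeze the buffer for
/// caching. Returns None on a read error, mirroring the proxy's choice to
/// skip caching rather than fail the download.
async fn drain_for_cache<R>(mut reader: R, capacity: usize) -> Option<Bytes>
where
    R: AsyncRead + Unpin,
{
    let mut content = BytesMut::with_capacity(capacity);
    loop {
        match reader.read_buf(&mut content).await {
            // EOF: everything has been buffered.
            Ok(0) => return Some(content.freeze()),
            Ok(_) => {}
            Err(err) => {
                eprintln!("read piece reader error: {}", err);
                return None;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let data: &[u8] = b"piece content";
    let cached = drain_for_cache(data, data.len()).await;
    assert_eq!(cached.as_deref(), Some(&b"piece content"[..]));
}
```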