Skip to content

Commit eb66100

Browse files
authored
feat: add downloader for downloading piece (#894)
Signed-off-by: Gaius <[email protected]>
1 parent df39410 commit eb66100

File tree

8 files changed

+165
-65
lines changed

8 files changed

+165
-65
lines changed

dragonfly-client-config/src/dfdaemon.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ fn default_dfdaemon_cache_dir() -> PathBuf {
7878
crate::default_cache_dir().join(NAME)
7979
}
8080

81+
/// default_upload_protocol is the default protocol of the upload server.
82+
#[inline]
83+
fn default_upload_protocol() -> String {
84+
"grpc".to_string()
85+
}
86+
8187
/// default_upload_grpc_server_port is the default port of the upload grpc server.
8288
#[inline]
8389
fn default_upload_grpc_server_port() -> u16 {
@@ -450,6 +456,11 @@ impl Default for Download {
450456
#[derive(Debug, Clone, Validate, Deserialize)]
451457
#[serde(default, rename_all = "camelCase")]
452458
pub struct UploadServer {
459+
/// protocol is the protocol of the upload server. The protocol used for downloading pieces
460+
/// between different peers, now only support grpc.
461+
#[serde(default = "default_upload_protocol")]
462+
pub protocol: String,
463+
453464
/// ip is the listen ip of the grpc server.
454465
pub ip: Option<IpAddr>,
455466

@@ -474,6 +485,7 @@ pub struct UploadServer {
474485
impl Default for UploadServer {
475486
fn default() -> Self {
476487
UploadServer {
488+
protocol: default_upload_protocol(),
477489
ip: None,
478490
port: default_upload_grpc_server_port(),
479491
ca_cert: None,

dragonfly-client/src/bin/dfdaemon/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ async fn main() -> Result<(), anyhow::Error> {
223223
storage.clone(),
224224
scheduler_client.clone(),
225225
backend_factory.clone(),
226-
);
226+
)?;
227227
let task = Arc::new(task);
228228

229229
// Initialize persistent cache task manager.
@@ -233,7 +233,7 @@ async fn main() -> Result<(), anyhow::Error> {
233233
storage.clone(),
234234
scheduler_client.clone(),
235235
backend_factory.clone(),
236-
);
236+
)?;
237237
let persistent_cache_task = Arc::new(persistent_cache_task);
238238

239239
// Initialize health server.

dragonfly-client/src/proxy/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ async fn proxy_via_dfdaemon(
661661
debug!("cache miss");
662662
}
663663
Ok(Some(content)) => {
664-
debug!("cache hit");
664+
info!("cache hit");
665665

666666
// Collect the download piece traffic metrics and the proxy request via dfdaemon and
667667
// cache hits metrics.

dragonfly-client/src/resource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@
1717
pub mod persistent_cache_task;
1818
pub mod piece;
1919
pub mod piece_collector;
20+
pub mod piece_downloader;
2021
pub mod task;

dragonfly-client/src/resource/persistent_cache_task.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,22 +86,22 @@ impl PersistentCacheTask {
8686
storage: Arc<Storage>,
8787
scheduler_client: Arc<SchedulerClient>,
8888
backend_factory: Arc<BackendFactory>,
89-
) -> Self {
89+
) -> ClientResult<Self> {
9090
let piece = piece::Piece::new(
9191
config.clone(),
9292
id_generator.clone(),
9393
storage.clone(),
9494
backend_factory.clone(),
95-
);
95+
)?;
9696
let piece = Arc::new(piece);
9797

98-
PersistentCacheTask {
98+
Ok(Self {
9999
config,
100100
id_generator,
101101
storage,
102102
scheduler_client,
103103
piece: piece.clone(),
104-
}
104+
})
105105
}
106106

107107
/// create_persistent creates a persistent cache task from local.

dragonfly-client/src/resource/piece.rs

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
*/
1616

1717
use super::*;
18-
use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
1918
use crate::metrics::{
2019
collect_backend_request_failure_metrics, collect_backend_request_finished_metrics,
2120
collect_backend_request_started_metrics, collect_download_piece_traffic_metrics,
2221
collect_upload_piece_traffic_metrics,
2322
};
2423
use chrono::Utc;
2524
use dragonfly_api::common::v2::{Hdfs, ObjectStorage, Range, TrafficType};
26-
use dragonfly_api::dfdaemon::v2::DownloadPieceRequest;
2725
use dragonfly_client_backend::{BackendFactory, GetRequest};
2826
use dragonfly_client_config::dfdaemon::Config;
2927
use dragonfly_client_core::{error::BackendError, Error, Result};
@@ -66,6 +64,9 @@ pub struct Piece {
6664
/// storage is the local storage.
6765
storage: Arc<Storage>,
6866

67+
/// downloader_factory is the piece downloader factory.
68+
downloader_factory: Arc<piece_downloader::DownloaderFactory>,
69+
6970
/// backend_factory is the backend factory.
7071
backend_factory: Arc<BackendFactory>,
7172

@@ -88,11 +89,15 @@ impl Piece {
8889
id_generator: Arc<IDGenerator>,
8990
storage: Arc<Storage>,
9091
backend_factory: Arc<BackendFactory>,
91-
) -> Self {
92-
Self {
92+
) -> Result<Self> {
93+
Ok(Self {
9394
config: config.clone(),
9495
id_generator,
9596
storage,
97+
downloader_factory: Arc::new(piece_downloader::DownloaderFactory::new(
98+
config.upload.server.protocol.as_str(),
99+
config.clone(),
100+
)?),
96101
backend_factory,
97102
download_rate_limiter: Arc::new(
98103
RateLimiter::builder()
@@ -116,7 +121,7 @@ impl Piece {
116121
.interval(Duration::from_secs(1))
117122
.build(),
118123
),
119-
}
124+
})
120125
}
121126

122127
/// id generates a new piece id.
@@ -429,70 +434,32 @@ impl Piece {
429434

430435
Error::InvalidPeer(parent.id.clone())
431436
})?;
432-
let dfdaemon_upload_client = DfdaemonUploadClient::new(
433-
self.config.clone(),
434-
format!("http://{}:{}", host.ip, host.port),
435-
)
436-
.await
437-
.map_err(|err| {
438-
error!(
439-
"create dfdaemon upload client from {}:{} failed: {}",
440-
host.ip, host.port, err
441-
);
442-
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
443-
error!("set piece metadata failed: {}", err)
444-
};
445-
446-
err
447-
})?;
448437

449-
// Send the interested pieces request.
450-
let response = dfdaemon_upload_client
438+
let (content, offset, digest) = self
439+
.downloader_factory
440+
.build()
451441
.download_piece(
452-
DownloadPieceRequest {
453-
host_id: host_id.to_string(),
454-
task_id: task_id.to_string(),
455-
piece_number: number,
456-
},
457-
self.config.download.piece_timeout,
442+
format!("{}:{}", host.ip, host.port).as_str(),
443+
number,
444+
host_id,
445+
task_id,
458446
)
459447
.await
460-
.map_err(|err| {
448+
.inspect_err(|err| {
461449
error!("download piece failed: {}", err);
462450
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
463451
error!("set piece metadata failed: {}", err)
464452
};
465-
466-
err
467453
})?;
468454

469-
let piece = response.piece.ok_or_else(|| {
470-
error!("piece is empty");
471-
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
472-
error!("set piece metadata failed: {}", err)
473-
};
474-
475-
Error::InvalidParameter
476-
})?;
477-
478-
// Get the piece content.
479-
let content = piece.content.ok_or_else(|| {
480-
error!("piece content is empty");
481-
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
482-
error!("set piece metadata failed: {}", err)
483-
};
484-
485-
Error::InvalidParameter
486-
})?;
487-
488455
// Record the finish of downloading piece.
489456
match self
490457
.storage
491458
.download_piece_from_remote_peer_finished(
492459
piece_id,
493460
task_id,
494-
piece.offset,
495-
piece.digest.as_str(),
461+
offset,
462+
digest.as_str(),
496463
parent.id.as_str(),
497464
&mut content.as_slice(),
498465
)
@@ -715,7 +682,8 @@ mod tests {
715682
id_generator.clone(),
716683
storage.clone(),
717684
backend_factory.clone(),
718-
);
685+
)
686+
.unwrap();
719687

720688
let test_cases = vec![
721689
(1000, 1, None, 1, vec![0], 0, 1),
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2024 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
18+
use dragonfly_api::dfdaemon::v2::DownloadPieceRequest;
19+
use dragonfly_client_config::dfdaemon::Config;
20+
use dragonfly_client_core::{Error, Result};
21+
use std::sync::Arc;
22+
use tracing::{error, instrument};
23+
24+
/// Downloader is the interface for downloading pieces, which is implemented by different
25+
/// protocols. The downloader is used to download pieces from the other peers.
26+
#[tonic::async_trait]
27+
pub trait Downloader {
28+
/// download_piece downloads a piece from the other peer by different protocols.
29+
async fn download_piece(
30+
&self,
31+
addr: &str,
32+
number: u32,
33+
host_id: &str,
34+
task_id: &str,
35+
) -> Result<(Vec<u8>, u64, String)>;
36+
}
37+
38+
/// DownloaderFactory is the factory for creating different downloaders by different protocols.
39+
pub struct DownloaderFactory {
40+
/// downloader is the downloader for downloading pieces, which is implemented by different
41+
/// protocols.
42+
downloader: Arc<dyn Downloader + Send + Sync>,
43+
}
44+
45+
/// DownloadFactory implements the DownloadFactory trait.
46+
impl DownloaderFactory {
47+
/// new returns a new DownloadFactory.
48+
#[instrument(skip_all)]
49+
pub fn new(protocol: &str, config: Arc<Config>) -> Result<Self> {
50+
let downloader = match protocol {
51+
"grpc" => Arc::new(GRPCDownloader::new(config.clone())),
52+
_ => {
53+
error!("downloader unsupported protocol: {}", protocol);
54+
return Err(Error::InvalidParameter);
55+
}
56+
};
57+
58+
Ok(Self { downloader })
59+
}
60+
61+
/// build returns the downloader.
62+
#[instrument(skip_all)]
63+
pub fn build(&self) -> Arc<dyn Downloader + Send + Sync> {
64+
self.downloader.clone()
65+
}
66+
}
67+
68+
/// GRPCDownloader is the downloader for downloading pieces by the gRPC protocol.
69+
pub struct GRPCDownloader {
70+
/// config is the configuration of the dfdaemon.
71+
config: Arc<Config>,
72+
}
73+
74+
/// GRPCDownloader implements the downloader with the gRPC protocol.
75+
impl GRPCDownloader {
76+
/// new returns a new GRPCDownloader.
77+
#[instrument(skip_all)]
78+
pub fn new(config: Arc<Config>) -> Self {
79+
Self { config }
80+
}
81+
}
82+
83+
/// GRPCDownloader implements the Downloader trait.
84+
#[tonic::async_trait]
85+
impl Downloader for GRPCDownloader {
86+
/// download_piece downloads a piece from the other peer by the gRPC protocol.
87+
#[instrument(skip_all)]
88+
async fn download_piece(
89+
&self,
90+
addr: &str,
91+
number: u32,
92+
host_id: &str,
93+
task_id: &str,
94+
) -> Result<(Vec<u8>, u64, String)> {
95+
let dfdaemon_upload_client =
96+
DfdaemonUploadClient::new(self.config.clone(), format!("http://{}", addr)).await?;
97+
98+
let response = dfdaemon_upload_client
99+
.download_piece(
100+
DownloadPieceRequest {
101+
host_id: host_id.to_string(),
102+
task_id: task_id.to_string(),
103+
piece_number: number,
104+
},
105+
self.config.download.piece_timeout,
106+
)
107+
.await?;
108+
109+
let Some(piece) = response.piece else {
110+
return Err(Error::InvalidParameter);
111+
};
112+
113+
let Some(content) = piece.content else {
114+
return Err(Error::InvalidParameter);
115+
};
116+
117+
Ok((content, piece.offset, piece.digest))
118+
}
119+
}

dragonfly-client/src/resource/task.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,23 +97,23 @@ impl Task {
9797
storage: Arc<Storage>,
9898
scheduler_client: Arc<SchedulerClient>,
9999
backend_factory: Arc<BackendFactory>,
100-
) -> Self {
100+
) -> ClientResult<Self> {
101101
let piece = piece::Piece::new(
102102
config.clone(),
103103
id_generator.clone(),
104104
storage.clone(),
105105
backend_factory.clone(),
106-
);
106+
)?;
107107
let piece = Arc::new(piece);
108108

109-
Self {
109+
Ok(Self {
110110
config,
111111
id_generator,
112112
storage: storage.clone(),
113113
scheduler_client: scheduler_client.clone(),
114114
backend_factory: backend_factory.clone(),
115115
piece: piece.clone(),
116-
}
116+
})
117117
}
118118

119119
/// get gets the metadata of the task.

0 commit comments

Comments
 (0)