Skip to content

Commit

Permalink
feat: add pretty print for dfget (#420)
Browse files Browse the repository at this point in the history
* feat: add pretty print for dfget

Signed-off-by: Gaius <[email protected]>

* Trigger CI

---------

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Apr 24, 2024
1 parent 14c9e3b commit 8d0598b
Show file tree
Hide file tree
Showing 5 changed files with 663 additions and 14 deletions.
38 changes: 37 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions dragonfly-client-backend/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::TryStreamExt;
use rustls_pki_types::CertificateDer;
use std::io::{Error as IOError, ErrorKind};
use tokio_util::io::StreamReader;
use tracing::error;

// HTTP is the HTTP backend.
pub struct HTTP;
Expand Down Expand Up @@ -85,7 +86,11 @@ impl crate::Backend for HTTP {
.timeout(request.timeout)
.send()
.await
.or_err(ErrorType::HTTPError)?;
.or_err(ErrorType::HTTPError)
.map_err(|err| {
error!("head request failed: {}", err);
err
})?;

let header = response.headers().clone();
let status_code = response.status();
Expand All @@ -107,7 +112,11 @@ impl crate::Backend for HTTP {
.timeout(request.timeout)
.send()
.await
.or_err(ErrorType::HTTPError)?;
.or_err(ErrorType::HTTPError)
.map_err(|err| {
error!("get request failed: {}", err);
err
})?;

let header = response.headers().clone();
let status_code = response.status();
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ leaky-bucket = "1.0.1"
tokio-rustls = "0.25.0-alpha.4"
http-body-util = "0.1.0"
futures-util = "0.3.30"
termion = "3.0.0"
165 changes: 154 additions & 11 deletions dragonfly-client/src/bin/dfget/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
use clap::Parser;
use dragonfly_api::common::v2::{Download, TaskType};
use dragonfly_api::dfdaemon::v2::{download_task_response, DownloadTaskRequest};
use dragonfly_api::errordetails::v2::Http;
use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient;
use dragonfly_client::grpc::health::HealthClient;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_config::{self, default_piece_length, dfdaemon, dfget};
use dragonfly_client_core::Error;
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use dragonfly_client_util::http::header_vec_to_hashmap;
use fslock::LockFile;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use std::{cmp::min, fmt::Write};
use termion::{color, style};
use tokio::process::{Child, Command};
use tracing::{debug, error, info, Level};
use url::Url;
Expand Down Expand Up @@ -200,7 +205,7 @@ struct Args {
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
async fn main() -> anyhow::Result<()> {
// Parse command line arguments.
let args = Args::parse();

Expand All @@ -214,6 +219,139 @@ async fn main() -> Result<(), anyhow::Error> {
args.verbose,
);

// Run dfget command.
if let Err(err) = run(args).await {
match err {
Error::TonicStatus(status) => {
let details = status.details();
if let Ok(http_err) = serde_json::from_slice::<Http>(details) {
eprintln!(
"{}{}{}Downloading failed, bad status code:{} {}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
http_err.status_code
);
eprintln!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
eprintln!(
"{}{}{}Header:{}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset
);
for (key, value) in http_err.header.iter() {
eprintln!(" [{}]: {}", key.as_str(), value.as_str());
}
eprintln!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
} else {
eprintln!(
"{}{}{}Downloading failed, bad code:{} {}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
status.code()
);
eprintln!(
"{}{}{}*********************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
eprintln!(
"{}{}{}Message:{} {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
status.message()
);
eprintln!(
"{}{}{}Details:{} {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
std::str::from_utf8(status.details()).unwrap()
);
eprintln!(
"{}{}{}*********************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
}
}
Error::HTTP(err) => {
eprintln!(
"{}{}{}Downloading failed, bad status code:{} {}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
err.status_code
);
eprintln!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
eprintln!(
"{}{}{}Header:{}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset
);
for (key, value) in err.header.iter() {
eprintln!(" [{}]: {}", key.as_str(), value.to_str().unwrap());
}
eprintln!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);
}
err => {
eprintln!(
"{}{}{}Downloading failed, error message:{} {}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset,
err
);
}
}

std::process::exit(1);
}

Ok(())
}

// run runs the dfget command.
async fn run(args: Args) -> Result<()> {
// Get or create dfdaemon download client.
let dfdaemon_download_client = get_or_create_dfdaemon_download_client(
args.dfdaemon_config,
Expand Down Expand Up @@ -245,7 +383,10 @@ async fn main() -> Result<(), anyhow::Error> {
request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
piece_length: args.piece_length,
output_path: Some(args.output.into_os_string().into_string().unwrap()),
timeout: Some(prost_wkt_types::Duration::try_from(args.timeout)?),
timeout: Some(
prost_wkt_types::Duration::try_from(args.timeout)
.or_err(ErrorType::ParseError)?,
),
need_back_to_source: false,
disable_back_to_source: args.disable_back_to_source,
certificate_chain: Vec::new(),
Expand All @@ -263,7 +404,8 @@ async fn main() -> Result<(), anyhow::Error> {
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
)?
)
.or_err(ErrorType::ParseError)?
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
})
Expand All @@ -273,7 +415,10 @@ async fn main() -> Result<(), anyhow::Error> {
// Dwonload file.
let mut downloaded = 0;
let mut out_stream = response.into_inner();
while let Some(message) = out_stream.message().await? {
while let Some(message) = out_stream.message().await.map_err(|err| {
error!("get message failed: {}", err);
err
})? {
match message.response {
Some(download_task_response::Response::DownloadTaskStartedResponse(response)) => {
pb.set_length(response.content_length);
Expand Down Expand Up @@ -301,7 +446,7 @@ async fn get_or_create_dfdaemon_download_client(
log_level: Level,
log_max_files: usize,
lock_path: PathBuf,
) -> Result<DfdaemonDownloadClient, anyhow::Error> {
) -> Result<DfdaemonDownloadClient> {
// Get dfdaemon download client and check its health.
match get_dfdaemon_download_client(endpoint.clone()).await {
Ok(dfdaemon_download_client) => return Ok(dfdaemon_download_client),
Expand Down Expand Up @@ -340,16 +485,14 @@ async fn get_or_create_dfdaemon_download_client(
}
}
_ = &mut check_health_timeout => {
return Err(anyhow::anyhow!("get dfdaemon download client timeout"));
return Err(Error::Unknown("get dfdaemon download client timeout".to_string()));
}
}
}
}

// get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health.
async fn get_dfdaemon_download_client(
endpoint: PathBuf,
) -> Result<DfdaemonDownloadClient, anyhow::Error> {
async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
// Check dfdaemon's health.
let health_client = HealthClient::new_unix(endpoint.clone()).await?;
health_client.check_dfdaemon_download().await?;
Expand All @@ -365,7 +508,7 @@ fn spawn_dfdaemon(
log_dir: PathBuf,
log_level: Level,
log_max_files: usize,
) -> Result<Child, anyhow::Error> {
) -> Result<Child> {
// Create dfdaemon command.
let mut cmd = Command::new("dfdaemon");

Expand Down
Loading

0 comments on commit 8d0598b

Please sign in to comment.