Skip to content

Commit

Permalink
feat: Implemented async printing in CLI (#130)
Browse files Browse the repository at this point in the history
This commit implements asynchronous printing in the CLI by having a
printing task that handles all printing, and having other tasks just
send messages to say they want something printed. This ensures nothing
interleaves while doing async printing, and that printing doesn't block
the workers.

This commit also introduces a structured Msg type for what gets printed,
handling the different cases of stdout vs. stderr and plain vs. JSON
output.

Signed-off-by: Andrew Lilley Brinker <[email protected]>
  • Loading branch information
alilleybrinker authored Feb 23, 2024
1 parent 0e0a01e commit b03f22c
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 93 deletions.
4 changes: 4 additions & 0 deletions omnibor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ build-binary = [
"dep:serde_json",
"dep:smart-default",
"tokio/fs",
"tokio/io-std",
"tokio/macros",
"tokio/rt",
"tokio/sync",
"tokio/rt-multi-thread"
]

Expand Down
252 changes: 159 additions & 93 deletions omnibor/src/bin/omnibor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures_lite::stream::StreamExt as _;
use omnibor::ArtifactId;
use omnibor::Sha256;
use serde_json::json;
use serde_json::Value as JsonValue;
use smart_default::SmartDefault;
use std::default::Default;
use std::fmt::Display;
Expand All @@ -21,24 +22,37 @@ use std::path::PathBuf;
use std::process::ExitCode;
use std::str::FromStr;
use tokio::fs::File as AsyncFile;
use tokio::runtime::Runtime;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt as _;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use url::Url;

fn main() -> ExitCode {
#[tokio::main]
async fn main() -> ExitCode {
let args = Cli::parse();

// TODO(alilleybrinker): Make this channel Msg limit configurable.
let (tx, mut rx) = mpsc::channel::<Msg>(args.buffer.unwrap_or(100));

// Do all printing in a separate task we spawn to _just_ do printing.
// This stops printing from blocking the worker tasks.
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
// TODO(alilleybrinker): Handle this error.
let _ = msg.print().await;
}
});

let result = match args.command {
Command::Id(ref args) => run_id(args),
Command::Find(ref args) => run_find(args),
Command::Id(ref args) => run_id(&tx, args).await,
Command::Find(ref args) => run_find(&tx, args).await,
};

if let Err(e) = result {
if let Some(format) = &args.format() {
print_error(e, *format);
} else {
print_plain_error(e);
}

// TODO(alilleybrinker): Handle this erroring out, probably by
// sync-printing as a last resort.
let _ = tx.send(Msg::error(e, args.format())).await;
return ExitCode::FAILURE;
}

Expand All @@ -54,13 +68,17 @@ fn main() -> ExitCode {
// Top-level command-line arguments, parsed in `main` via `Cli::parse()`.
struct Cli {
// The subcommand to run (`Command::Id` or `Command::Find`).
#[command(subcommand)]
command: Command,

// Capacity of the print-message channel created in `main`; when the flag is
// omitted (`None`), `main` falls back to a capacity of 100.
/// How many print messages to buffer at one time, tunes printing perf
#[arg(short = 'b', long = "buffer")]
buffer: Option<usize>,
}

// Accessors over the parsed command-line arguments.
impl Cli {
// Returns the output format requested for whichever subcommand was selected.
//
// NOTE(review): this span is rendered from a diff, so two variants of `format`
// appear back to back: one returning `Option<Format>` (always `Some`) and one
// returning `Format` directly.
fn format(&self) -> Option<Format> {
fn format(&self) -> Format {
// Each subcommand's argument struct carries its own `format` field.
match &self.command {
Command::Id(args) => Some(args.format),
Command::Find(args) => Some(args.format),
Command::Id(args) => args.format,
Command::Find(args) => args.format,
}
}
}
Expand Down Expand Up @@ -154,75 +172,156 @@ impl FromStr for SelectedHash {
}
}

/// A single unit of output to be printed: what to print, and whether it is
/// regular output or an error.
#[derive(Debug)]
struct Msg {
/// The payload to print, either plain text or a JSON value.
content: Content,
/// Success or error; `Msg::resolve_sink` uses this to pick stdout or stderr.
status: Status,
}

impl Msg {
fn id(path: &Path, url: &Url, format: Format) -> Self {
let status = Status::Success;
let path = path.display().to_string();
let url = url.to_string();

match format {
Format::Plain => Msg::plain(status, &format!("{} => {}", path, url)),
Format::Json => Msg::json(status, json!({ "path": path, "id": url })),
}
}

fn error<E: Into<Error>>(error: E, format: Format) -> Msg {
fn _error(error: Error, format: Format) -> Msg {
let status = Status::Error;

match format {
Format::Plain => Msg::plain(status, &format!("error: {}", error.to_string())),
Format::Json => Msg::json(status, json!({"error": error.to_string()})),
}
}

_error(error.into(), format)
}

/// Construct a new plain Msg.
fn plain(status: Status, s: &str) -> Self {
Msg {
content: Content::Plain(s.to_string()),
status,
}
}

/// Construct a new JSON Msg.
fn json(status: Status, j: JsonValue) -> Self {
Msg {
content: Content::Json(j),
status,
}
}

/// Print the Msg to the appropriate sink.
async fn print(self) -> Result<()> {
let to_output = self.content.to_string();
self.resolve_sink().write_all(to_output.as_bytes()).await?;
Ok(())
}

/// Get the sink associated with the type of Msg.
fn resolve_sink(&self) -> Box<dyn AsyncWrite + Unpin + Send> {
match self.status {
Status::Success => Box::new(tokio::io::stdout()),
Status::Error => Box::new(tokio::io::stderr()),
}
}
}

/// The payload of a `Msg`, in one of the two supported output formats.
#[derive(Debug)]
enum Content {
/// A JSON value, rendered through its `Display` impl when printed.
Json(JsonValue),
/// A pre-rendered line of plain text (without the trailing newline).
Plain(String),
}

/// Renders the content followed by a single trailing newline, so each message
/// occupies exactly one line of output.
impl Display for Content {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        // `writeln!` appends the newline itself; this is the idiomatic spelling of
        // `write!(f, "{}\n", ..)` and produces the same bytes.
        match self {
            Content::Plain(s) => writeln!(f, "{}", s),
            Content::Json(j) => writeln!(f, "{}", j),
        }
    }
}

/// Whether a `Msg` reports normal output or an error.
#[derive(Debug)]
enum Status {
/// Regular output; `Msg::resolve_sink` routes it to stdout.
Success,
/// An error report; `Msg::resolve_sink` routes it to stderr.
Error,
}

/*===========================================================================
* Command Implementations
*-------------------------------------------------------------------------*/

/// Run the `id` subcommand.
///
/// Opens `args.path` and checks whether it is a directory: a directory is handed
/// to `id_directory`, anything else is identified as a single file by `id_file`.
// NOTE(review): this span is rendered from a diff, so two bodies are interleaved:
// one that builds its own `Runtime` and blocks on an async block, and an
// `async fn` form that additionally takes the print channel `tx`.
fn run_id(args: &IdArgs) -> Result<()> {
Runtime::new()
.context("failed to initialize the async runtime")?
.block_on(async move {
let mut file = open_async_file(&args.path).await?;

if file_is_dir(&file).await? {
id_directory(&args.path, args.format, args.hash).await
} else {
id_file(&mut file, &args.path, args.format, args.hash).await
}
})
async fn run_id(tx: &Sender<Msg>, args: &IdArgs) -> Result<()> {
let mut file = open_async_file(&args.path).await?;

// The already-open handle is reused for the single-file case.
if file_is_dir(&file).await? {
id_directory(tx, &args.path, args.format, args.hash).await
} else {
id_file(tx, &mut file, &args.path, args.format, args.hash).await
}
}

/// Run the `find` subcommand.
///
/// Builds a SHA-256 `ArtifactId` from the requested `url`, walks `path`, and hashes
/// every non-directory entry. The first file whose ID URL equals the requested one
/// is reported and the function returns; if nothing matches it returns `Ok(())`
/// without reporting an ID.
// NOTE(review): this span is rendered from a diff, so two bodies are interleaved:
// one wrapped in `Runtime::new()...block_on(..)` that reports via `print_error` /
// `print_id`, and an `async fn` form that reports by sending `Msg`s on `tx`.
fn run_find(args: &FindArgs) -> Result<()> {
async fn run_find(tx: &Sender<Msg>, args: &FindArgs) -> Result<()> {
let FindArgs { url, path, format } = args;

Runtime::new()
.context("failed to initialize the async runtime")?
.block_on(async move {
let id = ArtifactId::<Sha256>::id_url(url.clone())?;
let url = id.url();

let mut entries = WalkDir::new(&path);

loop {
match entries.next().await {
None => break,
Some(Err(e)) => print_error(e, *format),
Some(Ok(entry)) => {
let path = &entry.path();

if entry_is_dir(&entry).await? {
continue;
}

let mut file = open_async_file(&path).await?;
let file_url = hash_file(SelectedHash::Sha256, &mut file, &path).await?;

if url == file_url {
print_id(&path, &url, *format);
return Ok(());
}
}
// `url` is shadowed: from here on it is the URL derived from the constructed ID.
let id = ArtifactId::<Sha256>::id_url(url.clone())?;
let url = id.url();

let mut entries = WalkDir::new(&path);

loop {
match entries.next().await {
None => break,
// A failed directory entry is reported and the walk continues; only a failed
// send on the channel aborts via `?`.
Some(Err(e)) => tx.send(Msg::error(e, *format)).await?,
Some(Ok(entry)) => {
let path = &entry.path();

// Directories themselves are not hashed.
if entry_is_dir(&entry).await? {
continue;
}

let mut file = open_async_file(&path).await?;
let file_url = hash_file(SelectedHash::Sha256, &mut file, &path).await?;

// Stop at the first match.
if url == file_url {
tx.send(Msg::id(&path, &url, *format)).await?;
return Ok(());
}
}
}
}

Ok(())
})
Ok(())
}

/*===========================================================================
* Helper Functions
*-------------------------------------------------------------------------*/

// Identify, recursively, all the files under a directory.
async fn id_directory(path: &Path, format: Format, hash: SelectedHash) -> Result<()> {
async fn id_directory(
tx: &Sender<Msg>,
path: &Path,
format: Format,
hash: SelectedHash,
) -> Result<()> {
let mut entries = WalkDir::new(path);

loop {
match entries.next().await {
None => break,
Some(Err(e)) => print_error(e, format),
Some(Err(e)) => tx.send(Msg::error(e, format)).await?,
Some(Ok(entry)) => {
let path = &entry.path();

Expand All @@ -231,7 +330,7 @@ async fn id_directory(path: &Path, format: Format, hash: SelectedHash) -> Result
}

let mut file = open_async_file(&path).await?;
id_file(&mut file, &path, format, hash).await?;
id_file(tx, &mut file, &path, format, hash).await?;
}
}
}
Expand All @@ -241,13 +340,14 @@ async fn id_directory(path: &Path, format: Format, hash: SelectedHash) -> Result

/// Identify a single file.
///
/// Hashes `file` with the selected `hash` and reports the resulting ID URL together
/// with `path` in the requested `format`. Errors from hashing (and, in the
/// channel-based form, from sending) are propagated to the caller.
async fn id_file(
tx: &Sender<Msg>,
file: &mut AsyncFile,
path: &Path,
format: Format,
hash: SelectedHash,
) -> Result<()> {
let url = hash_file(hash, file, &path).await?;
// NOTE(review): rendered from a diff — the direct `print_id` call and the
// `tx.send(..)` call below are two variants of the same reporting step.
print_id(path, &url, format);
tx.send(Msg::id(path, &url, format)).await?;
Ok(())
}

Expand All @@ -258,40 +358,6 @@ async fn hash_file(hash: SelectedHash, file: &mut AsyncFile, path: &Path) -> Res
}
}

/// Write one line to stdout pairing a file path with its ID, in the chosen format.
///
/// Plain output is `path: <path>, id: <id>`; JSON output is an object with string
/// fields `path` and `id`.
fn print_id(path: &Path, url: &Url, format: Format) {
    let shown_path = path.display().to_string();
    let shown_id = url.to_string();

    // Render the line first, then emit it with a single `println!`.
    let line = match format {
        Format::Json => json!({ "path": shown_path, "id": shown_id }).to_string(),
        Format::Plain => format!("path: {}, id: {}", shown_path, shown_id),
    };

    println!("{}", line);
}

/// Print an error, respecting formatting.
fn print_error<E: Into<Error>>(error: E, format: Format) {
fn _print_error(error: Error, format: Format) {
match format {
Format::Plain => print_plain_error(error),
Format::Json => {
let output = json!({
"error": error.to_string(),
});

eprintln!("{}", output);
}
}
}

_print_error(error.into(), format)
}

/// Write `error` to stderr as a single plain-text line prefixed with `error: `.
fn print_plain_error(error: Error) {
    let rendered = format!("error: {}", error);
    eprintln!("{}", rendered);
}

/// Check if the file is for a directory.
async fn file_is_dir(file: &AsyncFile) -> Result<bool> {
Ok(file.metadata().await.map(|meta| meta.is_dir())?)
Expand Down

0 comments on commit b03f22c

Please sign in to comment.