Skip to content

Commit

Permalink
feat: pipe workflow output to emitter (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgante authored Sep 11, 2024
1 parent db4b277 commit 8eedbe7
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 6 deletions.
3 changes: 2 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ tracing = { version = "0.1.40", default-features = false, features = [] }
notify = "6.1.1"
notify-debouncer-mini = "0.4.1"
tracing-subscriber = { version = "0.3", default-features = false, optional = true }
tracing-log = "0.2.0"
tracing-log = { version = "0.2.0", optional = true }

fs-err = { version = "2.11.0" }

Expand Down Expand Up @@ -129,6 +129,7 @@ grit_tracing = [
"dep:opentelemetry_sdk",
"dep:tracing-opentelemetry",
"dep:tracing-subscriber",
"dep:tracing-log",
"marzano-core/grit_tracing",
]
external_functions = ["marzano-core/external_functions"]
Expand Down
4 changes: 2 additions & 2 deletions crates/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ fn setup_env_logger(app: &App, multi: &MultiProgress) {
}
}

async fn run_command(use_tracing: bool) -> Result<()> {
async fn run_command(_use_tracing: bool) -> Result<()> {
let app = App::parse();
// Use this *only* for analytics, not for any other purpose.
let analytics_args = std::env::args().collect::<Vec<_>>();
Expand All @@ -353,7 +353,7 @@ async fn run_command(use_tracing: bool) -> Result<()> {
#[cfg(not(feature = "grit_tracing"))]
setup_env_logger(&app, &multi);
#[cfg(feature = "grit_tracing")]
if !use_tracing {
if !_use_tracing {
setup_env_logger(&app, &multi);
} else if let Err(e) = tracing_log::LogTracer::init() {
eprintln!("Failed to initialize LogTracer: {:?}", e);
Expand Down
67 changes: 64 additions & 3 deletions crates/cli/src/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ use marzano_util::diff::FileDiff;
use serde::Serialize;
use serde_json::to_string;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tempfile::NamedTempFile;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

pub static GRIT_REPO_URL_NAME: &str = "grit_repo_url";
Expand Down Expand Up @@ -67,13 +69,14 @@ where
let repo = LocalRepo::from_dir(&cwd).await;

#[cfg(feature = "workflow_server")]
let (server_addr, handle, shutdown_tx) = {
let (messages, server_addr, handle, shutdown_tx) = {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let socket = tokio::net::TcpListener::bind("0.0.0.0:0").await?;
let server_addr = format!("http://{}", socket.local_addr()?).to_string();
let handle = grit_cloud_client::spawn_server_tasks(emitter, shutdown_rx, socket);
let (messages, handle) =
grit_cloud_client::spawn_server_tasks(emitter, shutdown_rx, socket);
log::info!("Started local server at {}", server_addr);
(server_addr, handle, shutdown_tx)
(messages, server_addr, handle, shutdown_tx)
};

let root = std::env::var(ENV_GRIT_WORKSPACE_ROOT).unwrap_or_else(|_| {
Expand Down Expand Up @@ -159,9 +162,63 @@ where
.arg("--file")
.arg(&tempfile_path)
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Failed to start worker");

let stdout = final_child.stdout.take().expect("Failed to get stdout");
let stderr = final_child.stderr.take().expect("Failed to get stderr");

let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();

// TODO: make this more elegant by allowing Emitter to be shared between threads
#[cfg(feature = "workflow_server")]
let stdout_messages = messages.clone();
let stdout_handle = tokio::spawn(async move {
while let Some(line) = stdout_reader.next_line().await.unwrap() {
#[cfg(feature = "workflow_server")]
stdout_messages
.send(grit_cloud_client::ServerMessage::Log(
marzano_messenger::SimpleLogMessage {
level: marzano_core::api::AnalysisLogLevel::Info,
step_id: None,
message: line,
meta: Some(std::collections::HashMap::from([(
"source".to_string(),
serde_json::Value::String("stdout".to_string()),
)])),
},
))
.await
.unwrap();
#[cfg(not(feature = "workflow_server"))]
log::info!("{}", line);
}
});
let stderr_handle = tokio::spawn(async move {
while let Some(line) = stderr_reader.next_line().await.unwrap() {
#[cfg(feature = "workflow_server")]
messages
.send(grit_cloud_client::ServerMessage::Log(
marzano_messenger::SimpleLogMessage {
level: marzano_core::api::AnalysisLogLevel::Error,
step_id: None,
message: line,
meta: Some(std::collections::HashMap::from([(
"source".to_string(),
serde_json::Value::String("stderr".to_string()),
)])),
},
))
.await
.unwrap();
#[cfg(not(feature = "workflow_server"))]
log::info!("{}", line);
}
});

let status = final_child.wait().await?;

// Stop the embedded server
Expand All @@ -171,6 +228,10 @@ where
handle.await?
};

// Wait for the stdout and stderr readers to finish
stdout_handle.await?;
stderr_handle.await?;

// Note the workflow may have already emitted its own conclusion - this is a fallback
let fallback_outcome = if status.success() {
PackagedWorkflowOutcome {
Expand Down

0 comments on commit 8eedbe7

Please sign in to comment.