diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 744a21c5..f397e6be 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -7,6 +7,8 @@ use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter}; const LOG_FILE: &str = "homestar.log"; const DIRECTIVE_EXPECT: &str = "Invalid tracing directive"; +// Sets simplified logging filter and format for Every CLI +const EVERY_CLI: &str = "EVERY_CLI"; /// Logger interface. #[derive(Debug)] @@ -43,31 +45,70 @@ fn init( guard: WorkerGuard, #[allow(unused_variables)] settings: &settings::Monitoring, ) -> WorkerGuard { + // RUST_LOG ignored when EVERY_CLI is true + let every_cli: bool = std::env::var(EVERY_CLI).is_ok_and(|val| val == "true"); + // TODO: Add support for customizing logger(s) / specialzed formatters. - let format_layer = tracing_logfmt::builder() - .with_level(true) - .with_target(true) - .with_span_name(true) - .with_span_path(true) - .with_location(true) - .with_module_path(true) - .layer() - .with_writer(writer); + let format_layer = if every_cli { + tracing_logfmt::builder() + .with_level(true) + .with_target(false) + .with_span_name(false) + .with_span_path(false) + .with_location(false) + .with_module_path(false) + .layer() + .with_writer(writer) + } else { + tracing_logfmt::builder() + .with_level(true) + .with_target(true) + .with_span_name(true) + .with_span_path(true) + .with_location(true) + .with_module_path(true) + .layer() + .with_writer(writer) + }; - let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { - EnvFilter::new("info") - .add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) + let filter = if every_cli { + EnvFilter::new("off") + .add_directive( + "homestar_runtime::runner[run_worker]=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) + .add_directive( + "homestar_runtime::worker[run]=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) + .add_directive( + "homestar_runtime::worker[spawn_workflow_tasks]=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) .add_directive( - "libp2p_gossipsub::behaviour=info" + "homestar_wasm[wasi_log]=trace" .parse() .expect(DIRECTIVE_EXPECT), ) - .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT)) - }); + } else { + EnvFilter::try_from_default_env().unwrap_or_else(|_| { + EnvFilter::new("info") + .add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive( + "libp2p_gossipsub::behaviour=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) + .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT)) + }) + }; #[cfg(all( feature = "console", diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index c8e6eec6..0de197b3 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -40,7 +40,7 @@ use tokio::{ time, }; use tokio_util::time::{delay_queue, DelayQueue}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; mod error; pub(crate) mod file; @@ -702,6 +702,7 @@ impl Runner { } } + #[instrument(skip_all)] async fn run_worker>( &self, workflow: Workflow<'static, Arg>, @@ -767,9 +768,11 @@ impl Runner { async move { Fetch::get_resources(rscs, workflow_settings).await }.boxed() }; - let handle = self - .runtime - .spawn(worker.run(self.running_tasks(), fetch_fn)); + let handle = self.runtime.spawn( + worker + .run(self.running_tasks(), fetch_fn) + .instrument(info_span!("run").or_current()), + ); // Add Cid to expirations timing wheel let delay_key = self @@ -790,6 +793,20 @@ impl Runner { .collect(); let replayed_receipt_info = find_receipt_info_by_pointers(&receipt_pointers, db)?; + // Log replayed receipts if any + if !replayed_receipt_info.is_empty() { + info!( + subject = "workflow.receipts", + category = "workflow", + receipt_cids = replayed_receipt_info + .iter() + .map(|info| info.0.to_string()) + .collect::>() + .join(","), + "replaying receipts", + ); + }; + Ok(WorkflowData { info: initial_info, name: workflow_name, diff --git a/homestar-runtime/src/tasks/wasm.rs b/homestar-runtime/src/tasks/wasm.rs index 5b51e5df..d14341bc 100644 --- a/homestar-runtime/src/tasks/wasm.rs +++ b/homestar-runtime/src/tasks/wasm.rs @@ -8,6 +8,7 @@ use homestar_wasm::{ io::{Arg, Output}, wasmtime::{world::Env, Error as WasmRuntimeError, State, World}, }; +use tracing::Instrument; #[allow(dead_code)] #[allow(missing_debug_implementations)] @@ -32,7 +33,7 @@ impl WasmContext { args: Args, ) -> Result { let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?; - env.execute(args).await + env.execute(args).in_current_span().await } } diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index a5f2e31e..d5d89733 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -36,7 +36,7 @@ use indexmap::IndexMap; use libipld::{Cid, Ipld}; use std::{collections::BTreeMap, sync::Arc}; use tokio::task::JoinSet; -use tracing::{debug, error, info}; +use tracing::{debug, debug_span, error, info, info_span, instrument, Instrument}; mod poller; mod resolver; @@ -157,6 +157,7 @@ where /// [Instruction]: homestar_invocation::task::Instruction /// [Swarm]: crate::network::swarm /// [LinkMap]: homestar_workflow::LinkMap + #[instrument(skip_all)] pub(crate) async fn run(self, running_tasks: Arc, fetch_fn: F) -> Result<()> where F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, @@ -169,6 +170,15 @@ where .await { Ok(ctx) => { + let workflow_cid = self.workflow_info.cid.to_string(); + + info!( + subject = "worker.init_workflow", + category = "worker.run", + workflow_cid, + "initializing workflow" + ); + let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone(); let resolver = DHTResolver::new( promises_to_resolve, @@ -181,7 +191,7 @@ where info!( subject = "worker.resolve_receipts", category = "worker.run", - workflow_cid = self.workflow_info.cid.to_string(), + workflow_cid, "resolving receipts in the background" ); poller::poll( @@ -196,12 +206,26 @@ where // Set the workflow status to running. let conn = &mut self.db.conn()?; if ctx.scheduler.run_length() > 0 { + info!( + subject = "worker.start_workflow", + category = "worker.run", + workflow_cid, + "starting workflow" + ); + Db::set_workflow_status( self.workflow_info.cid, workflow::Status::Running, conn, )?; } else { + info!( + subject = "worker.start_workflow", + category = "worker.run", + workflow_cid, + "replaying workflow" + ); + Db::set_workflow_status( self.workflow_info.cid, workflow::Status::Completed, @@ -223,6 +247,7 @@ where } #[allow(unused_mut)] + #[instrument(skip_all)] async fn run_queue( mut self, mut scheduler: TaskScheduler<'a>, @@ -321,7 +346,7 @@ where category = "worker.run", workflow_cid = workflow_cid.to_string(), cid = cid.to_string(), - "attempting to resolve cid in workflow" + "attempting to resolve workflow args by cid" ); cid.resolve(linkmap.clone(), resources.clone(), db.clone()) @@ -329,9 +354,11 @@ where }); let handle = task_set.spawn(async move { - match resolved.await { + match resolved.await { Ok(inst_result) => { - match wasm_ctx.run(wasm, &fun, inst_result).await { + match wasm_ctx.run(wasm, &fun, inst_result).instrument({ + debug_span!("wasm_run").or_current() + }).await { Ok(output) => Ok(( output, instruction_ptr, @@ -352,7 +379,11 @@ where }) } } - }); + } + .instrument({ + info_span!("spawn_workflow_tasks").or_current() + })); + handles.push(handle); } None => error!( @@ -428,6 +459,13 @@ where "committed to database" ); + info!( + subject = "worker.receipt", + category = "worker.run", + receipt_cid = stored_receipt.cid().to_string(), + "computed receipt" + ); + let _ = self .event_sender .send_async(Event::CapturedReceipt(Captured::with( @@ -442,6 +480,14 @@ where // Set the workflow status to `completed` let conn = &mut self.db.conn()?; Db::set_workflow_status(self.workflow_info.cid, workflow::Status::Completed, conn)?; + + info!( + subject = "worker.end_workflow", + category = "worker.run", + workflow_cid = self.workflow_info.cid.to_string(), + "workflow completed" + ); + Ok(()) } } diff --git a/homestar-runtime/src/worker/resolver.rs b/homestar-runtime/src/worker/resolver.rs index 8629cfa5..9edadde4 100644 --- a/homestar-runtime/src/worker/resolver.rs +++ b/homestar-runtime/src/worker/resolver.rs @@ -23,7 +23,7 @@ use tokio::{ sync::RwLock, time::{timeout_at, Instant}, }; -use tracing::debug; +use tracing::{debug, instrument}; pub(crate) trait Resolver { async fn resolve( @@ -35,6 +35,7 @@ pub(crate) trait Resolver { } impl Resolver for Cid { + #[instrument(level = "debug", name = "cid_resolve", skip_all)] async fn resolve( self, linkmap: Arc>>>, diff --git a/homestar-wasm/src/wasmtime/host/helpers.rs b/homestar-wasm/src/wasmtime/host/helpers.rs index 2dd9fb74..5309230d 100644 --- a/homestar-wasm/src/wasmtime/host/helpers.rs +++ b/homestar-wasm/src/wasmtime/host/helpers.rs @@ -6,6 +6,7 @@ use crate::wasmtime::{ }; use async_trait::async_trait; use std::time::Instant; +use tracing::instrument; #[async_trait] impl helpers::Host for State { @@ -30,6 +31,7 @@ impl helpers::Host for State { #[async_trait] impl wasi::logging::logging::Host for State { /// Log a message, formatted by the runtime subscriber. + #[instrument(name = "wasi_log", skip_all)] async fn log( &mut self, level: wasi::logging::logging::Level, diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index fc980239..7c21b469 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -20,6 +20,7 @@ use homestar_invocation::{ task::instruction::{Args, Input}, }; use std::{iter, time::Instant}; +use tracing::{instrument, Instrument}; use wasmtime::{ component::{self, Component, Func, Instance, Linker}, Config, Engine, Store, @@ -145,6 +146,7 @@ impl Env { /// Types must conform to [Wit] IDL types when Wasm was compiled/generated. /// /// [Wit]: + #[instrument(skip_all)] pub async fn execute(&mut self, args: Args) -> Result where T: Send, @@ -196,6 +198,7 @@ impl Env { .ok_or(Error::WasmInstantiation)? .func() .call_async(&mut self.store, ¶ms, &mut results_alloc) + .in_current_span() .await?; self.bindings @@ -203,6 +206,7 @@ impl Env { .ok_or(Error::WasmInstantiation)? .func() .post_return_async(&mut self.store) + .in_current_span() .await?; let results = match &results_alloc[..] { @@ -415,7 +419,7 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result