Skip to content

Commit

Permalink
refactor: Add workflow spans and every cli logging (#603)
Browse files Browse the repository at this point in the history
# Description

This PR implements the following changes:

- [x] Add initial workflow and task execution spans
- [x] Add workflow initialize, start, and end logs
- [x] Add computed and replayed receipt logs
- [x] Add custom logging format and filter for EveryCLI
- [x] Minor re-wording of existing logs

## Link to issue

Implements spans needed in #457

## Type of change

- [x] New feature (non-breaking change that adds functionality)
- [x] Refactor (non-breaking change that updates existing functionality)

## Test plan (required)

Run Homestar with `EVERY_CLI` set to `true` to see the simplified logs:

```
EVERY_CLI=true cargo run -- start
```

Check that `RUST_LOG` works when `EVERY_CLI` is `false` or not set:

 ```
EVERY_CLI=false RUST_LOG=info cargo run -- start
```
  • Loading branch information
bgins committed Mar 13, 2024
1 parent 0f093f0 commit c3f9b4a
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 32 deletions.
79 changes: 60 additions & 19 deletions homestar-runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter};

const LOG_FILE: &str = "homestar.log";
const DIRECTIVE_EXPECT: &str = "Invalid tracing directive";
// Sets simplified logging filter and format for Every CLI
const EVERY_CLI: &str = "EVERY_CLI";

/// Logger interface.
#[derive(Debug)]
Expand Down Expand Up @@ -43,31 +45,70 @@ fn init(
guard: WorkerGuard,
#[allow(unused_variables)] settings: &settings::Monitoring,
) -> WorkerGuard {
// RUST_LOG ignored when EVERY_CLI is true
let every_cli: bool = std::env::var(EVERY_CLI).is_ok_and(|val| val == "true");

// TODO: Add support for customizing logger(s) / specialzed formatters.
let format_layer = tracing_logfmt::builder()
.with_level(true)
.with_target(true)
.with_span_name(true)
.with_span_path(true)
.with_location(true)
.with_module_path(true)
.layer()
.with_writer(writer);
let format_layer = if every_cli {
tracing_logfmt::builder()
.with_level(true)
.with_target(false)
.with_span_name(false)
.with_span_path(false)
.with_location(false)
.with_module_path(false)
.layer()
.with_writer(writer)
} else {
tracing_logfmt::builder()
.with_level(true)
.with_target(true)
.with_span_name(true)
.with_span_path(true)
.with_location(true)
.with_module_path(true)
.layer()
.with_writer(writer)
};

let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("info")
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
let filter = if every_cli {
EnvFilter::new("off")
.add_directive(
"homestar_runtime::runner[run_worker]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"homestar_runtime::worker[run]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"homestar_runtime::worker[spawn_workflow_tasks]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"libp2p_gossipsub::behaviour=info"
"homestar_wasm[wasi_log]=trace"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
});
} else {
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("info")
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive(
"libp2p_gossipsub::behaviour=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
})
};

#[cfg(all(
feature = "console",
Expand Down
25 changes: 21 additions & 4 deletions homestar-runtime/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tokio::{
time,
};
use tokio_util::time::{delay_queue, DelayQueue};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

mod error;
pub(crate) mod file;
Expand Down Expand Up @@ -702,6 +702,7 @@ impl Runner {
}
}

#[instrument(skip_all)]
async fn run_worker<S: Into<FastStr>>(
&self,
workflow: Workflow<'static, Arg>,
Expand Down Expand Up @@ -767,9 +768,11 @@ impl Runner {
async move { Fetch::get_resources(rscs, workflow_settings).await }.boxed()
};

let handle = self
.runtime
.spawn(worker.run(self.running_tasks(), fetch_fn));
let handle = self.runtime.spawn(
worker
.run(self.running_tasks(), fetch_fn)
.instrument(info_span!("run").or_current()),
);

// Add Cid to expirations timing wheel
let delay_key = self
Expand All @@ -790,6 +793,20 @@ impl Runner {
.collect();
let replayed_receipt_info = find_receipt_info_by_pointers(&receipt_pointers, db)?;

// Log replayed receipts if any
if !replayed_receipt_info.is_empty() {
info!(
subject = "workflow.receipts",
category = "workflow",
receipt_cids = replayed_receipt_info
.iter()
.map(|info| info.0.to_string())
.collect::<Vec<String>>()
.join(","),
"replaying receipts",
);
};

Ok(WorkflowData {
info: initial_info,
name: workflow_name,
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/tasks/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use homestar_wasm::{
io::{Arg, Output},
wasmtime::{world::Env, Error as WasmRuntimeError, State, World},
};
use tracing::Instrument;

#[allow(dead_code)]
#[allow(missing_debug_implementations)]
Expand All @@ -32,7 +33,7 @@ impl WasmContext {
args: Args<Arg>,
) -> Result<Output, WasmRuntimeError> {
let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?;
env.execute(args).await
env.execute(args).in_current_span().await
}
}

Expand Down
58 changes: 52 additions & 6 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use indexmap::IndexMap;
use libipld::{Cid, Ipld};
use std::{collections::BTreeMap, sync::Arc};
use tokio::task::JoinSet;
use tracing::{debug, error, info};
use tracing::{debug, debug_span, error, info, info_span, instrument, Instrument};

mod poller;
mod resolver;
Expand Down Expand Up @@ -157,6 +157,7 @@ where
/// [Instruction]: homestar_invocation::task::Instruction
/// [Swarm]: crate::network::swarm
/// [LinkMap]: homestar_workflow::LinkMap
#[instrument(skip_all)]
pub(crate) async fn run<F>(self, running_tasks: Arc<RunningTaskSet>, fetch_fn: F) -> Result<()>
where
F: FnOnce(FnvHashSet<Resource>) -> BoxFuture<'a, Result<IndexMap<Resource, Vec<u8>>>>,
Expand All @@ -169,6 +170,15 @@ where
.await
{
Ok(ctx) => {
let workflow_cid = self.workflow_info.cid.to_string();

info!(
subject = "worker.init_workflow",
category = "worker.run",
workflow_cid,
"initializing workflow"
);

let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone();
let resolver = DHTResolver::new(
promises_to_resolve,
Expand All @@ -181,7 +191,7 @@ where
info!(
subject = "worker.resolve_receipts",
category = "worker.run",
workflow_cid = self.workflow_info.cid.to_string(),
workflow_cid,
"resolving receipts in the background"
);
poller::poll(
Expand All @@ -196,12 +206,26 @@ where
// Set the workflow status to running.
let conn = &mut self.db.conn()?;
if ctx.scheduler.run_length() > 0 {
info!(
subject = "worker.start_workflow",
category = "worker.run",
workflow_cid,
"starting workflow"
);

Db::set_workflow_status(
self.workflow_info.cid,
workflow::Status::Running,
conn,
)?;
} else {
info!(
subject = "worker.start_workflow",
category = "worker.run",
workflow_cid,
"replaying workflow"
);

Db::set_workflow_status(
self.workflow_info.cid,
workflow::Status::Completed,
Expand All @@ -223,6 +247,7 @@ where
}

#[allow(unused_mut)]
#[instrument(skip_all)]
async fn run_queue(
mut self,
mut scheduler: TaskScheduler<'a>,
Expand Down Expand Up @@ -321,17 +346,19 @@ where
category = "worker.run",
workflow_cid = workflow_cid.to_string(),
cid = cid.to_string(),
"attempting to resolve cid in workflow"
"attempting to resolve workflow args by cid"
);

cid.resolve(linkmap.clone(), resources.clone(), db.clone())
.boxed()
});

let handle = task_set.spawn(async move {
match resolved.await {
match resolved.await {
Ok(inst_result) => {
match wasm_ctx.run(wasm, &fun, inst_result).await {
match wasm_ctx.run(wasm, &fun, inst_result).instrument({
debug_span!("wasm_run").or_current()
}).await {
Ok(output) => Ok((
output,
instruction_ptr,
Expand All @@ -352,7 +379,11 @@ where
})
}
}
});
}
.instrument({
info_span!("spawn_workflow_tasks").or_current()
}));

handles.push(handle);
}
None => error!(
Expand Down Expand Up @@ -428,6 +459,13 @@ where
"committed to database"
);

info!(
subject = "worker.receipt",
category = "worker.run",
receipt_cid = stored_receipt.cid().to_string(),
"computed receipt"
);

let _ = self
.event_sender
.send_async(Event::CapturedReceipt(Captured::with(
Expand All @@ -442,6 +480,14 @@ where
// Set the workflow status to `completed`
let conn = &mut self.db.conn()?;
Db::set_workflow_status(self.workflow_info.cid, workflow::Status::Completed, conn)?;

info!(
subject = "worker.end_workflow",
category = "worker.run",
workflow_cid = self.workflow_info.cid.to_string(),
"workflow completed"
);

Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/worker/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::{
sync::RwLock,
time::{timeout_at, Instant},
};
use tracing::debug;
use tracing::{debug, instrument};

pub(crate) trait Resolver {
async fn resolve(
Expand All @@ -35,6 +35,7 @@ pub(crate) trait Resolver {
}

impl Resolver for Cid {
#[instrument(level = "debug", name = "cid_resolve", skip_all)]
async fn resolve(
self,
linkmap: Arc<RwLock<LinkMap<task::Result<Arg>>>>,
Expand Down
2 changes: 2 additions & 0 deletions homestar-wasm/src/wasmtime/host/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::wasmtime::{
};
use async_trait::async_trait;
use std::time::Instant;
use tracing::instrument;

#[async_trait]
impl helpers::Host for State {
Expand All @@ -30,6 +31,7 @@ impl helpers::Host for State {
#[async_trait]
impl wasi::logging::logging::Host for State {
/// Log a message, formatted by the runtime subscriber.
#[instrument(name = "wasi_log", skip_all)]
async fn log(
&mut self,
level: wasi::logging::logging::Level,
Expand Down
6 changes: 5 additions & 1 deletion homestar-wasm/src/wasmtime/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use homestar_invocation::{
task::instruction::{Args, Input},
};
use std::{iter, time::Instant};
use tracing::{instrument, Instrument};
use wasmtime::{
component::{self, Component, Func, Instance, Linker},
Config, Engine, Store,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl<T> Env<T> {
/// Types must conform to [Wit] IDL types when Wasm was compiled/generated.
///
/// [Wit]: <https://github.com/WebAssembly/component-model/blob/main/design/mvp/WIT.md>
#[instrument(skip_all)]
pub async fn execute(&mut self, args: Args<Arg>) -> Result<Output, Error>
where
T: Send,
Expand Down Expand Up @@ -196,13 +198,15 @@ impl<T> Env<T> {
.ok_or(Error::WasmInstantiation)?
.func()
.call_async(&mut self.store, &params, &mut results_alloc)
.in_current_span()
.await?;

self.bindings
.as_mut()
.ok_or(Error::WasmInstantiation)?
.func()
.post_return_async(&mut self.store)
.in_current_span()
.await?;

let results = match &results_alloc[..] {
Expand Down Expand Up @@ -415,7 +419,7 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result<Component, Error
if is_component(chunk) {
Component::from_binary(&engine, bytes).map_err(Error::IntoWasmComponent)
} else {
tracing::info!("Converting Wasm binary into a Wasm component");
tracing::info!("converting Wasm binary into a Wasm component");

let component = ComponentEncoder::default()
.module(bytes)?
Expand Down

0 comments on commit c3f9b4a

Please sign in to comment.