Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add workflow spans and every cli logging #603

Merged
merged 8 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 60 additions & 19 deletions homestar-runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter};

const LOG_FILE: &str = "homestar.log";
const DIRECTIVE_EXPECT: &str = "Invalid tracing directive";
// Sets simplified logging filter and format for Every CLI
const EVERY_CLI: &str = "EVERY_CLI";
bgins marked this conversation as resolved.
Show resolved Hide resolved

/// Logger interface.
#[derive(Debug)]
Expand Down Expand Up @@ -43,31 +45,70 @@ fn init(
guard: WorkerGuard,
#[allow(unused_variables)] settings: &settings::Monitoring,
) -> WorkerGuard {
// RUST_LOG ignored when EVERY_CLI is true
let every_cli: bool = std::env::var(EVERY_CLI).is_ok_and(|val| val == "true");

// TODO: Add support for customizing logger(s) / specialized formatters.
let format_layer = tracing_logfmt::builder()
.with_level(true)
.with_target(true)
.with_span_name(true)
.with_span_path(true)
.with_location(true)
.with_module_path(true)
.layer()
.with_writer(writer);
let format_layer = if every_cli {
tracing_logfmt::builder()
.with_level(true)
.with_target(false)
.with_span_name(false)
.with_span_path(false)
.with_location(false)
.with_module_path(false)
.layer()
.with_writer(writer)
} else {
tracing_logfmt::builder()
.with_level(true)
.with_target(true)
.with_span_name(true)
.with_span_path(true)
.with_location(true)
.with_module_path(true)
.layer()
.with_writer(writer)
};

let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("info")
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
let filter = if every_cli {
EnvFilter::new("off")
zeeshanlakhani marked this conversation as resolved.
Show resolved Hide resolved
.add_directive(
"homestar_runtime::runner[run_worker]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"homestar_runtime::worker[run]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"homestar_runtime::worker[spawn_workflow_tasks]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"libp2p_gossipsub::behaviour=info"
"homestar_wasm[wasi_log]=trace"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
});
} else {
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("info")
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive(
"libp2p_gossipsub::behaviour=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
})
};

#[cfg(all(
feature = "console",
Expand Down
25 changes: 21 additions & 4 deletions homestar-runtime/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tokio::{
time,
};
use tokio_util::time::{delay_queue, DelayQueue};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

mod error;
pub(crate) mod file;
Expand Down Expand Up @@ -702,6 +702,7 @@ impl Runner {
}
}

#[instrument(skip_all)]
async fn run_worker<S: Into<FastStr>>(
&self,
workflow: Workflow<'static, Arg>,
Expand Down Expand Up @@ -767,9 +768,11 @@ impl Runner {
async move { Fetch::get_resources(rscs, workflow_settings).await }.boxed()
};

let handle = self
.runtime
.spawn(worker.run(self.running_tasks(), fetch_fn));
let handle = self.runtime.spawn(
worker
.run(self.running_tasks(), fetch_fn)
.instrument(info_span!("run").or_current()),
);

// Add Cid to expirations timing wheel
let delay_key = self
Expand All @@ -790,6 +793,20 @@ impl Runner {
.collect();
let replayed_receipt_info = find_receipt_info_by_pointers(&receipt_pointers, db)?;

// Log replayed receipts if any
if !replayed_receipt_info.is_empty() {
info!(
subject = "workflow.receipts",
category = "workflow",
receipt_cids = replayed_receipt_info
.iter()
.map(|info| info.0.to_string())
.collect::<Vec<String>>()
.join(","),
"replaying receipts",
);
};

Ok(WorkflowData {
info: initial_info,
name: workflow_name,
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/tasks/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use homestar_wasm::{
io::{Arg, Output},
wasmtime::{world::Env, Error as WasmRuntimeError, State, World},
};
use tracing::Instrument;

#[allow(dead_code)]
#[allow(missing_debug_implementations)]
Expand All @@ -32,7 +33,7 @@ impl WasmContext {
args: Args<Arg>,
) -> Result<Output, WasmRuntimeError> {
let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?;
env.execute(args).await
env.execute(args).in_current_span().await
}
}

Expand Down
58 changes: 52 additions & 6 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use indexmap::IndexMap;
use libipld::{Cid, Ipld};
use std::{collections::BTreeMap, sync::Arc};
use tokio::task::JoinSet;
use tracing::{debug, error, info};
use tracing::{debug, debug_span, error, info, info_span, instrument, Instrument};

mod poller;
mod resolver;
Expand Down Expand Up @@ -157,6 +157,7 @@ where
/// [Instruction]: homestar_invocation::task::Instruction
/// [Swarm]: crate::network::swarm
/// [LinkMap]: homestar_workflow::LinkMap
#[instrument(skip_all)]
pub(crate) async fn run<F>(self, running_tasks: Arc<RunningTaskSet>, fetch_fn: F) -> Result<()>
where
F: FnOnce(FnvHashSet<Resource>) -> BoxFuture<'a, Result<IndexMap<Resource, Vec<u8>>>>,
Expand All @@ -169,6 +170,15 @@ where
.await
{
Ok(ctx) => {
let workflow_cid = self.workflow_info.cid.to_string();

info!(
subject = "worker.init_workflow",
category = "worker.run",
workflow_cid,
"initializing workflow"
);

let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone();
let resolver = DHTResolver::new(
promises_to_resolve,
Expand All @@ -181,7 +191,7 @@ where
info!(
subject = "worker.resolve_receipts",
category = "worker.run",
workflow_cid = self.workflow_info.cid.to_string(),
workflow_cid,
"resolving receipts in the background"
);
poller::poll(
Expand All @@ -196,12 +206,26 @@ where
// Set the workflow status to running.
let conn = &mut self.db.conn()?;
if ctx.scheduler.run_length() > 0 {
info!(
subject = "worker.start_workflow",
category = "worker.run",
workflow_cid,
"starting workflow"
);

Db::set_workflow_status(
self.workflow_info.cid,
workflow::Status::Running,
conn,
)?;
} else {
info!(
subject = "worker.start_workflow",
category = "worker.run",
workflow_cid,
"replaying workflow"
);

Db::set_workflow_status(
self.workflow_info.cid,
workflow::Status::Completed,
Expand All @@ -223,6 +247,7 @@ where
}

#[allow(unused_mut)]
#[instrument(skip_all)]
async fn run_queue(
mut self,
mut scheduler: TaskScheduler<'a>,
Expand Down Expand Up @@ -321,17 +346,19 @@ where
category = "worker.run",
workflow_cid = workflow_cid.to_string(),
cid = cid.to_string(),
"attempting to resolve cid in workflow"
"attempting to resolve workflow args by cid"
);

cid.resolve(linkmap.clone(), resources.clone(), db.clone())
.boxed()
});

let handle = task_set.spawn(async move {
match resolved.await {
match resolved.await {
Ok(inst_result) => {
match wasm_ctx.run(wasm, &fun, inst_result).await {
match wasm_ctx.run(wasm, &fun, inst_result).instrument({
debug_span!("wasm_run").or_current()
}).await {
Ok(output) => Ok((
output,
instruction_ptr,
Expand All @@ -352,7 +379,11 @@ where
})
}
}
});
}
.instrument({
info_span!("spawn_workflow_tasks").or_current()
}));

handles.push(handle);
}
None => error!(
Expand Down Expand Up @@ -428,6 +459,13 @@ where
"committed to database"
);

info!(
subject = "worker.receipt",
category = "worker.run",
receipt_cid = stored_receipt.cid().to_string(),
"computed receipt"
);

let _ = self
.event_sender
.send_async(Event::CapturedReceipt(Captured::with(
Expand All @@ -442,6 +480,14 @@ where
// Set the workflow status to `completed`
let conn = &mut self.db.conn()?;
Db::set_workflow_status(self.workflow_info.cid, workflow::Status::Completed, conn)?;

info!(
subject = "worker.end_workflow",
category = "worker.run",
workflow_cid = self.workflow_info.cid.to_string(),
"workflow completed"
);

Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/worker/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::{
sync::RwLock,
time::{timeout_at, Instant},
};
use tracing::debug;
use tracing::{debug, instrument};

pub(crate) trait Resolver {
async fn resolve(
Expand All @@ -35,6 +35,7 @@ pub(crate) trait Resolver {
}

impl Resolver for Cid {
#[instrument(level = "debug", name = "cid_resolve", skip_all)]
async fn resolve(
self,
linkmap: Arc<RwLock<LinkMap<task::Result<Arg>>>>,
Expand Down
2 changes: 2 additions & 0 deletions homestar-wasm/src/wasmtime/host/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::wasmtime::{
};
use async_trait::async_trait;
use std::time::Instant;
use tracing::instrument;

#[async_trait]
impl helpers::Host for State {
Expand All @@ -30,6 +31,7 @@ impl helpers::Host for State {
#[async_trait]
impl wasi::logging::logging::Host for State {
/// Log a message, formatted by the runtime subscriber.
#[instrument(name = "wasi_log", skip_all)]
async fn log(
&mut self,
level: wasi::logging::logging::Level,
Expand Down
6 changes: 5 additions & 1 deletion homestar-wasm/src/wasmtime/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use homestar_invocation::{
task::instruction::{Args, Input},
};
use std::{iter, time::Instant};
use tracing::{instrument, Instrument};
use wasmtime::{
component::{self, Component, Func, Instance, Linker},
Config, Engine, Store,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl<T> Env<T> {
/// Types must conform to [Wit] IDL types when Wasm was compiled/generated.
///
/// [Wit]: <https://github.com/WebAssembly/component-model/blob/main/design/mvp/WIT.md>
#[instrument(skip_all)]
pub async fn execute(&mut self, args: Args<Arg>) -> Result<Output, Error>
where
T: Send,
Expand Down Expand Up @@ -196,13 +198,15 @@ impl<T> Env<T> {
.ok_or(Error::WasmInstantiation)?
.func()
.call_async(&mut self.store, &params, &mut results_alloc)
.in_current_span()
.await?;

self.bindings
.as_mut()
.ok_or(Error::WasmInstantiation)?
.func()
.post_return_async(&mut self.store)
.in_current_span()
.await?;

let results = match &results_alloc[..] {
Expand Down Expand Up @@ -415,7 +419,7 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result<Component, Error
if is_component(chunk) {
Component::from_binary(&engine, bytes).map_err(Error::IntoWasmComponent)
} else {
tracing::info!("Converting Wasm binary into a Wasm component");
tracing::info!("converting Wasm binary into a Wasm component");

let component = ComponentEncoder::default()
.module(bytes)?
Expand Down
Loading