diff --git a/.envrc b/.envrc index 29dc2412..c36e651a 100644 --- a/.envrc +++ b/.envrc @@ -1,4 +1,5 @@ use_flake -export RUST_LOG=homestar_runtime=debug,atuin_client=warn,libp2p=info +export RUST_LOG=homestar_runtime=debug,atuin_client=warn,libp2p=info,libp2p_gossipsub::behaviour=debug export RUST_BACKTRACE=full +export RUSTFLAGS="--cfg tokio_unstable" diff --git a/Cargo.lock b/Cargo.lock index b0eba5f7..cf669b37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2384,6 +2384,7 @@ name = "homestar-core" version = "0.1.0" dependencies = [ "anyhow", + "byte-unit", "criterion", "diesel", "enum-as-inner 0.6.0", @@ -2451,10 +2452,12 @@ dependencies = [ "reqwest", "semver", "serde", + "serde_ipld_dagcbor", "thiserror", "tokio", "tokio-tungstenite 0.19.0", "tracing", + "tracing-appender", "tracing-logfmt", "tracing-subscriber", "tryhard", @@ -2466,6 +2469,7 @@ name = "homestar-wasm" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "atomic_refcell", "criterion", "enum-as-inner 0.6.0", @@ -2474,7 +2478,6 @@ dependencies = [ "image", "itertools", "libipld", - "libipld-core", "rust_decimal", "serde_ipld_dagcbor", "stacker", @@ -6170,6 +6173,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" +dependencies = [ + "crossbeam-channel", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.24" diff --git a/Cargo.toml b/Cargo.toml index 655d54f3..bf37e424 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,14 @@ rust-version = "1.66.0" [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } +async-trait = "0.1" +byte-unit = { version = "4.0", default-features = false } enum-assoc = " 1.1" enum-as-inner = "0.6" +libipld = { version = "0.16", features = ["serde-codec"] } +serde_ipld_dagcbor = "0.3" thiserror = "1.0" -tokio = { version = "1.26", features = ["fs", "io-util", "io-std", "macros", "rt", "rt-multi-thread"] } +tokio = { version = "1.26", features = ["fs", "io-util", "io-std", "macros", "rt", "rt-multi-thread", "tracing"] } tracing = "0.1" # Speedup build on macOS diff --git a/flake.nix b/flake.nix index c5e464c2..b1f69717 100644 --- a/flake.nix +++ b/flake.nix @@ -50,6 +50,7 @@ cargo clippy cargo build --release nx-test + nx-test-0 ''; db = pkgs.writeScriptBin "db" '' diff --git a/homestar-core/Cargo.toml b/homestar-core/Cargo.toml index 8f61e0a4..4230cb67 100644 --- a/homestar-core/Cargo.toml +++ b/homestar-core/Cargo.toml @@ -22,12 +22,13 @@ doctest = true # return to version.workspace = true after the following issue is fixed: # https://github.com/DevinR528/cargo-sort/issues/47 anyhow = { workspace = true } +byte-unit = { workspace = true } diesel = { version = "2.0", features = ["sqlite"] } enum-as-inner = { workspace = true } enum-assoc = { workspace = true } generic-array = { version = "0.14", features = ["serde"] } indexmap = "1.9" -libipld = { version = "0.16", features = ["serde-codec"] } +libipld = { workspace = true } libsqlite3-sys = { version = "0.26", features = ["bundled"] } proptest = { version = "1.2", optional = true } serde = { version = "1.0", features = ["derive"] } diff --git a/homestar-core/src/consts.rs b/homestar-core/src/consts.rs index ad5c9194..c21d407d 100644 --- a/homestar-core/src/consts.rs +++ b/homestar-core/src/consts.rs @@ -4,3 +4,5 @@ pub const INVOCATION_VERSION: &str = "0.2.0"; /// DagCbor codec. 
pub const DAG_CBOR: u64 = 0x71; +/// 4GiB maximum memory for Wasm. +pub const WASM_MAX_MEMORY: u64 = byte_unit::n_gib_bytes!(4); diff --git a/homestar-core/src/workflow/config.rs b/homestar-core/src/workflow/config.rs index f9e12a06..32519b0d 100644 --- a/homestar-core/src/workflow/config.rs +++ b/homestar-core/src/workflow/config.rs @@ -4,18 +4,20 @@ //! [workflow]: super //! [Invocations]: super::Invocation -use crate::{workflow, Unit}; +use crate::{consts, workflow, Unit}; use libipld::{serde::from_ipld, Ipld}; use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, default::Default, time::Duration}; const FUEL_KEY: &str = "fuel"; +const MEMORY_KEY: &str = "memory"; const TIMEOUT_KEY: &str = "time"; /// Resource configuration for defining fuel quota, timeout, etc. #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct Resources { fuel: Option<u64>, + memory: Option<u64>, time: Option<Duration>, } @@ -23,6 +25,7 @@ impl Default for Resources { fn default() -> Self { Self { fuel: Some(u64::MAX), + memory: Some(consts::WASM_MAX_MEMORY), time: Some(Duration::from_millis(100_000)), } } @@ -30,9 +33,10 @@ impl Default for Resources { impl Resources { /// Create new [Resources] configuration. - pub fn new(fuel: u64, time: Duration) -> Self { + pub fn new(fuel: u64, memory: u64, time: Duration) -> Self { Self { fuel: Some(fuel), + memory: Some(memory), time: Some(time), } } @@ -56,6 +60,16 @@ impl Resources { pub fn set_time(&mut self, time: Duration) { self.time = Some(time) } + + /// Get max memory. + pub fn memory(&self) -> Option<u64> { + self.memory + } + + /// Set max memory. + pub fn set_memory(&mut self, memory: u64) { + self.memory = Some(memory) + } } impl From<Resources> for Ipld { @@ -65,6 +79,10 @@ ( FUEL_KEY.into(), resources.fuel().map(Ipld::from).unwrap_or(Ipld::Null), ), + ( + MEMORY_KEY.into(), + resources.memory().map(Ipld::from).unwrap_or(Ipld::Null), + ), ( TIMEOUT_KEY.into(), resources @@ -95,6 +113,11 @@ impl TryFrom<Ipld> for Resources { ipld => from_ipld(ipld.to_owned()).ok(), }); + let memory = map.get(MEMORY_KEY).and_then(|ipld| match ipld { + Ipld::Null => None, + ipld => from_ipld(ipld.to_owned()).ok(), + }); + let time = map.get(TIMEOUT_KEY).and_then(|ipld| match ipld { Ipld::Null => None, ipld => { @@ -103,7 +126,7 @@ } }); - Ok(Resources { fuel, time }) + Ok(Resources { fuel, memory, time }) } } @@ -121,6 +144,10 @@ mod test { ipld, Ipld::Map(BTreeMap::from([ (FUEL_KEY.into(), Ipld::Integer(u64::MAX.into())), + ( + MEMORY_KEY.into(), + Ipld::Integer(consts::WASM_MAX_MEMORY.into()) + ), (TIMEOUT_KEY.into(), Ipld::Integer(100_000)) ])) ); diff --git a/homestar-core/src/workflow/receipt.rs b/homestar-core/src/workflow/receipt.rs index e059e42e..48a02cba 100644 --- a/homestar-core/src/workflow/receipt.rs +++ b/homestar-core/src/workflow/receipt.rs @@ -8,6 +8,8 @@ use crate::{ use libipld::{self, cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Ipld}; use std::collections::BTreeMap; +pub mod metadata; + const RAN_KEY: &str = "ran"; const OUT_KEY: &str = "out"; const ISSUER_KEY: &str = "iss"; diff --git a/homestar-core/src/workflow/receipt/metadata.rs b/homestar-core/src/workflow/receipt/metadata.rs new file mode 100644 index 00000000..ff72d9ae --- /dev/null +++ b/homestar-core/src/workflow/receipt/metadata.rs @@ -0,0 +1,9 @@ +//! Metadata related to [Receipt]s. + +/// Metadata key for an operation or function name. +pub const OP_KEY: &str = "op"; + +/// Metadata key for a workflow [Cid]. 
+/// +/// [Cid]: libipld::Cid +pub const WORKFLOW_KEY: &str = "workflow"; diff --git a/homestar-core/src/workflow/task.rs b/homestar-core/src/workflow/task.rs index d7fa1a56..dfa30370 100644 --- a/homestar-core/src/workflow/task.rs +++ b/homestar-core/src/workflow/task.rs @@ -172,7 +172,7 @@ where #[cfg(test)] mod test { use super::*; - use crate::{test_utils, workflow::config::Resources, Unit}; + use crate::{consts::WASM_MAX_MEMORY, test_utils, workflow::config::Resources, Unit}; #[test] fn ipld_roundtrip() { @@ -207,6 +207,7 @@ mod test { METADATA_KEY.into(), Ipld::Map(BTreeMap::from([ ("fuel".into(), Ipld::Integer(u64::MAX.into())), + ("memory".into(), Ipld::Integer(WASM_MAX_MEMORY.into())), ("time".into(), Ipld::Integer(100_000)) ])) ), @@ -236,6 +237,7 @@ mod test { METADATA_KEY.into(), Ipld::Map(BTreeMap::from([ ("fuel".into(), Ipld::Integer(u64::MAX.into())), + ("memory".into(), Ipld::Integer(WASM_MAX_MEMORY.into())), ("time".into(), Ipld::Integer(100_000)) ])) ), diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index a3b4cf59..01982b0d 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -32,7 +32,7 @@ ansi_term = { version = "0.12", optional = true, default-features = false } anyhow = { workspace = true } async-trait = "0.1" axum = { version = "0.6", features = ["ws", "headers"] } -byte-unit = { version = "4.0", default-features = false } +byte-unit = { workspace = true } clap = { version = "4.3", features = ["derive", "color", "help"] } concat-in-place = "1.1" config = "0.13" @@ -52,18 +52,20 @@ indexmap = "1.9" ipfs-api = { version = "0.17", optional = true } ipfs-api-backend-hyper = { version = "0.6", features = ["with-builder", "with-send-sync"], optional = true } itertools = "0.10" -libipld = "0.16" -libp2p = { version = "0.51", features = ["kad", "request-response", "macros", "identify", "mdns", "gossipsub", "tokio", "dns", "mplex", "tcp", "noise", "yamux", "websocket"] } +libipld = { workspace = true } +libp2p = { version = "0.51", default-features = false, features = ["kad", "request-response", "macros", "identify", "mdns", "gossipsub", "tokio", "dns", "mplex", "tcp", "noise", "yamux", "websocket"] } libsqlite3-sys = { version = "0.26", features = ["bundled"] } openssl = { version = "0.10", features = ["vendored"] } proptest = { version = "1.2", optional = true } reqwest = { version = "0.11", features = ["json"] } semver = "1.0" serde = { version = "1.0", features = ["derive"] } +serde_ipld_dagcbor = { workspace = true } thiserror = "1.0" tokio = { workspace = true } tracing = { workspace = true } -tracing-logfmt = { version = "0.3", optional = true } +tracing-appender = "0.2" +tracing-logfmt = "0.3" tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot", "registry"] } tryhard = "0.5" url = "2.4" @@ -76,11 +78,10 @@ json = "0.12" tokio-tungstenite = "0.19" [features] -default = ["console", "ipfs", "logfmt"] +default = ["ipfs"] ansi-logs = ["ansi_term"] console = ["console-subscriber"] ipfs = ["ipfs-api", "ipfs-api-backend-hyper"] -logfmt = ["tracing-logfmt"] test_utils = ["proptest"] [package.metadata.docs.rs] diff --git a/homestar-runtime/src/db.rs b/homestar-runtime/src/db.rs index 25d1250c..cb3a79ed 100644 --- a/homestar-runtime/src/db.rs +++ b/homestar-runtime/src/db.rs @@ -104,7 +104,7 @@ pub trait Database { /// work across vecs/arrays. 
/// /// [Instruction]: homestar_core::workflow::Instruction - fn find_instructions(pointers: Vec<Pointer>, conn: &mut Connection) -> Result<Vec<Receipt>> { + fn find_instructions(pointers: &Vec<Pointer>, conn: &mut Connection) -> Result<Vec<Receipt>> { let found_receipts = schema::receipts::dsl::receipts .filter(schema::receipts::instruction.eq_any(pointers)) .load(conn)?; diff --git a/homestar-runtime/src/lib.rs b/homestar-runtime/src/lib.rs index 9a9bb6fd..d054bb31 100644 --- a/homestar-runtime/src/lib.rs +++ b/homestar-runtime/src/lib.rs @@ -32,7 +32,7 @@ pub mod workflow; pub use db::Db; #[cfg(feature = "ipfs")] pub use network::ipfs::IpfsCli; -pub use receipt::Receipt; +pub use receipt::{Receipt, RECEIPT_TAG, VERSION_KEY}; pub use runtime::*; pub use settings::Settings; pub use worker::Worker; diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 9c4501e2..9c63aad0 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -1,50 +1,54 @@ //! Logger initialization. use anyhow::Result; -#[cfg(not(feature = "logfmt"))] -use tracing_subscriber::prelude::*; -#[cfg(feature = "logfmt")] use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter}; +const DIRECTIVE_EXPECT: &str = "Invalid tracing directive"; + /// Initialize a [tracing_subscriber::Registry] with a [logfmt] layer. /// /// [logfmt]: <https://brandur.org/logfmt> -#[cfg(feature = "logfmt")] -pub fn init() -> Result<()> { - let registry = tracing_subscriber::Registry::default() - .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) - .with(tracing_logfmt::layer()); +pub fn init(writer: tracing_appender::non_blocking::NonBlocking) -> Result<()> { + let format_layer = tracing_subscriber::fmt::layer() + .event_format(tracing_logfmt::EventsFormatter::default()) + .fmt_fields(tracing_logfmt::FieldsFormatter::default()) + .with_writer(writer); #[cfg(all(feature = "console", tokio_unstable))] - #[cfg_attr(docsrs, doc(cfg(feature = "console")))] - { - let console_layer = console_subscriber::ConsoleLayer::builder() - .retention(Duration::from_secs(60)) - .spawn(); - - registry.with(console_layer).init(); - } + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| { + EnvFilter::new("info") + .add_directive("atuin_client=warn".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive( + "libp2p_gossipsub::behaviour=debug" + .parse() + .expect(DIRECTIVE_EXPECT), + ) + }) + .add_directive("tokio=trace".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("runtime=trace".parse().expect(DIRECTIVE_EXPECT)); #[cfg(any(not(feature = "console"), not(tokio_unstable)))] - { - registry.init(); - } + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { + EnvFilter::new("info") + .add_directive("atuin_client=warn".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive( + "libp2p_gossipsub::behaviour=debug" + .parse() + .expect(DIRECTIVE_EXPECT), + ) + }); - Ok(()) -} - -/// Initialize a default [tracing_subscriber::FmtSubscriber]. 
-#[cfg(not(feature = "logfmt"))] -pub fn init() -> Result<()> { - let registry = tracing_subscriber::FmtSubscriber::builder() - .with_target(false) - .finish(); + let registry = tracing_subscriber::Registry::default() + .with(filter) + .with(format_layer); #[cfg(all(feature = "console", tokio_unstable))] - #[cfg_attr(docsrs, doc(cfg(feature = "console")))] { let console_layer = console_subscriber::ConsoleLayer::builder() - .retention(Duration::from_secs(60)) + .retention(std::time::Duration::from_secs(60)) .spawn(); registry.with(console_layer).init(); diff --git a/homestar-runtime/src/main.rs b/homestar-runtime/src/main.rs index bb5637e9..90d22412 100644 --- a/homestar-runtime/src/main.rs +++ b/homestar-runtime/src/main.rs @@ -6,19 +6,15 @@ use homestar_runtime::{ cli::{Args, Argument}, db::{Database, Db}, logger, - network::{ - eventloop::{EventLoop, RECEIPTS_TOPIC}, - swarm, - ws::WebSocket, - }, + network::{eventloop::EventLoop, swarm, ws::WebSocket}, Settings, }; use std::sync::Arc; -/// TODO: All #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { - logger::init()?; + let (stdout_writer, _stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + logger::init(stdout_writer)?; let opts = Args::parse(); @@ -34,10 +30,7 @@ async fn main() -> Result<()> { }?; let db = Db::setup_connection_pool(settings.node())?; - let mut swarm = swarm::new(settings.node()).await?; - - // subscribe to `receipts` topic - swarm.behaviour_mut().gossip_subscribe(RECEIPTS_TOPIC)?; + let swarm = swarm::new(settings.node()).await?; let (_tx, rx) = EventLoop::setup_channel(settings.node()); // instantiate and start event-loop for events @@ -53,7 +46,8 @@ async fn main() -> Result<()> { let ws_sender = Arc::new(ws_tx); let ws_receiver = Arc::new(ws_rx); WebSocket::start_server(ws_sender, ws_receiver, settings.node()).await?; - Ok(()) } } + + Ok(()) } diff --git a/homestar-runtime/src/network/eventloop.rs b/homestar-runtime/src/network/eventloop.rs index 17327d90..1fe235f4 100644 --- a/homestar-runtime/src/network/eventloop.rs +++ b/homestar-runtime/src/network/eventloop.rs @@ -6,16 +6,18 @@ use crate::IpfsCli; use crate::{ db::{Connection, Database, Db}, network::swarm::{ComposedBehaviour, ComposedEvent, TopicMessage}, - settings, workflow, Receipt, + receipt::{RECEIPT_TAG, VERSION_KEY}, + settings, workflow, + workflow::WORKFLOW_TAG, + Receipt, }; use anyhow::{anyhow, Result}; -use concat_in_place::veccat; use crossbeam::channel; use homestar_core::{ consts, workflow::{Pointer, Receipt as InvocationReceipt}, }; -use libipld::Cid; +use libipld::{Cid, Ipld}; use libp2p::{ futures::StreamExt, gossipsub, @@ -35,9 +37,6 @@ use tokio::sync::mpsc; /// [Receipt]: homestar_core::workflow::receipt pub const RECEIPTS_TOPIC: &str = "receipts"; -const RECEIPT_CODE: &[u8; 16] = b"homestar_receipt"; -const WORKFLOW_INFO_CODE: &[u8; 17] = b"homestar_workflow"; - type WorkerSender = channel::Sender<(Cid, FoundEvent)>; /// Event loop handler for [libp2p] network events and commands. 
@@ -45,8 +44,9 @@ type WorkerSender = channel::Sender<(Cid, FoundEvent)>; pub struct EventLoop { receiver: mpsc::Receiver<Event>, receipt_quorum: usize, - worker_senders: HashMap<Cid, WorkerSender>, + workflow_quorum: usize, swarm: Swarm<ComposedBehaviour>, + worker_senders: HashMap<Cid, WorkerSender>, } impl EventLoop { @@ -67,6 +67,7 @@ impl EventLoop { Self { receiver, receipt_quorum: settings.network.receipt_quorum, + workflow_quorum: settings.network.workflow_quorum, worker_senders: HashMap::new(), swarm, } @@ -154,7 +155,7 @@ impl EventLoop { }); } Err(err) => { - tracing::error!(error=?err, "error putting record on DHT with quorum {}", self.receipt_quorum) + tracing::error!(error=?err, "error putting record(s) on DHT with quorum {}", self.receipt_quorum) } } } else { @@ -182,23 +183,26 @@ impl EventLoop { Err(err) => tracing::error!(error=?err, "message not published on {RECEIPTS_TOPIC} for receipt with cid: {receipt_cid}") } - let quorum = if self.receipt_quorum > 0 { + let receipt_quorum = if self.receipt_quorum > 0 { + unsafe { Quorum::N(NonZeroUsize::new_unchecked(self.receipt_quorum)) } + } else { + Quorum::One + }; + + let workflow_quorum = if self.workflow_quorum > 0 { - unsafe { Quorum::N(NonZeroUsize::new_unchecked(self.receipt_quorum)) } + unsafe { Quorum::N(NonZeroUsize::new_unchecked(self.workflow_quorum)) } } else { Quorum::One }; - if let Ok(receipt_bytes) = Vec::try_from(invocation_receipt) { - let ref_bytes = &receipt_bytes; - let receipt_value = - veccat!(consts::INVOCATION_VERSION.as_bytes() RECEIPT_CODE ref_bytes); + if let Ok(receipt_bytes) = Receipt::invocation_capsule(invocation_receipt) { let _id = self .swarm .behaviour_mut() .kademlia .put_record( - Record::new(instruction_bytes, receipt_value.to_vec()), - quorum, + Record::new(instruction_bytes, receipt_bytes.to_vec()), + receipt_quorum, ) .map_err(anyhow::Error::msg)?; @@ -207,17 +211,16 @@ workflow_info.increment_progress(receipt_cid); let wf_cid_bytes = workflow_info.cid_as_bytes(); - let wf_bytes = &Vec::try_from(workflow_info)?; - let wf_value = veccat!(WORKFLOW_INFO_CODE wf_bytes); + let wf_bytes = workflow_info.capsule()?; let _id = self .swarm .behaviour_mut() .kademlia - .put_record(Record::new(wf_cid_bytes, wf_value), quorum) + .put_record(Record::new(wf_cid_bytes, wf_bytes), workflow_quorum) .map_err(anyhow::Error::msg)?; - Ok((receipt_cid, receipt_bytes)) + Ok((receipt_cid, receipt_bytes.to_vec())) } else { Err(anyhow!("cannot convert receipt {receipt_cid} to bytes")) } @@ -242,26 +245,37 @@ } fn on_found_record(key_cid: Cid, value: Vec<u8>) -> Result<FoundEvent> { - match value { - value - if value - .starts_with(&veccat!(consts::INVOCATION_VERSION.as_bytes() RECEIPT_CODE)) => - { - let receipt_bytes = - &value[consts::INVOCATION_VERSION.as_bytes().len() + RECEIPT_CODE.len()..]; - let invocation_receipt = InvocationReceipt::try_from(receipt_bytes.to_vec())?; - let receipt = Receipt::try_with(Pointer::new(key_cid), &invocation_receipt)?; - Ok(FoundEvent::Receipt(receipt)) - } - value if value.starts_with(WORKFLOW_INFO_CODE) => { - let workflow_info_bytes = &value[WORKFLOW_INFO_CODE.len()..]; - let workflow_info = workflow::Info::try_from(workflow_info_bytes.to_vec())?; - Ok(FoundEvent::Workflow(workflow_info)) - } - _ => Err(anyhow!( - "record version mismatch, current version: {}", - consts::INVOCATION_VERSION + match serde_ipld_dagcbor::de::from_reader(&*value) { + Ok(Ipld::Map(mut map)) => match map.pop_first() { + Some((code, Ipld::Map(mut rest))) if code == RECEIPT_TAG => { + if rest.remove(VERSION_KEY) + == Some(Ipld::String(consts::INVOCATION_VERSION.to_string())) + { + let invocation_receipt = 
InvocationReceipt::try_from(Ipld::Map(rest))?; + let receipt = + Receipt::try_with(Pointer::new(key_cid), &invocation_receipt)?; + Ok(FoundEvent::Receipt(receipt)) + } else { + Err(anyhow!( + "record version mismatch, current version: {}", + consts::INVOCATION_VERSION + )) + } + } + Some((code, Ipld::Map(rest))) if code == WORKFLOW_TAG => { + let workflow_info = workflow::Info::try_from(Ipld::Map(rest))?; + Ok(FoundEvent::Workflow(workflow_info)) + } + Some((code, _)) => Err(anyhow!("decode mismatch: {code} is not known")), + None => Err(anyhow!("invalid record value")), + }, + Ok(ipld) => Err(anyhow!( + "decode mismatch: expected an Ipld map, got {ipld:#?}", )), + Err(err) => { + tracing::error!(error=?err, "error deserializing record value"); + Err(anyhow!("error deserializing record value")) + } } } @@ -343,7 +357,7 @@ impl EventLoop { tracing::error!("error converting key {key:#?} to cid") } } - Err(err) => tracing::error!(err=?err, "error retrieving receipt"), + Err(err) => tracing::error!(err=?err, "error retrieving record"), } } } @@ -365,7 +379,6 @@ impl EventLoop { } _ => {} }, - SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::Mdns(mdns::Event::Discovered(list))) => { for (peer_id, _multiaddr) in list { tracing::info!("mDNS discovered a new peer: {peer_id}"); } } @@ -439,10 +452,8 @@ mod test { fn found_receipt_record() { let (invocation_receipt, receipt) = test_utils::receipt::receipts(); let instruction_bytes = receipt.instruction_cid_as_bytes(); - let bytes = Vec::try_from(invocation_receipt).unwrap(); - let ref_bytes = &bytes; - let value = veccat!(consts::INVOCATION_VERSION.as_bytes() RECEIPT_CODE ref_bytes); - let record = Record::new(instruction_bytes, value.to_vec()); + let bytes = Receipt::invocation_capsule(invocation_receipt).unwrap(); + let record = Record::new(instruction_bytes, bytes); let record_value = record.value; if let FoundEvent::Receipt(found_receipt) = EventLoop::on_found_record(Cid::try_from(receipt.instruction()).unwrap(), record_value) @@ -474,10 +485,8 @@ mod test { let workflow_info = workflow::Info::default(workflow.clone().to_cid().unwrap(), workflow.len()); let workflow_cid_bytes = workflow_info.cid_as_bytes(); - let bytes = Vec::try_from(workflow_info.clone()).unwrap(); - let ref_bytes = &bytes; - let value = veccat!(WORKFLOW_INFO_CODE ref_bytes); - let record = Record::new(workflow_cid_bytes, value.to_vec()); + let bytes = workflow_info.capsule().unwrap(); + let record = Record::new(workflow_cid_bytes, bytes); let record_value = record.value; if let FoundEvent::Workflow(found_workflow) = EventLoop::on_found_record(workflow.to_cid().unwrap(), record_value).unwrap() diff --git a/homestar-runtime/src/network/mod.rs b/homestar-runtime/src/network/mod.rs index 27d0bbb4..f01a0e2c 100644 --- a/homestar-runtime/src/network/mod.rs +++ b/homestar-runtime/src/network/mod.rs @@ -10,3 +10,5 @@ pub mod ipfs; pub mod pubsub; pub mod swarm; pub mod ws; + +pub use eventloop::EventLoop; diff --git a/homestar-runtime/src/network/pubsub.rs b/homestar-runtime/src/network/pubsub.rs index 876334aa..1b898731 100644 --- a/homestar-runtime/src/network/pubsub.rs +++ b/homestar-runtime/src/network/pubsub.rs @@ -24,6 +24,9 @@ pub fn new(keypair: Keypair, settings: &settings::Node) -> Result<gossipsub::Behaviour> { .heartbeat_interval(Duration::from_secs(settings.network.pubsub_heartbeat_secs)) + // dedupe incoming messages within the configured cache window + .duplicate_cache_time(Duration::from_secs(settings.network.pubsub_duplication_cache_secs)) + .idle_timeout(Duration::from_secs(settings.network.pubsub_idle_timeout_secs)) diff --git a/homestar-runtime/src/network/swarm.rs b/homestar-runtime/src/network/swarm.rs --- a/homestar-runtime/src/network/swarm.rs +++ b/homestar-runtime/src/network/swarm.rs @@ -21,12 +24,11 @@ pub async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehaviour>> { let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)) .upgrade(upgrade::Version::V1Lazy) .authenticate( 
noise::Config::new(&keypair).expect("Signing libp2p-noise static DH keypair failed"), - ) + .authenticate(noise::Config::new(&keypair)?) .multiplex(yamux::Config::default()) - // TODO: configure - //.timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs( + settings.network.transport_connection_timeout_secs, + )) .boxed(); let mut swarm = SwarmBuilder::with_tokio_executor( @@ -43,6 +45,9 @@ pub async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehaviour>> // Listen-on given address swarm.listen_on(settings.network.listen_address.to_string().parse()?)?; + // subscribe to `receipts` topic + swarm.behaviour_mut().gossip_subscribe(RECEIPTS_TOPIC)?; + Ok(swarm) } diff --git a/homestar-runtime/src/receipt.rs b/homestar-runtime/src/receipt.rs index c72062db..16729db3 100644 --- a/homestar-runtime/src/receipt.rs +++ b/homestar-runtime/src/receipt.rs @@ -21,6 +21,11 @@ use semver::Version; use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, fmt}; +/// General version key for receipts. +pub const VERSION_KEY: &str = "version"; +/// [Receipt] header tag, for sharing over libp2p. +pub const RECEIPT_TAG: &str = "ipvm/receipt"; + const CID_KEY: &str = "cid"; const INSTRUCTION_KEY: &str = "instruction"; const RAN_KEY: &str = "ran"; @@ -28,7 +33,6 @@ const OUT_KEY: &str = "out"; const ISSUER_KEY: &str = "iss"; const METADATA_KEY: &str = "meta"; const PROOF_KEY: &str = "prf"; -const VERSION_KEY: &str = "version"; /// Receipt for [Invocation], including its own [Cid] and a [Cid] for an [Instruction]. /// @@ -92,6 +96,27 @@ impl Receipt { Ok(Receipt::new(cid, instruction, invocation_receipt)) } + /// Capsule-wrapper for [InvocationReceipt] to be shared over libp2p as + /// [DagCbor] encoded bytes. + /// + /// [DagCbor]: DagCborCodec + pub fn invocation_capsule( + invocation_receipt: InvocationReceipt<Ipld>, + ) -> anyhow::Result<Vec<u8>> { + let receipt_ipld = Ipld::from(&invocation_receipt); + let capsule = if let Ipld::Map(mut map) = receipt_ipld { + map.insert(VERSION_KEY.into(), consts::INVOCATION_VERSION.into()); + Ok(Ipld::Map(BTreeMap::from([( + RECEIPT_TAG.into(), + Ipld::Map(map), + )]))) + } else { + Err(anyhow!("receipt to Ipld conversion is not a map")) + }?; + + DagCborCodec.encode(&capsule) + } + /// Get [Ipld] metadata on a [Receipt]. pub fn meta(&self) -> &Ipld { self.meta.inner() } diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index c0a2077c..fdd66d04 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -5,10 +5,12 @@ use crate::{ db::{Connection, Database}, - workflow::{Builder, Resource, Vertex}, + network::eventloop::{Event, FoundEvent}, + workflow::{self, Builder, Resource, Vertex}, Db, }; use anyhow::Result; +use crossbeam::channel; use dagga::Node; use futures::future::BoxFuture; use homestar_core::{ @@ -18,7 +20,13 @@ use homestar_core::{ use homestar_wasm::io::Arg; use indexmap::IndexMap; use libipld::Cid; -use std::{ops::ControlFlow, str::FromStr}; +use std::{ + ops::ControlFlow, + str::FromStr, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::sync::mpsc; type Schedule<'a> = Vec<Vec<Node<Vertex<'a>, usize>>>; /// resources. /// /// [instruction]: homestar_core::workflow::Instruction -#[allow(missing_debug_implementations)] +#[derive(Debug, Clone, Default)] pub struct ExecutionGraph<'a> { /// A built-up [Dag] [Schedule] of batches. 
/// @@ -42,7 +50,7 @@ pub struct ExecutionGraph<'a> { /// what's left to run, and data structures to track resources /// and what's been executed in memory. #[allow(dead_code)] -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct TaskScheduler<'a> { /// In-memory map of task/instruction results. pub(crate) linkmap: LinkMap<InstructionResult<Arg>>, @@ -62,7 +70,8 @@ pub struct TaskScheduler<'a> { pub(crate) resume_step: Option<usize>, /// Resources that tasks within a [Workflow] rely on, retrieved - /// through IPFS Client or the DHT directly ahead-of-time. + /// through the IPFS Client, or over HTTP, or through the DHT directly + /// ahead-of-time. /// /// This is transferred from the [ExecutionGraph] for actually executing the /// schedule. @@ -75,6 +84,8 @@ impl<'a> TaskScheduler<'a> { /// [Receipt]: crate::Receipt pub async fn init( workflow: Workflow<'a, Arg>, + settings: &'a workflow::Settings, + event_sender: Arc<mpsc::Sender<Event>>, conn: &mut Connection, fetch_fn: F, ) -> Result<TaskScheduler<'a>> where @@ -98,7 +109,8 @@ impl<'a> TaskScheduler<'a> { }); if let Ok(pointers) = folded_pointers { - match Db::find_instructions(pointers, conn) { + let pointers_len = pointers.len(); + match Db::find_instructions(&pointers, conn) { Ok(found) => { let linkmap = found.iter().fold( LinkMap::<InstructionResult<Arg>>::new(), @@ -118,7 +130,38 @@ impl<'a> TaskScheduler<'a> { ControlFlow::Continue(()) } } - _ => ControlFlow::Continue(()), + Err(_) => { + tracing::info!("receipt not available in the database"); + let (sender, receiver) = channel::bounded(pointers_len); + for ptr in &pointers { + let _ = event_sender + .blocking_send(Event::FindReceipt(ptr.cid(), sender.clone())); + } + + let mut linkmap = LinkMap::<InstructionResult<Arg>>::new(); + let mut counter = 0; + while let Ok((found_cid, FoundEvent::Receipt(found))) = receiver + .recv_deadline( + Instant::now() + + Duration::from_secs(settings.p2p_check_timeout_secs), + ) + { + if pointers.contains(&Pointer::new(found_cid)) { + if let Ok(cid) = found.instruction().try_into() { + let _ = linkmap.insert(cid, found.output_as_arg()); + counter += 1; + } + } + } + + if counter == pointers_len { + ControlFlow::Break((idx + 1, linkmap)) + } else if counter > 0 && counter < pointers_len { + ControlFlow::Break((idx, linkmap)) + } else { + ControlFlow::Continue(()) + } + } } } else { ControlFlow::Continue(()) @@ -156,7 +199,9 @@ #[cfg(test)] mod test { use super::*; - use crate::{db::Database, settings::Settings, test_utils, Receipt}; + use crate::{ + db::Database, network::EventLoop, settings::Settings, test_utils, workflow as wf, Receipt, + }; use futures::FutureExt; use homestar_core::{ ipld::DagCbor, @@ -184,14 +229,20 @@ mod test { UcanPrf::default(), ); + let settings = Settings::load().unwrap(); let db = test_utils::db::MemoryDb::setup_connection_pool(Settings::load().unwrap().node()) .unwrap(); let mut conn = db.conn().unwrap(); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let workflow_settings = wf::Settings::default(); let fetch_fn = |_rscs: Vec<Resource>| { async { Ok(IndexMap::default()) } }.boxed(); - let scheduler = TaskScheduler::init(workflow, &mut conn, fetch_fn) - .await - .unwrap(); + + let (tx, mut _rx) = EventLoop::setup_channel(settings.node()); + + let scheduler = + TaskScheduler::init(workflow, &workflow_settings, tx.into(), &mut conn, fetch_fn) + .await + .unwrap(); assert!(scheduler.linkmap.is_empty()); assert!(scheduler.ran.is_none()); @@ -228,6 +279,7 @@ ) .unwrap(); + let settings = Settings::load().unwrap(); let db = 
test_utils::db::MemoryDb::setup_connection_pool(Settings::load().unwrap().node()) .unwrap(); let mut conn = db.conn().unwrap(); @@ -238,10 +290,16 @@ assert_eq!(receipt, stored_receipt); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let workflow_settings = wf::Settings::default(); let fetch_fn = |_rscs: Vec<Resource>| { async { Ok(IndexMap::default()) } }.boxed(); - let scheduler = TaskScheduler::init(workflow, &mut conn, fetch_fn) - .await - .unwrap(); + + let (tx, mut _rx) = EventLoop::setup_channel(settings.node()); + + let scheduler = + TaskScheduler::init(workflow, &workflow_settings, tx.into(), &mut conn, fetch_fn) + .await + .unwrap(); + let ran = scheduler.ran.as_ref().unwrap(); assert_eq!(scheduler.linkmap.len(), 1); @@ -297,6 +355,7 @@ ) .unwrap(); + let settings = Settings::load().unwrap(); let db = test_utils::db::MemoryDb::setup_connection_pool(Settings::load().unwrap().node()) .unwrap(); let mut conn = db.conn().unwrap(); @@ -307,10 +366,16 @@ assert_eq!(2, rows_inserted); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let workflow_settings = wf::Settings::default(); let fetch_fn = |_rscs: Vec<Resource>| { async { Ok(IndexMap::default()) } }.boxed(); - let scheduler = TaskScheduler::init(workflow, &mut conn, fetch_fn) - .await - .unwrap(); + + let (tx, mut _rx) = EventLoop::setup_channel(settings.node()); + + let scheduler = + TaskScheduler::init(workflow, &workflow_settings, tx.into(), &mut conn, fetch_fn) + .await + .unwrap(); + let ran = scheduler.ran.as_ref().unwrap(); assert_eq!(scheduler.linkmap.len(), 1); diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index d59003af..5f3854e7 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -51,10 +51,16 @@ pub(crate) struct Network { /// [Swarm]: libp2p::swarm::Swarm #[serde(with = "http_serde::uri")] pub(crate) listen_address: Uri, + /// Pub/sub duplicate cache time. + pub(crate) pubsub_duplication_cache_secs: u64, /// Pub/sub heartbeat interval for mesh configuration. pub(crate) pubsub_heartbeat_secs: u64, + /// Pub/sub idle timeout. + pub(crate) pubsub_idle_timeout_secs: u64, /// Quorum for receipt records on the DHT. pub(crate) receipt_quorum: usize, + /// Transport connection timeout. + pub(crate) transport_connection_timeout_secs: u64, /// Websocket-server host address. #[serde(with = "http_serde::uri")] pub(crate) websocket_host: Uri, @@ -63,6 +69,10 @@ /// Number of *bounded* clients to send messages to, used for a /// [tokio::sync::broadcast::channel] pub(crate) websocket_capacity: usize, + /// Quorum for [workflow::Info] records on the DHT. + /// + /// [workflow::Info]: crate::workflow::Info + pub(crate) workflow_quorum: usize, } /// Database-related settings for a homestar node. 
@@ -79,11 +89,15 @@ impl Default for Network { Self { events_buffer_len: 100, listen_address: Uri::from_static("/ip4/0.0.0.0/tcp/0"), - pubsub_heartbeat_secs: 10, + pubsub_duplication_cache_secs: 1, + pubsub_heartbeat_secs: 60, + pubsub_idle_timeout_secs: 60 * 60 * 24, receipt_quorum: 2, + transport_connection_timeout_secs: 20, websocket_host: Uri::from_static("127.0.0.1"), websocket_port: 1337, websocket_capacity: 100, + workflow_quorum: 3, } } } diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 6444e925..62884465 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -10,15 +10,17 @@ use crate::{ workflow::{self, Resource}, Db, Receipt, }; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use crossbeam::channel; use futures::FutureExt; #[cfg(feature = "ipfs")] use futures::StreamExt; use homestar_core::{ - ipld::DagCbor, workflow::{ - error::ResolveError, prf::UcanPrf, InstructionResult, Pointer, Receipt as InvocationReceipt, + error::ResolveError, + prf::UcanPrf, + receipt::metadata::{OP_KEY, WORKFLOW_KEY}, + InstructionResult, Pointer, Receipt as InvocationReceipt, }, Workflow, }; @@ -39,7 +41,8 @@ use tryhard::RetryFutureConfig; pub struct Worker<'a> { pub(crate) scheduler: TaskScheduler<'a>, pub(crate) event_sender: Arc<mpsc::Sender<Event>>, - pub(crate) workflow_info: workflow::Info, + pub(crate) workflow_info: &'a mut workflow::Info, + pub(crate) workflow_settings: &'a workflow::Settings, } impl<'a> Worker<'a> { @@ -47,6 +50,7 @@ impl<'a> Worker<'a> { /// Instantiate a new [Worker] for a [Workflow]. #[cfg(not(feature = "ipfs"))] pub async fn new( workflow: Workflow<'a, Arg>, + workflow_info: &'a mut workflow::Info, workflow_settings: &'a workflow::Settings, event_sender: Arc<mpsc::Sender<Event>>, mut conn: Connection, @@ -55,23 +59,29 @@ impl<'a> Worker<'a> { async { Self::get_resources(rscs, workflow_settings).await }.boxed() }; - let scheduler = TaskScheduler::init(workflow.clone(), &mut conn, fetch_fn).await?; - - let workflow_info = - Worker::get_workflow_info(workflow, workflow_settings, &event_sender, &mut conn) - .await?; + let scheduler = TaskScheduler::init( + workflow.clone(), + workflow_settings, + event_sender.clone(), + &mut conn, + fetch_fn, + ) + .await?; Ok(Self { scheduler, workflow_info, event_sender, + workflow_settings, }) } /// Instantiate a new [Worker] for a [Workflow]. #[cfg(feature = "ipfs")] + #[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] pub async fn new( workflow: Workflow<'a, Arg>, + workflow_info: &'a mut workflow::Info, workflow_settings: &'a workflow::Settings, event_sender: Arc<mpsc::Sender<Event>>, mut conn: Connection, @@ -81,87 +91,31 @@ impl<'a> Worker<'a> { async { Self::get_resources(rscs, workflow_settings, ipfs).await }.boxed() }; - let scheduler = TaskScheduler::init(workflow.clone(), &mut conn, fetch_fn).await?; - - let workflow_info = - Worker::get_workflow_info(workflow, workflow_settings, &event_sender, &mut conn) - .await?; + let scheduler = TaskScheduler::init( + workflow, + workflow_settings, + event_sender.clone(), + &mut conn, + fetch_fn, + ) + .await?; Ok(Self { scheduler, - workflow_info, event_sender, + workflow_info, + workflow_settings, }) } /// Run [Worker]'s tasks in task-queue with access to the [Db] object /// to use a connection from the Database pool per run. 
- pub async fn run(self, db: impl Database, settings: workflow::Settings) -> Result<()> { - self.run_queue(db, settings).await - } - - async fn get_workflow_info( - workflow: Workflow<'_, Arg>, - workflow_settings: &'a workflow::Settings, - event_sender: &'a Arc<mpsc::Sender<Event>>, - conn: &mut Connection, - ) -> Result<workflow::Info> { - let workflow_len = workflow.len(); - let workflow_cid = workflow.to_cid()?; - - let workflow_info = match Db::join_workflow_with_receipts(workflow_cid, conn) { - Ok((wf_info, receipts)) => { - workflow::Info::new(workflow_cid, receipts, wf_info.num_tasks as u32) - } - Err(_err) => { - tracing::debug!("workflow information not available in the database"); - let (sender, receiver) = channel::bounded(1); - event_sender - .send(Event::FindWorkflow(workflow_cid, sender)) - .await?; - - match receiver.recv_deadline( - Instant::now() + Duration::from_secs(workflow_settings.p2p_timeout_secs), - ) { - Ok((found_cid, FoundEvent::Workflow(workflow_info))) - if found_cid == workflow_cid => - { - // store workflow from info - Db::store_workflow( - workflow::Stored::new( - Pointer::new(workflow_info.cid), - workflow_info.num_tasks as i32, - ), - conn, - )?; - - workflow_info - } - Ok((found_cid, event)) => { - bail!("received unexpected event {event:?} for workflow {found_cid}") - } - Err(err) => { - tracing::info!(error=?err, "error returning invocation receipt for {workflow_cid}"); - let workflow_info = workflow::Info::default(workflow_cid, workflow_len); - // store workflow from info - Db::store_workflow( - workflow::Stored::new( - Pointer::new(workflow_info.cid), - workflow_info.num_tasks as i32, - ), - conn, - )?; - - workflow_info - } - } - } - }; - - Ok(workflow_info) + pub async fn run(self, db: impl Database) -> Result<()> { + self.run_queue(db).await } #[cfg(feature = "ipfs")] + #[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] async fn get_resources( resources: Vec<Resource>, settings: &'a workflow::Settings, @@ -281,7 +235,7 @@ impl<'a> Worker<'a> { Ok(IndexMap::default()) } - async fn run_queue(mut self, db: impl Database, settings: workflow::Settings) -> Result<()> { + async fn run_queue(mut self, db: impl Database) -> Result<()> { for batch in self.scheduler.run.into_iter() { let (mut set, _handles) = batch.into_iter().try_fold( (JoinSet::new(), vec![]), @@ -295,8 +249,8 @@ let args = parsed.into_args(); let meta = Ipld::Map(BTreeMap::from([ - ("op".into(), fun.to_string().into()), - ("workflow".into(), self.workflow_info.cid().into()) + (OP_KEY.into(), fun.to_string().into()), + (WORKFLOW_KEY.into(), self.workflow_info.cid().into()) ])); match RegisteredTasks::ability(&instruction.op().to_string()) { @@ -312,9 +266,10 @@ let state = State::default(); let mut wasm_ctx = WasmContext::new(state)?; let resolved = - args.resolve(|cid| match self.scheduler.linkmap.get(&cid) { - Some(result) => Ok(result.to_owned()), - None => match Db::find_instruction( + args.resolve(|cid| if let Some(result) = self.scheduler.linkmap.get(&cid) { + Ok(result.to_owned()) + } else { + match Db::find_instruction( Pointer::new(cid), &mut db.conn()?, ) { @@ -328,8 +283,9 @@ cid, sender, )).map_err(|err| ResolveError::TransportError(err.to_string()))?; + let found = match receiver.recv_deadline( - Instant::now() + Duration::from_secs(settings.p2p_timeout_secs), + Instant::now() + Duration::from_secs(self.workflow_settings.p2p_timeout_secs), ) { Ok((found_cid, FoundEvent::Receipt(found))) if found_cid == cid => { found 
Ok(found.output_as_arg()) } - }, + } })?; let handle = set.spawn(async move { @@ -388,7 +344,17 @@ receipt.output_as_arg(), ); + // set receipt metadata receipt.set_meta(meta); + // modify workflow info before the progress update, in case + // we timed out getting info from the network but later + // recovered from where we last left off. + if let Some(step) = self.scheduler.resume_step { + self.workflow_info.set_progress_count(std::cmp::max( + self.workflow_info.progress_count, + step as u32, + )) + }; let stored_receipt = Db::store_receipt(receipt, &mut db.conn()?)?; @@ -411,9 +377,7 @@ mod test { #[cfg(feature = "ipfs")] use crate::IpfsCli; - use crate::{ - db::Database, network::eventloop::EventLoop, settings::Settings, test_utils, workflow as wf, - }; + use crate::{db::Database, network::EventLoop, settings::Settings, test_utils, workflow as wf}; use homestar_core::{ ipld::DagCbor, test_utils::workflow as workflow_test_utils, workflow::{ config::Resources, instruction::RunInstruction, prf::UcanPrf, @@ -440,12 +404,11 @@ mod test { let db = test_utils::db::MemoryDb::setup_connection_pool(Settings::load().unwrap().node()) .unwrap(); - let conn = db.conn().unwrap(); + let mut conn = db.conn().unwrap(); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); let workflow_cid = workflow.clone().to_cid().unwrap(); let workflow_settings = wf::Settings::default(); - let wf_settings = workflow_settings.clone(); let settings = Settings::load().unwrap(); #[cfg(feature = "ipfs")] @@ -456,14 +419,36 @@ mod test { #[cfg(feature = "ipfs")] let ipfs = IpfsCli::default(); + let mut workflow_info = wf::Info::gather( + workflow.clone(), + &workflow_settings, + &tx.clone().into(), + &mut conn, + ) + .await + .unwrap(); + #[cfg(feature = "ipfs")] - let worker = Worker::new(workflow, &wf_settings, tx.into(), conn, &ipfs) - .await - .unwrap(); + let worker = Worker::new( + workflow, + &mut workflow_info, + &workflow_settings, + tx.into(), + conn, + &ipfs, + ) + .await + .unwrap(); #[cfg(not(feature = "ipfs"))] - let worker = Worker::new(workflow, &wf_settings, tx.into(), conn) - .await - .unwrap(); + let worker = Worker::new( + workflow, + &mut workflow_info, + &workflow_settings, + tx.into(), + conn, + ) + .await + .unwrap(); assert!(worker.scheduler.linkmap.is_empty()); assert!(worker.scheduler.ran.is_none()); @@ -475,7 +460,7 @@ #[cfg(feature = "ipfs")] { let worker_workflow_cid = worker.workflow_info.cid; - worker.run(db.clone(), workflow_settings).await.unwrap(); + worker.run(db.clone()).await.unwrap(); // first time check DHT for workflow info let workflow_info_event = rx.recv().await.unwrap(); @@ -570,7 +555,6 @@ let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); let workflow_cid = workflow.clone().to_cid().unwrap(); let workflow_settings = wf::Settings::default(); - let wf_settings = workflow_settings.clone(); let settings = Settings::load().unwrap(); // already have stored workflow information (from a previous run) @@ -594,14 +578,36 @@ #[cfg(feature = "ipfs")] let ipfs = IpfsCli::default(); + let mut workflow_info = wf::Info::gather( + workflow.clone(), + &workflow_settings, + &tx.clone().into(), + &mut conn, + ) + .await + .unwrap(); + #[cfg(feature = "ipfs")] - let worker = Worker::new(workflow, &wf_settings, tx.into(), conn, &ipfs) - .await - .unwrap(); + let worker = Worker::new( + workflow, + &mut workflow_info, + &workflow_settings, + tx.into(), + conn, + &ipfs, + ) + .await + .unwrap(); #[cfg(not(feature = "ipfs"))] - let worker = Worker::new(workflow, &wf_settings, tx.into(), conn) 
- .await - .unwrap(); + let worker = Worker::new( + workflow, + &mut workflow_info, + &workflow_settings, + tx.into(), + conn, + ) + .await + .unwrap(); assert_eq!(worker.scheduler.linkmap.len(), 1); assert!(worker @@ -616,7 +622,7 @@ #[cfg(feature = "ipfs")] { - worker.run(db.clone(), workflow_settings).await.unwrap(); + worker.run(db.clone()).await.unwrap(); // we should have received 1 receipt let next_run_receipt = rx.recv().await.unwrap(); @@ -708,7 +714,6 @@ let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); let workflow_cid = workflow.clone().to_cid().unwrap(); let workflow_settings = wf::Settings::default(); - let wf_settings = workflow_settings.clone(); let settings = Settings::load().unwrap(); // already have stored workflow information (from a previous run) @@ -738,14 +743,36 @@ #[cfg(feature = "ipfs")] let ipfs = IpfsCli::default(); + let mut workflow_info = wf::Info::gather( + workflow.clone(), + &workflow_settings, + &tx.clone().into(), + &mut conn, + ) + .await + .unwrap(); + #[cfg(feature = "ipfs")] - let worker = Worker::new(workflow, &wf_settings, tx.into(), conn, &ipfs) - .await - .unwrap(); + let worker = Worker::new( + workflow, + &mut workflow_info, + &workflow_settings, + tx.into(), + conn, + &ipfs, + ) + .await + .unwrap(); #[cfg(not(feature = "ipfs"))] - let worker = Worker::new(workflow, &wf_settings, tx.into(), conn) - .await - .unwrap(); + let worker = Worker::new( + workflow, + &mut workflow_info, + &workflow_settings, + tx.into(), + conn, + ) + .await + .unwrap(); assert_eq!(worker.scheduler.linkmap.len(), 1); assert!(!worker diff --git a/homestar-runtime/src/workflow.rs b/homestar-runtime/src/workflow.rs index 428b3ed3..c5527945 100644 --- a/homestar-runtime/src/workflow.rs +++ b/homestar-runtime/src/workflow.rs @@ -22,10 +22,10 @@ use url::Url; mod info; pub(crate) mod settings; -pub use info::Info; +pub use info::{Info, WORKFLOW_TAG}; pub(crate) use info::{Stored, StoredReceipt}; #[allow(unused_imports)] -pub(crate) use settings::Settings; +pub use settings::Settings; type Dag<'a> = dagga::Dag<Vertex<'a>, usize>; diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index ae436c7c..a13d2843 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -1,9 +1,25 @@ -use crate::Receipt; -use anyhow::anyhow; +use crate::{ + db::{Connection, Database}, + network::eventloop::{Event, FoundEvent}, + Db, Receipt, +}; +use anyhow::{anyhow, bail, Result}; +use crossbeam::channel; use diesel::{Associations, Identifiable, Insertable, Queryable, Selectable}; -use homestar_core::workflow::Pointer; +use homestar_core::{ipld::DagCbor, workflow::Pointer, Workflow}; +use homestar_wasm::io::Arg; use libipld::{cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Cid, Ipld}; -use std::collections::BTreeMap; +use std::{ + collections::BTreeMap, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::sync::mpsc; + +/// [Workflow Info] header tag, for sharing over libp2p. +/// +/// [Workflow Info]: Info +pub const WORKFLOW_TAG: &str = "ipvm/workflow"; const CID_KEY: &str = "cid"; const PROGRESS_KEY: &str = "progress"; @@ -105,11 +121,98 @@ impl Info { self.cid().to_bytes() } + /// Set the progress / step of the [Workflow] completed, which + /// may not be the same as the `progress` vector of [Cid]s. + pub fn set_progress_count(&mut self, progress_count: u32) { + self.progress_count = progress_count; + } + /// Set the progress / step of the [Info]. 
pub fn increment_progress(&mut self, new_cid: Cid) { self.progress.push(new_cid); self.progress_count = self.progress.len() as u32 + 1; } + + /// Capsule-wrapper for [Info] to be shared over libp2p as + /// [DagCbor] encoded bytes. + /// + /// [DagCbor]: DagCborCodec + pub fn capsule(&self) -> anyhow::Result<Vec<u8>> { + let info_ipld = Ipld::from(self.to_owned()); + let capsule = if let Ipld::Map(map) = info_ipld { + Ok(Ipld::Map(BTreeMap::from([( + WORKFLOW_TAG.into(), + Ipld::Map(map), + )]))) + } else { + Err(anyhow!("workflow info to Ipld conversion is not a map")) + }?; + + DagCborCodec.encode(&capsule) + } + + /// Gather available [Info] from the database or [libp2p] given a + /// [Workflow] and [workflow settings]. + /// + /// [workflow settings]: super::Settings + pub async fn gather<'a>( + workflow: Workflow<'_, Arg>, + workflow_settings: &'a super::Settings, + event_sender: &'a Arc<mpsc::Sender<Event>>, + conn: &mut Connection, + ) -> Result<Self> { + let workflow_len = workflow.len(); + let workflow_cid = workflow.to_cid()?; + + let workflow_info = match Db::join_workflow_with_receipts(workflow_cid, conn) { + Ok((wf_info, receipts)) => Info::new(workflow_cid, receipts, wf_info.num_tasks as u32), + Err(_err) => { + tracing::info!("workflow information not available in the database"); + let (sender, receiver) = channel::bounded(1); + event_sender + .send(Event::FindWorkflow(workflow_cid, sender)) + .await?; + + match receiver.recv_deadline( + Instant::now() + Duration::from_secs(workflow_settings.p2p_timeout_secs), + ) { + Ok((found_cid, FoundEvent::Workflow(workflow_info))) + if found_cid == workflow_cid => + { + // store workflow from info + Db::store_workflow( + Stored::new( + Pointer::new(workflow_info.cid), + workflow_info.num_tasks as i32, + ), + conn, + )?; + + workflow_info + } + Ok((found_cid, event)) => { + bail!("received unexpected event {event:?} for workflow {found_cid}") + } + Err(err) => { + tracing::info!(error=?err, "no information found for {workflow_cid}, setting default"); + let workflow_info = Info::default(workflow_cid, workflow_len); + // store workflow from info + Db::store_workflow( + Stored::new( + Pointer::new(workflow_info.cid), + workflow_info.num_tasks as i32, + ), + conn, + )?; + + workflow_info + } + } + } + }; + + Ok(workflow_info) + } } impl From<Info> for Ipld { @@ -170,9 +273,9 @@ impl TryFrom<Info> for Vec<u8> { type Error = anyhow::Error; - fn try_from(receipt: Info) -> Result<Self, Self::Error> { - let receipt_ipld = Ipld::from(receipt); - DagCborCodec.encode(&receipt_ipld) + fn try_from(workflow_info: Info) -> Result<Self, Self::Error> { + let info_ipld = Ipld::from(workflow_info); + DagCborCodec.encode(&info_ipld) } } diff --git a/homestar-runtime/src/workflow/settings.rs b/homestar-runtime/src/workflow/settings.rs index 803855cd..c404ac6d 100644 --- a/homestar-runtime/src/workflow/settings.rs +++ b/homestar-runtime/src/workflow/settings.rs @@ -9,6 +9,7 @@ pub struct Settings { pub(crate) retry_backoff_strategy: BackoffStrategy, pub(crate) retry_max_delay_secs: u64, pub(crate) retry_initial_delay_ms: u64, + pub(crate) p2p_check_timeout_secs: u64, pub(crate) p2p_timeout_secs: u64, } @@ -20,7 +21,8 @@ impl Default for Settings { retry_backoff_strategy: BackoffStrategy::Exponential, retry_max_delay_secs: 60, retry_initial_delay_ms: 500, - p2p_timeout_secs: 60, + p2p_check_timeout_secs: 5, + p2p_timeout_secs: 120, } } } @@ -33,6 +35,7 @@ impl Default for Settings { retry_backoff_strategy: BackoffStrategy::Exponential, retry_max_delay_secs: 1, retry_initial_delay_ms: 50, + p2p_check_timeout_secs: 1, 
p2p_timeout_secs: 1, } } diff --git a/homestar-wasm/Cargo.toml b/homestar-wasm/Cargo.toml index 6c433a53..d6ba6cad 100644 --- a/homestar-wasm/Cargo.toml +++ b/homestar-wasm/Cargo.toml @@ -22,13 +22,13 @@ doctest = true # return to version.workspace = true after the following issue is fixed: # https://github.com/DevinR528/cargo-sort/issues/47 anyhow = { workspace = true } +async-trait = { workspace = true } atomic_refcell = "0.1" enum-as-inner = { workspace = true } heck = "0.4" homestar-core = { version = "0.1", path = "../homestar-core" } itertools = "0.10" -libipld = "0.16" -libipld-core = { version = "0.16", features = ["serde-codec", "serde"] } +libipld = { workspace = true } rust_decimal = "1.30" stacker = "0.1" thiserror = { workspace = true } @@ -43,7 +43,7 @@ wit-component = "0.8" [dev-dependencies] criterion = "0.5" image = "0.24" -serde_ipld_dagcbor = "0.3" +serde_ipld_dagcbor = { workspace = true } tokio = { workspace = true } [features] diff --git a/homestar-wasm/src/wasmtime/config.rs b/homestar-wasm/src/wasmtime/config.rs index a8ddbf6c..0449e4fe 100644 --- a/homestar-wasm/src/wasmtime/config.rs +++ b/homestar-wasm/src/wasmtime/config.rs @@ -1,10 +1,13 @@ //! Configuration for Wasm/wasmtime execution. -use crate::wasmtime; -use homestar_core::workflow::config::Resources; +use crate::wasmtime::{self, limits::StoreLimitsAsync}; +use homestar_core::{consts, workflow::config::Resources}; impl From<Resources> for wasmtime::State { fn from(resources: Resources) -> wasmtime::State { - wasmtime::State::new(resources.fuel().unwrap_or(u64::MAX)) + wasmtime::State::new( + resources.fuel().unwrap_or(u64::MAX), + StoreLimitsAsync::new(Some(consts::WASM_MAX_MEMORY as usize), None), + ) } } diff --git a/homestar-wasm/src/wasmtime/limits.rs b/homestar-wasm/src/wasmtime/limits.rs new file mode 100644 index 00000000..bb46d676 --- /dev/null +++ b/homestar-wasm/src/wasmtime/limits.rs @@ -0,0 +1,66 @@ +//! Provides `async` limits for a [wasmtime::Store]. +//! This type is created for use with [wasmtime::Store::limiter_async]. +//! +//! This is a convenience type included to avoid needing to implement the +//! [wasmtime::ResourceLimiterAsync] trait. + +use async_trait::async_trait; +use homestar_core::consts; +use wasmtime::ResourceLimiterAsync; + +/// Async implementation of [wasmtime::StoreLimits]. +/// +/// Used to limit the memory use and table size of each Instance. +#[derive(Clone, Debug, PartialEq)] +pub struct StoreLimitsAsync { + max_memory_size: Option<usize>, + max_table_elements: Option<u32>, + memory_consumed: u64, +} + +impl Default for StoreLimitsAsync { + fn default() -> Self { + Self { + max_memory_size: Some(consts::WASM_MAX_MEMORY as usize), + max_table_elements: None, + memory_consumed: 0, + } + } +} + +#[async_trait] +impl ResourceLimiterAsync for StoreLimitsAsync { + async fn memory_growing( + &mut self, + current: usize, + desired: usize, + _maximum: Option<usize>, + ) -> bool { + let can_grow = !matches!(self.max_memory_size, Some(limit) if desired > limit); + if can_grow { + self.memory_consumed = + (self.memory_consumed as i64 + (desired as i64 - current as i64)) as u64; + } + can_grow + } + + async fn table_growing(&mut self, _current: u32, desired: u32, _maximum: Option<u32>) -> bool { + !matches!(self.max_table_elements, Some(limit) if desired > limit) + } +} + +impl StoreLimitsAsync { + /// Create a new instance of [StoreLimitsAsync]. 
pub fn new(max_memory_size: Option<usize>, max_table_elements: Option<u32>) -> Self { + Self { + max_memory_size, + max_table_elements, + memory_consumed: 0, + } + } + + /// How much memory has been consumed in bytes. + pub fn memory_consumed(&self) -> u64 { + self.memory_consumed + } +} diff --git a/homestar-wasm/src/wasmtime/mod.rs b/homestar-wasm/src/wasmtime/mod.rs index dc9df00c..fe0ec93f 100644 --- a/homestar-wasm/src/wasmtime/mod.rs +++ b/homestar-wasm/src/wasmtime/mod.rs @@ -7,6 +7,7 @@ pub mod config; mod error; pub mod ipld; +pub mod limits; pub mod world; pub use error::*; diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index 4780dd39..1602836a 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -7,12 +7,13 @@ use crate::{ io::{Arg, Output}, wasmtime::{ ipld::{InterfaceType, RuntimeVal}, + limits::StoreLimitsAsync, Error, }, }; use heck::{ToKebabCase, ToSnakeCase}; use homestar_core::{ - bail, + bail, consts, workflow::{error::ResolveError, input::Args, Input}, }; use std::iter; @@ -29,21 +30,25 @@ const UNIT_OF_COMPUTE_INSTRUCTIONS: u64 = 100_000; // our error set. /// Incoming `state` from host runtime. -#[derive(Clone, Debug, PartialEq)] +#[allow(missing_debug_implementations)] pub struct State { fuel: u64, + limits: StoreLimitsAsync, } impl Default for State { fn default() -> Self { - Self { fuel: u64::MAX } + Self { + fuel: u64::MAX, + limits: StoreLimitsAsync::new(Some(consts::WASM_MAX_MEMORY as usize), None), + } } } impl State { /// Create a new [State] object. - pub fn new(fuel: u64) -> Self { - Self { fuel } + pub fn new(fuel: u64, limits: StoreLimitsAsync) -> Self { + Self { fuel, limits } } /// Set fuel add. @@ -225,6 +230,7 @@ impl World { let linker = Self::define_linker(&engine); let mut store = Store::new(&engine, data); + store.limiter_async(|s| &mut s.limits); store.add_fuel(store.data().fuel)?; // Configures a `Store` to yield execution of async WebAssembly code @@ -235,10 +241,12 @@ let component = component_from_bytes(&bytes, engine.clone())?; let instance = linker.instantiate_async(&mut store, &component).await?; + let bindings = Self::new(&mut store, &instance, fun_name)?; let mut env = Env::new(engine, linker, store); env.set_bindings(bindings); env.set_instance(instance); + Ok(env) } diff --git a/homestar-wasm/tests/execute_wasm.rs b/homestar-wasm/tests/execute_wasm.rs index 61fb749e..f6e2d547 100644 --- a/homestar-wasm/tests/execute_wasm.rs +++ b/homestar-wasm/tests/execute_wasm.rs @@ -5,7 +5,7 @@ use homestar_core::workflow::{ }; use homestar_wasm::{ io::{Arg, Output}, - wasmtime::{State, World}, + wasmtime::{limits::StoreLimitsAsync, Error, State, World}, }; use libipld::{ cid::{ ... }, }; use std::path::PathBuf; fn fixtures(file: &str) -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("fixtures/{file}")) } +#[tokio::test] +async fn test_wasm_exceeds_max_memory() { + let wasm = fs::read(fixtures("homestar_guest_wasm.wasm")).unwrap(); + let env = World::instantiate( + wasm, + "add_one", + State::new(u64::MAX, StoreLimitsAsync::new(Some(10), None)), + ) + .await; + + if let Err(Error::WasmRuntimeError(err)) = env { + assert!(err.to_string().contains("exceeds memory limits")); + } else { + panic!("Expected WasmRuntimeError") + } +} + #[tokio::test] async fn test_execute_wat() { let ipld = Input::Ipld(Ipld::Map(BTreeMap::from([