From 1635ffa455e171f8794e86d4d8af478abccff445 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 14 Jun 2023 18:45:41 +0900 Subject: [PATCH] refactor: error handling with thiserror, anyhow for runtime only (#139) Closes https://github.com/ipvm-wg/homestar/issues/62 This PR includes: * flake commands for helpful shortcuts * move to cargo nextest * worker tests * addition of workspace macros bail and ensure for internal error handling * moves general workflow structure to homestar-core, with generic handling, and allows for further building blocks --- .github/dependabot.yml | 10 +- .ignore | 16 ++ Cargo.lock | 1 + Cargo.toml | 1 + README.md | 52 ++++- flake.lock | 38 +++- flake.nix | 111 ++++++++++- homestar-core/Cargo.toml | 3 +- homestar-core/src/consts.rs | 4 +- homestar-core/src/lib.rs | 3 + homestar-core/src/macros.rs | 68 +++++++ homestar-core/src/unit.rs | 12 +- homestar-core/src/workflow.rs | 182 +++++++++++++++++ homestar-core/src/workflow/ability.rs | 3 +- homestar-core/src/workflow/config.rs | 5 +- homestar-core/src/workflow/error.rs | 163 ++++++++++++++++ homestar-core/src/workflow/input.rs | 117 ++--------- homestar-core/src/workflow/input/parse.rs | 89 +++++++++ homestar-core/src/workflow/instruction.rs | 52 ++--- .../src/workflow/instruction_result.rs | 16 +- homestar-core/src/workflow/invocation.rs | 18 +- homestar-core/src/workflow/issuer.rs | 3 +- homestar-core/src/workflow/nonce.rs | 10 +- homestar-core/src/workflow/pointer.rs | 17 +- homestar-core/src/workflow/prf.rs | 5 +- homestar-core/src/workflow/receipt.rs | 29 +-- homestar-core/src/workflow/task.rs | 24 +-- homestar-runtime/Cargo.toml | 8 +- homestar-runtime/src/cli.rs | 21 +- homestar-runtime/src/lib.rs | 1 - homestar-runtime/src/main.rs | 1 + homestar-runtime/src/network/eventloop.rs | 83 ++++++-- homestar-runtime/src/scheduler.rs | 12 +- homestar-runtime/src/tasks/wasm.rs | 7 +- homestar-runtime/src/worker.rs | 22 ++- homestar-runtime/src/workflow.rs | 184 ++++-------------- homestar-runtime/src/workflow/info.rs | 26 ++- homestar-runtime/src/workflow/settings.rs | 4 +- homestar-wasm/Cargo.toml | 4 +- homestar-wasm/src/error.rs | 107 ++++++++++ homestar-wasm/src/io.rs | 40 ++-- homestar-wasm/src/lib.rs | 1 + homestar-wasm/src/wasmtime/error.rs | 50 +++++ homestar-wasm/src/wasmtime/ipld.rs | 143 +++++++++----- homestar-wasm/src/wasmtime/mod.rs | 2 + homestar-wasm/src/wasmtime/world.rs | 74 ++++--- rust-toolchain.toml | 2 +- 47 files changed, 1340 insertions(+), 504 deletions(-) create mode 100644 .ignore create mode 100644 homestar-core/src/macros.rs create mode 100644 homestar-core/src/workflow/error.rs create mode 100644 homestar-core/src/workflow/input/parse.rs create mode 100644 homestar-wasm/src/error.rs create mode 100644 homestar-wasm/src/wasmtime/error.rs diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 9639f9b5..d66c5c07 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -9,7 +9,7 @@ updates: - package-ecosystem: "cargo" directory: "/homestar-core" commit-message: - prefix: "chore" + prefix: "[chore(core)]" include: "scope" target-branch: "main" schedule: @@ -18,7 +18,7 @@ updates: - package-ecosystem: "cargo" directory: "/homestar-runtime" commit-message: - prefix: "chore" + prefix: "[chore(runtime)]" include: "scope" target-branch: "main" schedule: @@ -27,7 +27,7 @@ updates: - package-ecosystem: "cargo" directory: "/homestar-guest-wasm" commit-message: - prefix: "chore" + prefix: "[chore(guest-wasm)]" include: "scope" target-branch: "main" schedule: @@ -36,7 +36,7 @@ updates: - package-ecosystem: "cargo" directory: "/homestar-wasm" commit-message: - prefix: "chore" + prefix: "[chore(wasm)]" include: "scope" target-branch: "main" schedule: @@ -45,7 +45,7 @@ updates: - package-ecosystem: "github-actions" directory: "/" commit-message: - prefix: "chore(ci)" + prefix: "[chore(ci)]" include: "scope" target-branch: "main" schedule: diff --git a/.ignore b/.ignore new file mode 100644 index 00000000..ce6103f8 --- /dev/null +++ b/.ignore @@ -0,0 +1,16 @@ +# cargo-watch ignores + +docker +flake.lock +release-please-config.json +deny.toml +diesel.toml +LICENSE +*.nix +*.md + +.envrc +.dockerignore +.gitignore +.release-please-manifest.json +.pre-commit-config.yaml diff --git a/Cargo.lock b/Cargo.lock index 7cf330ad..012f5ac9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2580,6 +2580,7 @@ dependencies = [ "enum-assoc", "generic-array", "indexmap", + "json", "libipld", "libsqlite3-sys", "proptest", diff --git a/Cargo.toml b/Cargo.toml index e4e3c753..19423c00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ rust-version = "1.66.0" [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } +thiserror = "1.0" tokio = { version = "1.26", features = ["fs", "io-util", "io-std", "macros", "rt", "rt-multi-thread"] } tracing = "0.1" diff --git a/README.md b/README.md index 42728c81..2a4f1982 100644 --- a/README.md +++ b/README.md @@ -12,20 +12,32 @@ Crate + + Crate + Code Coverage - - Build Status + + Tests and Checks Status + + + Build Docker Status + + + Cargo Audit Status License - Docs + Docs - Docs + Docs + + + Docs Discord @@ -69,14 +81,14 @@ represents the `homestar` runtime. - Running the tests: -We recommend using [cargo nextest][cargo-nextest], which is installed via -[our Nix flake](#nix) or can be [installed separately][cargo-nextest-install]. +We recommend using [cargo nextest][cargo-nextest], which is installed by default +in our [Nix flake](#nix) or can be [installed separately][cargo-nextest-install]. ```console cargo nextest run --all-features --no-capture ``` -Otherwise, the above command looks like this using the default `cargo test`: +The above command translates to this using the default `cargo test`: ```console cargo test --all-features -- --nocapture @@ -105,7 +117,7 @@ with `experimental` and `buildkit` set to `true`, for example: - Build a multi-plaform Docker image via [buildx][buildx]: ```console - docker buildx build --platform=linux/amd64,linux/arm64 -t homestar-runtime --progress=plain . + docker buildx build --file docker/Dockerfile --platform=linux/amd64,linux/arm64 -t homestar-runtime --progress=plain . ``` - Run a Docker image (depending on your platform): @@ -149,8 +161,27 @@ hooks. Please run this before every commit and/or push. - We recommend leveraging [cargo-watch][cargo-watch], [cargo-expand][cargo-expand] and [irust][irust] for Rust development. -- We also recommend using [cargo-udeps][cargo-udeps] for removing unused dependencies - before commits and pull-requests. +- We also recommend using [cargo-udeps][cargo-udeps] for removing unused + dependencies before commits and pull-requests. +- If using our [Nix flake][nix-flake], there are a number of handy + command shortcuts available for working with `cargo-watch`, `diesel`, and + other binaries, including: + * `ci`, which runs a sequence of commands to check formatting, lints, release + builds, and tests + * `db` and `db-reset` for running `diesel` setup and migrations + * `compile-wasm` for compiling [homestar-guest-wasm](./homestar-guest-wasm), + a [wit-bindgen][]-driven example, to the `wasm32-unknown-unknown` target + * `docker-` for running docker builds + * `nx-test`, which translates to `cargo nextest run && cargo test --doc` + * `x-test` for testing continuously as files change, translating to + `cargo watch -c -s "cargo nextest run && cargo test --doc"` + * `x-` for running a variety of `cargo watch` + execution stages + * `nx-test-`, which is just like `nx-test`, but adds `all` or `0` + for running tests either with the `all-features` flag or + `no-default-features` flag, respectively. + * `x--` for package-specific + builds, tests, etc. ### Conventional Commits @@ -221,3 +252,4 @@ conditions. [pre-commit]: https://pre-commit.com/ [seamless-services]: https://youtu.be/Kr3B3sXh_VA [ucan-invocation]: https://github.com/ucan-wg/invocation +[wit-bindgen]: https://github.com/bytecodealliance/wit-bindgen diff --git a/flake.lock b/flake.lock index 9900418e..5a7c0391 100644 --- a/flake.lock +++ b/flake.lock @@ -1,12 +1,15 @@ { "nodes": { "flake-utils": { + "inputs": { + "systems": "systems" + }, "locked": { - "lastModified": 1667395993, - "narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=", + "lastModified": 1685518550, + "narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=", "owner": "numtide", "repo": "flake-utils", - "rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f", + "rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef", "type": "github" }, "original": { @@ -17,16 +20,16 @@ }, "nixpkgs": { "locked": { - "lastModified": 1678872516, - "narHash": "sha256-/E1YwtMtFAu2KUQKV/1+KFuReYPANM2Rzehk84VxVoc=", + "lastModified": 1686331006, + "narHash": "sha256-hElRDWUNG655aqF0awu+h5cmDN+I/dQcChRt2tGuGGU=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "9b8e5abb18324c7fe9f07cb100c3cd4a29cda8b8", + "rev": "85bcb95aa83be667e562e781e9d186c57a07d757", "type": "github" }, "original": { "id": "nixpkgs", - "ref": "nixos-22.11", + "ref": "nixos-23.05", "type": "indirect" } }, @@ -47,11 +50,11 @@ ] }, "locked": { - "lastModified": 1674095406, - "narHash": "sha256-RexH/1rZTiX4OhdYkuJP3MuANJ+JRgoLKL60iHm//T0=", + "lastModified": 1686537156, + "narHash": "sha256-mJD80brS6h6P4jzwdKID0S9RvfyiruxgJbXvPPIDqF0=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "5f7315b9800e2e500e6834767a57e39f7dbfd495", + "rev": "e75da5cfc7da874401decaa88f4ccb3b4d64d20d", "type": "github" }, "original": { @@ -59,6 +62,21 @@ "repo": "rust-overlay", "type": "github" } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 339f1a8c..ea8a6bf9 100644 --- a/flake.nix +++ b/flake.nix @@ -2,9 +2,8 @@ description = "homestar"; inputs = { - nixpkgs.url = "nixpkgs/nixos-22.11"; + nixpkgs.url = "nixpkgs/nixos-23.05"; flake-utils.url = "github:numtide/flake-utils"; - rust-overlay = { url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; @@ -45,6 +44,109 @@ cargo-watch twiggy ]; + + ci = pkgs.writeScriptBin "ci" '' + cargo fmt --check + cargo clippy + cargo build --release + nx-test + ''; + + db = pkgs.writeScriptBin "db" '' + diesel setup + diesel migration run + ''; + + dbReset = pkgs.writeScriptBin "db-reset" '' + diesel database reset + diesel setup + diesel migration run + ''; + + compileWasm = pkgs.writeScriptBin "compile-wasm" '' + cargo build -p homestar-guest-wasm --target wasm32-unknown-unknown --release + ''; + + dockerBuild = arch: + pkgs.writeScriptBin "docker-${arch}" '' + docker buildx build --file docker/Dockerfile --platform=linux/${arch} -t homestar-runtime --progress=plain . + ''; + + xFunc = cmd: + pkgs.writeScriptBin "x-${cmd}" '' + cargo watch -c -x ${cmd} + ''; + + xFuncAll = cmd: + pkgs.writeScriptBin "x-${cmd}-all" '' + cargo watch -c -s "cargo ${cmd} --all-features" + ''; + + xFuncNoDefault = cmd: + pkgs.writeScriptBin "x-${cmd}-0" '' + cargo watch -c -s "cargo ${cmd} --no-default-features" + ''; + + xFuncPackage = cmd: crate: + pkgs.writeScriptBin "x-${cmd}-${crate}" '' + cargo watch -c -s "cargo ${cmd} -p homestar-${crate} --all-features" + ''; + + xFuncTest = pkgs.writeScriptBin "x-test" '' + cargo watch -c -s "cargo nextest run && cargo test --doc" + ''; + + xFuncTestAll = pkgs.writeScriptBin "x-test-all" '' + cargo watch -c -s "cargo nextest run --all-features --nocapture \ + && cargo test --doc --all-features" + ''; + + xFuncTestNoDefault = pkgs.writeScriptBin "x-test-all" '' + cargo watch -c -s "cargo nextest run --no-default-features --nocapture \ + && cargo test --doc --no-default-features" + ''; + + xFuncTestPackage = crate: + pkgs.writeScriptBin "x-test-${crate}" '' + cargo watch -c -s "cargo nextest run -p homestar-${crate} --all-features \ + && cargo test --doc -p homestar-${crate} --all-features" + ''; + + nxTest = pkgs.writeScriptBin "nx-test" '' + cargo nextest run + cargo test --doc + ''; + + nxTestAll = pkgs.writeScriptBin "nx-test-all" '' + cargo nextest run --all-features --nocapture + cargo test --doc --all-features + ''; + + nxTestNoDefault = pkgs.writeScriptBin "nx-test-0" '' + cargo nextest run --no-default-features --nocapture + cargo test --doc --no-default-features + ''; + + scripts = [ + ci + db + dbReset + compileWasm + (builtins.map (arch: dockerBuild arch) ["amd64" "arm64"]) + (builtins.map (cmd: xFunc cmd) ["build" "check" "run" "clippy"]) + (builtins.map (cmd: xFuncAll cmd) ["build" "check" "run" "clippy"]) + (builtins.map (cmd: xFuncNoDefault cmd) ["build" "check" "run" "clippy"]) + (builtins.map (cmd: xFuncPackage cmd "core") ["build" "check" "run" "clippy"]) + (builtins.map (cmd: xFuncPackage cmd "wasm") ["build" "check" "run" "clippy"]) + (builtins.map (cmd: xFuncPackage cmd "runtime") ["build" "check" "run" "clippy"]) + xFuncTest + xFuncTestAll + xFuncTestNoDefault + (builtins.map (crate: xFuncTestPackage crate) ["core" "wasm" "guest-wasm" "runtime"]) + nxTest + nxTestAll + nxTestNoDefault + ]; in rec { devShells.default = pkgs.mkShell { @@ -57,16 +159,17 @@ nightly-rustfmt rust-toolchain rust-analyzer + rustup pkg-config pre-commit protobuf - rustup diesel-cli direnv self.packages.${system}.irust ] ++ format-pkgs ++ cargo-installs + ++ scripts ++ lib.optionals stdenv.isDarwin [ darwin.apple_sdk.frameworks.Security darwin.apple_sdk.frameworks.CoreFoundation @@ -92,6 +195,8 @@ doCheck = false; cargoSha256 = "sha256-FmsD3ajMqpPrTkXCX2anC+cmm0a2xuP+3FHqzj56Ma4="; }; + + formatter = pkgs.alejandra; } ); } diff --git a/homestar-core/Cargo.toml b/homestar-core/Cargo.toml index be5819be..6eee8442 100644 --- a/homestar-core/Cargo.toml +++ b/homestar-core/Cargo.toml @@ -32,7 +32,7 @@ libsqlite3-sys = { version = "0.26", features = ["bundled"] } proptest = { version = "1.1", optional = true } serde = { version = "1.0", features = ["derive"] } signature = "2.0" -thiserror = "1.0" +thiserror = { workspace = true } tracing = { workspace = true } ucan = "0.1" url = "2.3" @@ -40,6 +40,7 @@ xid = "1.0" [dev-dependencies] criterion = "0.4" +json = "0.12" [features] default = [] diff --git a/homestar-core/src/consts.rs b/homestar-core/src/consts.rs index df94e5b3..ad5c9194 100644 --- a/homestar-core/src/consts.rs +++ b/homestar-core/src/consts.rs @@ -1,4 +1,6 @@ //! Exported global constants. -/// SemVer-formatted version of the UCAN Invocation Specification. +/// SemVer-formatted version of the UCAN Invocation Specification. pub const INVOCATION_VERSION: &str = "0.2.0"; +/// DagCbor codec. +pub const DAG_CBOR: u64 = 0x71; diff --git a/homestar-core/src/lib.rs b/homestar-core/src/lib.rs index 4596d257..9fe6d56c 100644 --- a/homestar-core/src/lib.rs +++ b/homestar-core/src/lib.rs @@ -18,10 +18,13 @@ //! [Ucan invocation]: pub mod consts; +pub mod macros; #[cfg(any(test, feature = "test_utils"))] #[cfg_attr(docsrs, doc(cfg(feature = "test_utils")))] pub mod test_utils; mod unit; + pub mod workflow; pub use consts::*; pub use unit::*; +pub use workflow::Workflow; diff --git a/homestar-core/src/macros.rs b/homestar-core/src/macros.rs new file mode 100644 index 00000000..645c00b2 --- /dev/null +++ b/homestar-core/src/macros.rs @@ -0,0 +1,68 @@ +//! Macros for cross-crate export. + +/// Return early with an error. +/// +/// Modelled after [anyhow::bail]. +/// +/// # Example +/// +/// ``` +/// use homestar_core::{workflow, bail, Unit}; +/// +/// fn has_permission(user: usize, resource: usize) -> bool { +/// true +/// } +/// +/// # fn main() -> Result<(), workflow::Error> { +/// # let user = 0; +/// # let resource = 0; +/// # +/// +/// if !has_permission(user, resource) { +/// bail!(workflow::Error::UnknownError); +/// } +/// +/// # Ok(()) +/// # } +/// ``` +#[macro_export] +macro_rules! bail { + ($e:expr) => { + return Err($e); + }; +} + +/// /// Return early with an error if a condition is not satisfied. +/// +/// Analogously to `assert!`, `ensure!` takes a condition and exits the function +/// if the condition fails. Unlike `assert!`, `ensure!` returns an `Error` +/// rather than panicking. +/// +/// Modelled after [anyhow::ensure]. +/// +/// # Example +/// +/// ``` +/// use homestar_core::{workflow, ensure, Unit}; +/// +/// # +/// # fn main() -> Result<(), workflow::Error> { +/// # let user = 1; +/// # +/// ensure!( +/// user < 2, +/// workflow::Error::ConditionNotMet( +/// "only user 0 and 1 are allowed".to_string() +/// ) +/// ); +/// # Ok(()) +/// # } +/// ``` +#[macro_export(local_inner_macros)] +macro_rules! ensure { + ($cond:expr, $e:expr) => { + if !($cond) { + bail!($e); + } + }; +} diff --git a/homestar-core/src/unit.rs b/homestar-core/src/unit.rs index af3fe731..d6418588 100644 --- a/homestar-core/src/unit.rs +++ b/homestar-core/src/unit.rs @@ -4,10 +4,12 @@ //! [Tasks]: crate::workflow::Task //! [Inputs]: crate::workflow::Input //! [Invocations]: crate::workflow::Invocation +//! use crate::workflow::{ + error::InputParseError, input::{self, Args, Parsed}, - Input, + Error, Input, }; use libipld::Ipld; @@ -30,7 +32,7 @@ impl From for Unit { // Default implementation. impl input::Parse for Input { - fn parse(&self) -> anyhow::Result> { + fn parse(&self) -> Result, InputParseError> { let args = match Ipld::try_from(self.to_owned())? { Ipld::List(v) => Ipld::List(v).try_into()?, ipld => Args::new(vec![ipld.try_into()?]), @@ -39,3 +41,9 @@ impl input::Parse for Input { Ok(Parsed::with(args)) } } + +impl From> for InputParseError { + fn from(err: Error) -> Self { + InputParseError::WorkflowError(err.into()) + } +} diff --git a/homestar-core/src/workflow.rs b/homestar-core/src/workflow.rs index 68a34ca1..7127ca37 100644 --- a/homestar-core/src/workflow.rs +++ b/homestar-core/src/workflow.rs @@ -2,8 +2,21 @@ //! //! [Ucan invocation]: +use self::Error as WorkflowError; +use crate::{bail, Unit, DAG_CBOR}; +use libipld::{ + cbor::DagCborCodec, + json::DagJsonCodec, + multihash::{Code, MultihashDigest}, + prelude::Codec, + serde::from_ipld, + Cid, Ipld, +}; +use std::collections::BTreeMap; + mod ability; pub mod config; +pub mod error; pub mod input; pub mod instruction; mod instruction_result; @@ -16,6 +29,7 @@ pub mod receipt; pub mod task; pub use ability::*; +pub use error::Error; pub use input::Input; pub use instruction::Instruction; pub use instruction_result::*; @@ -26,9 +40,177 @@ pub use pointer::Pointer; pub use receipt::Receipt; pub use task::Task; +const TASKS_KEY: &str = "tasks"; + /// Generic link, cid => T [IndexMap] for storing /// invoked, raw values in-memory and using them to /// resolve other steps within a runtime's workflow. /// /// [IndexMap]: indexmap::IndexMap pub type LinkMap = indexmap::IndexMap; + +/// Workflow composed of [tasks]. +/// +/// [tasks]: Task +#[derive(Debug, Clone, PartialEq)] +pub struct Workflow<'a, T> { + tasks: Vec>, +} + +impl<'a, T> Workflow<'a, T> { + /// Create a new [Workflow] given a set of tasks. + pub fn new(tasks: Vec>) -> Self { + Self { tasks } + } + + /// Return a [Workflow]'s [tasks] vector. + /// + /// [tasks]: Task + pub fn tasks(self) -> Vec> { + self.tasks + } + + /// Return a reference to [Workflow]'s [tasks] vector. + /// + /// [tasks]: Task + pub fn tasks_ref(&self) -> &Vec> { + &self.tasks + } + + /// Length of workflow given a series of [tasks]. + /// + /// [tasks]: Task + pub fn len(&self) -> u32 { + self.tasks.len() as u32 + } + + /// Whether [Workflow] contains [tasks] or not. + /// + /// [tasks]: Task + pub fn is_empty(&self) -> bool { + self.tasks.is_empty() + } + + /// Return workflow as stringified Json. + pub fn to_json(self) -> Result> + where + Ipld: From>, + { + let encoded = DagJsonCodec.encode(&Ipld::from(self))?; + let s = std::str::from_utf8(&encoded)?; + Ok(s.to_string()) + } +} + +impl<'a, T> From> for Ipld +where + Ipld: From>, +{ + fn from(workflow: Workflow<'a, T>) -> Self { + Ipld::Map(BTreeMap::from([( + TASKS_KEY.into(), + Ipld::List( + workflow + .tasks + .into_iter() + .map(Ipld::from) + .collect::>(), + ), + )])) + } +} + +impl<'a, T> TryFrom for Workflow<'a, T> +where + T: From, +{ + type Error = WorkflowError; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + let ipld = map + .get(TASKS_KEY) + .ok_or_else(|| WorkflowError::::MissingFieldError(TASKS_KEY.to_string()))?; + + let tasks = if let Ipld::List(tasks) = ipld { + tasks.iter().try_fold(vec![], |mut acc, ipld| { + acc.push(ipld.to_owned().try_into()?); + Ok::<_, Self::Error>(acc) + })? + } else { + bail!(WorkflowError::not_an_ipld_list()); + }; + + Ok(Self { tasks }) + } +} + +impl<'a, T> TryFrom> for Cid +where + Ipld: From>, +{ + type Error = WorkflowError; + + fn try_from(workflow: Workflow<'a, T>) -> Result { + let ipld: Ipld = workflow.into(); + let bytes = DagCborCodec.encode(&ipld)?; + let hash = Code::Sha3_256.digest(&bytes); + Ok(Cid::new_v1(DAG_CBOR, hash)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + test_utils, + workflow::{config::Resources, instruction::RunInstruction, prf::UcanPrf}, + Unit, + }; + + #[test] + fn workflow_to_json() { + let config = Resources::default(); + let (instruction1, instruction2, _) = + test_utils::workflow::related_wasm_instructions::(); + let task1 = Task::new( + RunInstruction::Expanded(instruction1), + config.clone().into(), + UcanPrf::default(), + ); + let task2 = Task::new( + RunInstruction::Expanded(instruction2), + config.into(), + UcanPrf::default(), + ); + + let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + + let json_string = workflow.to_json().unwrap(); + + let json_val = json::from(json_string.clone()); + assert_eq!(json_string, json_val.to_string()); + } + + #[test] + fn ipld_roundtrip_workflow() { + let config = Resources::default(); + let (instruction1, instruction2, _) = + test_utils::workflow::related_wasm_instructions::(); + let task1 = Task::new( + RunInstruction::Expanded(instruction1), + config.clone().into(), + UcanPrf::default(), + ); + let task2 = Task::new( + RunInstruction::Expanded(instruction2), + config.into(), + UcanPrf::default(), + ); + + let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let ipld = Ipld::from(workflow.clone()); + let ipld_to_workflow = ipld.try_into().unwrap(); + assert_eq!(workflow, ipld_to_workflow); + } +} diff --git a/homestar-core/src/workflow/ability.rs b/homestar-core/src/workflow/ability.rs index 421a6b41..d76f1388 100644 --- a/homestar-core/src/workflow/ability.rs +++ b/homestar-core/src/workflow/ability.rs @@ -3,6 +3,7 @@ //! [Resource]: url::Url //! [UCAN Ability]: +use crate::{workflow, Unit}; use libipld::{serde::from_ipld, Ipld}; use serde::{Deserialize, Serialize}; use std::{borrow::Cow, fmt}; @@ -63,7 +64,7 @@ impl From for Ipld { } impl TryFrom for Ability { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { let ability = from_ipld::(ipld)?; diff --git a/homestar-core/src/workflow/config.rs b/homestar-core/src/workflow/config.rs index ea940576..f9e12a06 100644 --- a/homestar-core/src/workflow/config.rs +++ b/homestar-core/src/workflow/config.rs @@ -4,6 +4,7 @@ //! [workflow]: super //! [Invocations]: super::Invocation +use crate::{workflow, Unit}; use libipld::{serde::from_ipld, Ipld}; use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, default::Default, time::Duration}; @@ -76,7 +77,7 @@ impl From for Ipld { } impl<'a> TryFrom<&'a Ipld> for Resources { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: &Ipld) -> Result { Resources::try_from(ipld.to_owned()) @@ -84,7 +85,7 @@ impl<'a> TryFrom<&'a Ipld> for Resources { } impl TryFrom for Resources { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { let map = from_ipld::>(ipld)?; diff --git a/homestar-core/src/workflow/error.rs b/homestar-core/src/workflow/error.rs new file mode 100644 index 00000000..990770de --- /dev/null +++ b/homestar-core/src/workflow/error.rs @@ -0,0 +1,163 @@ +//! Error types and implementations for [Workflow] interaction(s). +//! +//! [Workflow]: crate::Workflow + +use crate::{ + workflow::{input::Args, Input}, + Unit, +}; +use libipld::Ipld; +use serde::de::Error as DeError; + +/// Generic error type for [Workflow] use cases. +/// +/// [Workflow]: crate::Workflow +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Error encoding structure to a [Cid]. + /// + /// [Cid]: libipld::cid::Cid + #[error("failed to encode CID: {0}")] + CidError(#[from] libipld::cid::Error), + /// Error thrown when condition or dynamic check is not met. + #[error("condition not met: {0}")] + ConditionNotMet(String), + /// Failure to decode/encode from/to DagCbor. + /// + /// The underlying error is a [anyhow::Error], per the + /// [DagCborCodec] implementation. + /// + /// [DagCborCodec]: libipld::cbor::DagCborCodec + #[error("failed to decode/encode DAG-CBOR: {0}")] + DagCborTranslationError(#[from] anyhow::Error), + /// Error converting from [Ipld] structure. + #[error("cannot convert from Ipld structure: {0}")] + FromIpldError(#[from] libipld::error::SerdeError), + /// Invalid match discriminant or enumeration. + #[error("invalid discriminant {0:#?}")] + InvalidDiscriminant(T), + /// Error related to a missing a field in a structure or key + /// in a map. + #[error("no {0} field set")] + MissingFieldError(String), + /// Error during parsing a [Url]. + /// + /// Transparently forwards from [url::ParseError]'s `source` and + /// `Display` methods through to an underlying error. + /// + /// [Url]: url::Url + #[error(transparent)] + ParseResourceError(#[from] url::ParseError), + /// Generic unknown error. + #[error("unknown error")] + UnknownError, + /// Error when attempting to interpret a sequence of [u8] + /// as a string. + /// + /// Transparently forwards from [std::str::Utf8Error]'s `source` and + /// `Display` methods through to an underlying error. + #[error(transparent)] + Utf8Error(#[from] std::str::Utf8Error), +} + +impl Error { + /// Return a [SerdeError] when returning an [Ipld] structure + /// that's not expected at the call-site. + /// + /// [SerdeError]: libipld::error::SerdeError + pub fn unexpected_ipld(ipld: Ipld) -> Self { + Error::FromIpldError(libipld::error::SerdeError::custom(format!( + "unexpected Ipld conversion: {ipld:#?}" + ))) + } + + /// Return an `invalid type` [SerdeError] when not matching an expected + /// [Ipld] list/sequence type. + /// + /// [SerdeError]: libipld::error::SerdeError + pub fn not_an_ipld_list() -> Self { + Error::FromIpldError(libipld::error::SerdeError::invalid_type( + serde::de::Unexpected::Seq, + &"an Ipld list / sequence", + )) + } +} + +impl From> for Error { + fn from(_err: Error) -> Self { + Error::UnknownError + } +} + +impl From> for Error { + fn from(_err: Error) -> Error { + Error::UnknownError + } +} + +impl From for Error { + fn from(err: std::convert::Infallible) -> Self { + match err {} + } +} + +/// Error type for parsing [Workflow] [Input]s. +/// +/// [Workflow]: crate::Workflow +#[derive(thiserror::Error, Debug)] +pub enum InputParseError { + /// Error converting from [Ipld] structure. + #[error("cannot convert from Ipld structure: {0}")] + FromIpldError(#[from] libipld::error::SerdeError), + /// Error converting from [Ipld] structure into [Args]. + #[error("cannot convert from Ipld structure into arguments: {0:#?}")] + IpldToArgsError(Args), + /// Unexpected [Input] in [Task] structure. + /// + /// [Task]: crate::workflow::Task + #[error("unexpected task input: {0:#?}")] + UnexpectedTaskInput(Input), + /// Bubble-up conversion and other general [Workflow errors]. + /// + /// [Workflow errors]: Error + #[error(transparent)] + WorkflowError(#[from] Error), +} + +impl From for InputParseError { + fn from(err: std::convert::Infallible) -> Self { + match err {} + } +} + +/// Error type for resolving promised [Cid]s within [Workflow] [Input]s. +/// +/// [Cid]: libipld::Cid +/// [Workflow]: crate::Workflow +#[derive(thiserror::Error, Debug)] +pub enum ResolveError { + /// Generic runtime error. + /// + /// Transparently forwards from [anyhow::Error]'s `source` and + /// `Display` methods through to an underlying error. + #[error(transparent)] + RuntimeError(#[from] anyhow::Error), + /// Transport error when attempting to resolve [Workflow] [Input]'s [Cid]. + /// + /// [Cid]: libipld::Cid + /// [Workflow]: crate::Workflow + #[error("transport error during resolve phase of input Cid: {0}")] + TransportError(String), + /// Unable to resolve a [Cid] within a [Workflow]'s [Input]. + /// + /// [Cid]: libipld::Cid + /// [Workflow]: crate::Workflow + #[error("error resolving input Cid: {0}")] + UnresolvedCidError(String), +} + +impl From for ResolveError { + fn from(err: std::convert::Infallible) -> Self { + match err {} + } +} diff --git a/homestar-core/src/workflow/input.rs b/homestar-core/src/workflow/input.rs index 174ce4b9..609e79f5 100644 --- a/homestar-core/src/workflow/input.rs +++ b/homestar-core/src/workflow/input.rs @@ -5,101 +5,17 @@ //! [parse]: Parse::parse //! [resolve]: Args::resolve -use super::{ +use crate::workflow::{ + self, + error::ResolveError, pointer::{Await, AwaitResult, ERR_BRANCH, OK_BRANCH, PTR_BRANCH}, InstructionResult, Pointer, }; -use anyhow::anyhow; use libipld::{serde::from_ipld, Cid, Ipld}; use std::{collections::btree_map::BTreeMap, result::Result}; -/// Parsed [Args] consisting of [Inputs] for execution flows, as well as an -/// optional function name/definition. -/// -/// TODO: Extend via enumeration for singular objects/values. -/// -/// [Inputs]: super::Input -#[derive(Clone, Debug, PartialEq)] -pub struct Parsed { - args: Args, - fun: Option, -} - -impl Parsed { - /// Initiate [Parsed] data structure with only [Args]. - pub fn with(args: Args) -> Self { - Parsed { args, fun: None } - } - - /// Initiate [Parsed] data structure with a function name and - /// [Args]. - pub fn with_fn(fun: String, args: Args) -> Self { - Parsed { - args, - fun: Some(fun), - } - } - - /// Parsed arguments. - pub fn args(&self) -> &Args { - &self.args - } - - /// Turn [Parsed] structure into owned [Args]. - pub fn into_args(self) -> Args { - self.args - } - - /// Parsed function named. - pub fn fun(&self) -> Option { - self.fun.as_ref().map(|f| f.to_string()) - } -} - -impl From> for Args { - fn from(apply: Parsed) -> Self { - apply.args - } -} - -/// Interface for [Instruction] implementations, relying on `homestore-core` -/// to implement custom parsing specifics. -/// -/// # Example -/// -/// ``` -/// use homestar_core::{ -/// workflow::{ -/// input::{Args, Parse}, Ability, Input, Instruction, -/// }, -/// Unit, -/// }; -/// use libipld::Ipld; -/// use url::Url; -/// -/// let wasm = "bafkreidztuwoszw2dfnzufjpsjmzj67x574qcdm2autnhnv43o3t4zmh7i".to_string(); -/// let resource = Url::parse(format!("ipfs://{wasm}").as_str()).unwrap(); -/// -/// let inst = Instruction::unique( -/// resource, -/// Ability::from("wasm/run"), -/// Input::::Ipld(Ipld::List(vec![Ipld::Bool(true)])) -/// ); -/// -/// let parsed = inst.input().parse().unwrap(); -/// -/// // turn into Args for invocation: -/// let args: Args = parsed.try_into().unwrap(); -/// ``` -/// -/// [Instruction]: super::Instruction -pub trait Parse { - /// Function returning [Parsed] structure for execution/invocation. - /// - /// Note: meant to come before the `resolve` step - /// during runtime execution. - fn parse(&self) -> anyhow::Result>; -} +mod parse; +pub use parse::*; /// A list of ordered [Input] arguments/parameters. #[derive(Clone, Debug, PartialEq)] @@ -148,9 +64,9 @@ where /// [awaited promises]: Await /// [inputs]: Input /// [resolving Ipld links]: resolve_links - pub fn resolve(self, lookup_fn: F) -> anyhow::Result + pub fn resolve(self, lookup_fn: F) -> Result where - F: Fn(Cid) -> anyhow::Result> + Clone, + F: Fn(Cid) -> Result, ResolveError> + Clone, Ipld: From, T: Clone, { @@ -173,9 +89,9 @@ impl TryFrom for Args where InstructionResult: TryFrom, { - type Error = anyhow::Error; + type Error = workflow::Error; - fn try_from(ipld: Ipld) -> Result { + fn try_from(ipld: Ipld) -> Result { if let Ipld::List(vec) = ipld { let args = vec .into_iter() @@ -192,7 +108,7 @@ where }); Ok(Args(args)) } else { - Err(anyhow!("unexpected conversion type")) + Err(workflow::Error::not_an_ipld_list()) } } } @@ -230,7 +146,7 @@ impl Input { /// [resolving Ipld links]: resolve_links pub fn resolve(self, lookup_fn: F) -> Input where - F: Fn(Cid) -> anyhow::Result> + Clone, + F: Fn(Cid) -> Result, ResolveError> + Clone, Ipld: From, { match self { @@ -280,7 +196,7 @@ impl TryFrom for Input where T: From, { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { let Ok(map) = from_ipld::>(ipld.to_owned()) else { @@ -304,8 +220,9 @@ where let instruction = Pointer::try_from(ipld)?; Ok(Input::Deferred(Await::new( instruction, - AwaitResult::result(branch) - .ok_or_else(|| anyhow!("wrong branch name: {branch}"))?, + AwaitResult::result(branch).ok_or_else(|| { + workflow::Error::InvalidDiscriminant(branch.to_string()) + })?, ))) }, ) @@ -314,7 +231,7 @@ where fn resolve_args(args: Vec>, lookup_fn: F) -> Vec> where - F: Fn(Cid) -> anyhow::Result> + Clone, + F: Fn(Cid) -> Result, ResolveError> + Clone, Ipld: From, { let args = args.into_iter().map(|v| v.resolve(lookup_fn.clone())); @@ -326,7 +243,7 @@ where /// [awaited promises]: Await pub fn resolve_links(ipld: Ipld, lookup_fn: F) -> Ipld where - F: Fn(Cid) -> anyhow::Result> + Clone, + F: Fn(Cid) -> Result, ResolveError> + Clone, Ipld: From, { match ipld { diff --git a/homestar-core/src/workflow/input/parse.rs b/homestar-core/src/workflow/input/parse.rs new file mode 100644 index 00000000..bf27c384 --- /dev/null +++ b/homestar-core/src/workflow/input/parse.rs @@ -0,0 +1,89 @@ +use crate::workflow::{error::InputParseError, input::Args}; + +/// Parsed [Args] consisting of [Inputs] for execution flows, as well as an +/// optional function name/definition. +/// +/// TODO: Extend via enumeration for singular objects/values. +/// +/// [Inputs]: super::Input +#[derive(Clone, Debug, PartialEq)] +pub struct Parsed { + args: Args, + fun: Option, +} + +impl Parsed { + /// Initiate [Parsed] data structure with only [Args]. + pub fn with(args: Args) -> Self { + Parsed { args, fun: None } + } + + /// Initiate [Parsed] data structure with a function name and + /// [Args]. + pub fn with_fn(fun: String, args: Args) -> Self { + Parsed { + args, + fun: Some(fun), + } + } + + /// Parsed arguments. + pub fn args(&self) -> &Args { + &self.args + } + + /// Turn [Parsed] structure into owned [Args]. + pub fn into_args(self) -> Args { + self.args + } + + /// Parsed function named. + pub fn fun(&self) -> Option { + self.fun.as_ref().map(|f| f.to_string()) + } +} + +impl From> for Args { + fn from(apply: Parsed) -> Self { + apply.args + } +} + +/// Interface for [Instruction] implementations, relying on `homestore-core` +/// to implement custom parsing specifics. +/// +/// # Example +/// +/// ``` +/// use homestar_core::{ +/// workflow::{ +/// input::{Args, Parse}, Ability, Input, Instruction, +/// }, +/// Unit, +/// }; +/// use libipld::Ipld; +/// use url::Url; +/// +/// let wasm = "bafkreidztuwoszw2dfnzufjpsjmzj67x574qcdm2autnhnv43o3t4zmh7i".to_string(); +/// let resource = Url::parse(format!("ipfs://{wasm}").as_str()).unwrap(); +/// +/// let inst = Instruction::unique( +/// resource, +/// Ability::from("wasm/run"), +/// Input::::Ipld(Ipld::List(vec![Ipld::Bool(true)])) +/// ); +/// +/// let parsed = inst.input().parse().unwrap(); +/// +/// // turn into Args for invocation: +/// let args: Args = parsed.try_into().unwrap(); +/// ``` +/// +/// [Instruction]: crate::workflow::Instruction +pub trait Parse { + /// Function returning [Parsed] structure for execution/invocation. + /// + /// Note: meant to come before the `resolve` step + /// during runtime execution. + fn parse(&self) -> Result, InputParseError>; +} diff --git a/homestar-core/src/workflow/instruction.rs b/homestar-core/src/workflow/instruction.rs index efe66403..a413c1d0 100644 --- a/homestar-core/src/workflow/instruction.rs +++ b/homestar-core/src/workflow/instruction.rs @@ -1,8 +1,11 @@ //! An [Instruction] is the smallest unit of work that can be requested from a //! UCAN, described via `resource`, `ability`. -use super::{Ability, Input, Nonce, Pointer}; -use anyhow::anyhow; +use crate::{ + consts::DAG_CBOR, + workflow::{Ability, Error as WorkflowError, Input, Nonce, Pointer}, + Unit, +}; use libipld::{ cbor::DagCborCodec, cid::{ @@ -17,7 +20,6 @@ use libipld::{ use std::{borrow::Cow, collections::BTreeMap, fmt}; use url::Url; -const DAG_CBOR: u64 = 0x71; const RESOURCE_KEY: &str = "rsc"; const OP_KEY: &str = "op"; const INPUT_KEY: &str = "input"; @@ -43,12 +45,12 @@ impl<'a, T> TryFrom> for Instruction<'a, T> where T: fmt::Debug, { - type Error = anyhow::Error; + type Error = WorkflowError>; fn try_from(run: RunInstruction<'a, T>) -> Result { match run { RunInstruction::Expanded(instruction) => Ok(instruction), - e => Err(anyhow!("wrong discriminant: {e:?}")), + e => Err(WorkflowError::InvalidDiscriminant(e)), } } } @@ -63,12 +65,12 @@ impl<'a, T> TryFrom> for Pointer where T: fmt::Debug, { - type Error = anyhow::Error; + type Error = WorkflowError>; fn try_from(run: RunInstruction<'a, T>) -> Result { match run { RunInstruction::Ptr(ptr) => Ok(ptr), - e => Err(anyhow!("wrong discriminant: {e:?}")), + e => Err(WorkflowError::InvalidDiscriminant(e)), } } } @@ -77,12 +79,12 @@ impl<'a, 'b, T> TryFrom<&'b RunInstruction<'a, T>> for &'b Pointer where T: fmt::Debug, { - type Error = anyhow::Error; + type Error = WorkflowError<&'b RunInstruction<'a, T>>; fn try_from(run: &'b RunInstruction<'a, T>) -> Result { match run { RunInstruction::Ptr(ptr) => Ok(ptr), - e => Err(anyhow!("wrong discriminant: {e:?}")), + e => Err(WorkflowError::InvalidDiscriminant(e)), } } } @@ -91,12 +93,12 @@ impl<'a, 'b, T> TryFrom<&'b RunInstruction<'a, T>> for Pointer where T: fmt::Debug, { - type Error = anyhow::Error; + type Error = WorkflowError<&'b RunInstruction<'a, T>>; fn try_from(run: &'b RunInstruction<'a, T>) -> Result { match run { RunInstruction::Ptr(ptr) => Ok(ptr.to_owned()), - e => Err(anyhow!("wrong discriminant: {e:?}")), + e => Err(WorkflowError::InvalidDiscriminant(e)), } } } @@ -117,13 +119,13 @@ impl TryFrom for RunInstruction<'_, T> where T: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from<'a>(ipld: Ipld) -> Result { match ipld { Ipld::Map(_) => Ok(RunInstruction::Expanded(Instruction::try_from(ipld)?)), Ipld::Link(_) => Ok(RunInstruction::Ptr(Pointer::try_from(ipld)?)), - _ => Err(anyhow!("unexpected conversion type")), + other_ipld => Err(WorkflowError::unexpected_ipld(other_ipld)), } } } @@ -241,7 +243,7 @@ impl TryFrom> for Pointer where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(instruction: Instruction<'_, T>) -> Result { Ok(Pointer::new(Cid::try_from(instruction)?)) @@ -252,7 +254,7 @@ impl TryFrom> for Cid where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(instruction: Instruction<'_, T>) -> Result { let ipld: Ipld = instruction.into(); @@ -280,7 +282,7 @@ impl TryFrom<&Ipld> for Instruction<'_, T> where T: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(ipld: &Ipld) -> Result { TryFrom::try_from(ipld.to_owned()) @@ -291,35 +293,37 @@ impl TryFrom for Instruction<'_, T> where T: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(ipld: Ipld) -> Result { let map = from_ipld::>(ipld)?; let rsc = match map.get(RESOURCE_KEY) { Some(Ipld::Link(cid)) => cid - .to_string_of_base(Base::Base32Lower) - .map_err(|e| anyhow!("failed to encode CID into multibase string: {e}")) + .to_string_of_base(Base::Base32Lower) // Cid v1 + .map_err(WorkflowError::::CidError) .and_then(|txt| { Url::parse(format!("{}{}", "ipfs://", txt).as_str()) - .map_err(|e| anyhow!("failed to parse URL: {e}")) + .map_err(WorkflowError::ParseResourceError) }), Some(Ipld::String(txt)) => { - Url::parse(txt.as_str()).map_err(|e| anyhow!("failed to parse URL: {e}")) + Url::parse(txt.as_str()).map_err(WorkflowError::ParseResourceError) } - _ => Err(anyhow!("no resource/with set.")), + _ => Err(WorkflowError::MissingFieldError(RESOURCE_KEY.to_string())), }?; Ok(Self { rsc, op: from_ipld( map.get(OP_KEY) - .ok_or_else(|| anyhow!("no `op` field set"))? + .ok_or_else(|| WorkflowError::::MissingFieldError(OP_KEY.to_string()))? .to_owned(), )?, input: Input::try_from( map.get(INPUT_KEY) - .ok_or_else(|| anyhow!("no `input` field set"))? + .ok_or_else(|| { + WorkflowError::::MissingFieldError(INPUT_KEY.to_string()) + })? .to_owned(), )?, nnc: Nonce::try_from( diff --git a/homestar-core/src/workflow/instruction_result.rs b/homestar-core/src/workflow/instruction_result.rs index 5674bd0a..4b297aa9 100644 --- a/homestar-core/src/workflow/instruction_result.rs +++ b/homestar-core/src/workflow/instruction_result.rs @@ -3,7 +3,7 @@ //! //! [Instruction]: super::Instruction -use anyhow::anyhow; +use crate::{workflow, Unit}; use diesel::{ backend::Backend, deserialize::{self, FromSql}, @@ -76,9 +76,9 @@ impl TryFrom for InstructionResult where T: From, { - type Error = anyhow::Error; + type Error = workflow::Error; - fn try_from(ipld: Ipld) -> Result { + fn try_from(ipld: Ipld) -> Result> { if let Ipld::List(v) = ipld { match &v[..] { [Ipld::String(result), res] if result == OK => { @@ -90,10 +90,12 @@ where [Ipld::String(result), res] if result == JUST => { Ok(InstructionResult::Just(res.to_owned().try_into()?)) } - _ => Err(anyhow!("unexpected conversion type")), + other_ipld => Err(workflow::Error::unexpected_ipld( + other_ipld.to_owned().into(), + )), } } else { - Err(anyhow!("not convertible to Ipld")) + Err(workflow::Error::not_an_ipld_list()) } } } @@ -102,9 +104,9 @@ impl TryFrom<&Ipld> for InstructionResult where T: From, { - type Error = anyhow::Error; + type Error = workflow::Error; - fn try_from(ipld: &Ipld) -> Result { + fn try_from(ipld: &Ipld) -> Result> { TryFrom::try_from(ipld.to_owned()) } } diff --git a/homestar-core/src/workflow/invocation.rs b/homestar-core/src/workflow/invocation.rs index e3e75810..fd02bb56 100644 --- a/homestar-core/src/workflow/invocation.rs +++ b/homestar-core/src/workflow/invocation.rs @@ -2,8 +2,11 @@ //! //! [Task]: super::Task -use super::{Pointer, Task}; -use anyhow::anyhow; +use crate::{ + consts::DAG_CBOR, + workflow::{Error as WorkflowError, Pointer, Task}, + Unit, +}; use libipld::{ cbor::DagCborCodec, cid::{ @@ -16,7 +19,6 @@ use libipld::{ }; use std::collections::BTreeMap; -const DAG_CBOR: u64 = 0x71; const TASK_KEY: &str = "task"; /// A signed [Task] wrapper/container. @@ -48,7 +50,7 @@ impl TryFrom> for Ipld where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(invocation: Invocation<'_, T>) -> Result { let map = Ipld::Map(BTreeMap::from([( @@ -64,7 +66,7 @@ impl TryFrom for Invocation<'_, T> where T: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(ipld: Ipld) -> Result { let map = from_ipld::>(ipld)?; @@ -72,7 +74,7 @@ where Ok(Self { task: Task::try_from( map.get(TASK_KEY) - .ok_or_else(|| anyhow!("no `task` set"))? + .ok_or_else(|| WorkflowError::::MissingFieldError(TASK_KEY.to_string()))? .to_owned(), )?, }) @@ -83,7 +85,7 @@ impl TryFrom> for Pointer where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(invocation: Invocation<'_, T>) -> Result { Ok(Pointer::new(Cid::try_from(invocation)?)) @@ -94,7 +96,7 @@ impl TryFrom> for Cid where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(invocation: Invocation<'_, T>) -> Result { let ipld: Ipld = invocation.try_into()?; diff --git a/homestar-core/src/workflow/issuer.rs b/homestar-core/src/workflow/issuer.rs index 35af0426..ef583904 100644 --- a/homestar-core/src/workflow/issuer.rs +++ b/homestar-core/src/workflow/issuer.rs @@ -1,6 +1,7 @@ //! Issuer referring to a principal (principal of least authority) that issues //! a receipt. +use crate::{workflow, Unit}; use diesel::{ backend::Backend, deserialize::{self, FromSql}, @@ -46,7 +47,7 @@ impl From for Ipld { } impl TryFrom for Issuer { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { let s = from_ipld::(ipld)?; diff --git a/homestar-core/src/workflow/nonce.rs b/homestar-core/src/workflow/nonce.rs index 4603236b..6e1e2847 100644 --- a/homestar-core/src/workflow/nonce.rs +++ b/homestar-core/src/workflow/nonce.rs @@ -2,7 +2,7 @@ //! //! [Instruction]: super::Instruction -use anyhow::anyhow; +use crate::{workflow, Unit}; use enum_as_inner::EnumAsInner; use generic_array::{ typenum::consts::{U12, U16}, @@ -61,7 +61,7 @@ impl From for Ipld { } impl TryFrom for Nonce { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { if let Ipld::List(v) = ipld { @@ -72,7 +72,9 @@ impl TryFrom for Nonce { [Ipld::Integer(1), Ipld::Bytes(nonce)] => { Ok(Nonce::Nonce128(*GenericArray::from_slice(nonce))) } - _ => Err(anyhow!("unexpected conversion type")), + other_ipld => Err(workflow::Error::unexpected_ipld( + other_ipld.to_owned().into(), + )), } } else { Ok(Nonce::Empty) @@ -81,7 +83,7 @@ impl TryFrom for Nonce { } impl TryFrom<&Ipld> for Nonce { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: &Ipld) -> Result { TryFrom::try_from(ipld.to_owned()) diff --git a/homestar-core/src/workflow/pointer.rs b/homestar-core/src/workflow/pointer.rs index 1f4be658..c24c1e5b 100644 --- a/homestar-core/src/workflow/pointer.rs +++ b/homestar-core/src/workflow/pointer.rs @@ -8,7 +8,7 @@ //! [Instructions]: super::Instruction //! [Receipts]: super::Receipt -use anyhow::ensure; +use crate::{ensure, workflow, Unit}; use diesel::{ backend::Backend, deserialize::{self, FromSql}, @@ -122,11 +122,16 @@ impl From<&Await> for Ipld { } impl TryFrom for Await { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { let map = from_ipld::>(ipld)?; - ensure!(map.len() == 1, "unexpected keys inside awaited promise"); + ensure!( + map.len() == 1, + workflow::Error::ConditionNotMet( + "await promise must jave only a single key ain a map".to_string() + ) + ); let (key, value) = map.into_iter().next().unwrap(); let instruction = Pointer::try_from(value)?; @@ -145,7 +150,7 @@ impl TryFrom for Await { } impl TryFrom<&Ipld> for Await { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: &Ipld) -> Result { TryFrom::try_from(ipld.to_owned()) @@ -198,7 +203,7 @@ impl From for Ipld { } impl TryFrom for Pointer { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { let s: Cid = from_ipld(ipld)?; @@ -207,7 +212,7 @@ impl TryFrom for Pointer { } impl TryFrom<&Ipld> for Pointer { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: &Ipld) -> Result { TryFrom::try_from(ipld.to_owned()) diff --git a/homestar-core/src/workflow/prf.rs b/homestar-core/src/workflow/prf.rs index 333d30bf..66052ba7 100644 --- a/homestar-core/src/workflow/prf.rs +++ b/homestar-core/src/workflow/prf.rs @@ -3,6 +3,7 @@ //! //! [Task]: super::Task +use crate::{workflow, Unit}; use diesel::{ backend::Backend, deserialize::{self, FromSql}, @@ -46,7 +47,7 @@ impl From for Ipld { } impl TryFrom for UcanPrf { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: Ipld) -> Result { if let Ipld::List(inner) = ipld { @@ -63,7 +64,7 @@ impl TryFrom for UcanPrf { } impl TryFrom<&Ipld> for UcanPrf { - type Error = anyhow::Error; + type Error = workflow::Error; fn try_from(ipld: &Ipld) -> Result { TryFrom::try_from(ipld.to_owned()) diff --git a/homestar-core/src/workflow/receipt.rs b/homestar-core/src/workflow/receipt.rs index bd5d1550..a49cf194 100644 --- a/homestar-core/src/workflow/receipt.rs +++ b/homestar-core/src/workflow/receipt.rs @@ -1,7 +1,9 @@ //! Output of an invocation, referenced by its invocation pointer. -use super::{prf::UcanPrf, InstructionResult, Issuer, Pointer}; -use anyhow::anyhow; +use crate::{ + workflow::{prf::UcanPrf, Error as WorkflowError, InstructionResult, Issuer, Pointer}, + Unit, +}; use libipld::{ cbor::DagCborCodec, cid::{ @@ -85,16 +87,17 @@ impl Receipt { } impl TryFrom> for Vec { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(receipt: Receipt) -> Result { let receipt_ipld = Ipld::from(&receipt); - DagCborCodec.encode(&receipt_ipld) + let encoded = DagCborCodec.encode(&receipt_ipld)?; + Ok(encoded) } } impl TryFrom> for Receipt { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(bytes: Vec) -> Result { let ipld: Ipld = DagCborCodec.decode(&bytes)?; @@ -103,7 +106,7 @@ impl TryFrom> for Receipt { } impl TryFrom> for Cid { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(receipt: Receipt) -> Result { TryFrom::try_from(&receipt) @@ -111,7 +114,7 @@ impl TryFrom> for Cid { } impl TryFrom<&Receipt> for Cid { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(receipt: &Receipt) -> Result { let ipld = Ipld::from(receipt); @@ -147,23 +150,23 @@ impl From> for Ipld { } impl TryFrom for Receipt { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(ipld: Ipld) -> Result { let map = from_ipld::>(ipld)?; let ran = map .get(RAN_KEY) - .ok_or_else(|| anyhow!("missing {RAN_KEY}"))? + .ok_or_else(|| WorkflowError::::MissingFieldError(RAN_KEY.to_string()))? .try_into()?; let out = map .get(OUT_KEY) - .ok_or_else(|| anyhow!("missing {OUT_KEY}"))?; + .ok_or_else(|| WorkflowError::::MissingFieldError(OUT_KEY.to_string()))?; let meta = map .get(METADATA_KEY) - .ok_or_else(|| anyhow!("missing {METADATA_KEY}"))?; + .ok_or_else(|| WorkflowError::::MissingFieldError(METADATA_KEY.to_string()))?; let issuer = map .get(ISSUER_KEY) @@ -176,7 +179,7 @@ impl TryFrom for Receipt { let prf = map .get(PROOF_KEY) - .ok_or_else(|| anyhow!("missing {PROOF_KEY}"))?; + .ok_or_else(|| WorkflowError::::MissingFieldError(PROOF_KEY.to_string()))?; Ok(Receipt { ran, @@ -189,7 +192,7 @@ impl TryFrom for Receipt { } impl TryFrom> for Pointer { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(receipt: Receipt) -> Result { Ok(Pointer::new(Cid::try_from(receipt)?)) diff --git a/homestar-core/src/workflow/task.rs b/homestar-core/src/workflow/task.rs index 4115c744..6adff5e6 100644 --- a/homestar-core/src/workflow/task.rs +++ b/homestar-core/src/workflow/task.rs @@ -1,7 +1,10 @@ //! A [Task] is the smallest unit of work that can be requested from a UCAN. -use super::{instruction::RunInstruction, prf::UcanPrf, Pointer}; -use anyhow::anyhow; +use crate::{ + consts::DAG_CBOR, + workflow::{instruction::RunInstruction, prf::UcanPrf, Error as WorkflowError, Pointer}, + Unit, +}; use libipld::{ cbor::DagCborCodec, cid::{ @@ -14,7 +17,6 @@ use libipld::{ }; use std::collections::BTreeMap; -const DAG_CBOR: u64 = 0x71; const RUN_KEY: &str = "run"; const CAUSE_KEY: &str = "cause"; const METADATA_KEY: &str = "meta"; @@ -84,7 +86,7 @@ where /// Return the [Cid] of the contained [Instruction]. /// /// [Instruction]: super::Instruction - pub fn instruction_cid(&self) -> anyhow::Result { + pub fn instruction_cid(&self) -> Result> { match &self.run { RunInstruction::Expanded(instruction) => Ok(Cid::try_from(instruction.to_owned())?), RunInstruction::Ptr(instruction_ptr) => Ok(instruction_ptr.cid()), @@ -113,7 +115,7 @@ impl TryFrom for Task<'_, T> where T: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(ipld: Ipld) -> Result { let map = from_ipld::>(ipld)?; @@ -121,7 +123,7 @@ where Ok(Self { run: RunInstruction::try_from( map.get(RUN_KEY) - .ok_or_else(|| anyhow!("no `run` set"))? + .ok_or_else(|| WorkflowError::::MissingFieldError(RUN_KEY.to_string()))? .to_owned(), )?, cause: map @@ -133,11 +135,11 @@ where .and_then(|ipld| ipld.to_owned().try_into().ok()), meta: map .get(METADATA_KEY) - .ok_or_else(|| anyhow!("no `metadata` field set"))? + .ok_or_else(|| WorkflowError::::MissingFieldError(METADATA_KEY.to_string()))? .to_owned(), prf: UcanPrf::try_from( map.get(PROOF_KEY) - .ok_or_else(|| anyhow!("no proof field set"))? + .ok_or_else(|| WorkflowError::::MissingFieldError(PROOF_KEY.to_string()))? .to_owned(), )?, }) @@ -148,7 +150,7 @@ impl TryFrom<&Ipld> for Task<'_, T> where T: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from<'a>(ipld: &Ipld) -> Result { TryFrom::try_from(ipld.to_owned()) @@ -159,7 +161,7 @@ impl TryFrom> for Pointer where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(task: Task<'_, T>) -> Result { Ok(Pointer::new(Cid::try_from(task)?)) @@ -170,7 +172,7 @@ impl TryFrom> for Cid where Ipld: From, { - type Error = anyhow::Error; + type Error = WorkflowError; fn try_from(task: Task<'_, T>) -> Result { let ipld: Ipld = task.into(); diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 289faece..bb14f805 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "homestar-runtime" version = "0.1.0" -description = "Homestar runtime implementation" +description = "Homestar CLI" keywords = ["ipfs", "workflows", "ipld", "ipvm"] categories = ["workflow-engines", "distributed-systems", "runtimes", "networking"] include = ["/src", "README.md", "LICENSE"] @@ -33,14 +33,13 @@ anyhow = { workspace = true } async-trait = "0.1" axum = { version = "0.6", features = ["ws", "headers"] } byte-unit = { version = "4.0", default-features = false } -clap = { version = "4.1", features = ["derive"] } +clap = { version = "4.1", features = ["derive", "color", "help"] } concat-in-place = "1.1" config = "0.13" console-subscriber = { version = "0.1", default-features = false, features = [ "parking_lot" ], optional = true } crossbeam = "0.8" dagga = "0.2" diesel = { version = "2.1", features = ["sqlite", "r2d2", "returning_clauses_for_sqlite_3_35"] } -diesel_migrations = "2.1" dotenvy = "0.15" enum-assoc = "0.4" futures = "0.3" @@ -53,7 +52,6 @@ indexmap = "1.9" ipfs-api = { version = "0.17", optional = true } ipfs-api-backend-hyper = { version = "0.6", features = ["with-builder", "with-send-sync"], optional = true } itertools = "0.10" -json = "0.12" libipld = "0.16" libp2p = { version = "0.51", features = ["kad", "request-response", "macros", "identify", "mdns", "gossipsub", "tokio", "dns", "mplex", "tcp", "noise", "yamux", "websocket"] } libp2p-identity = "0.1" @@ -74,7 +72,9 @@ url = "2.3" [dev-dependencies] criterion = "0.4" +diesel_migrations = "2.1" homestar-core = { version = "0.1", path = "../homestar-core", features = [ "test_utils" ] } +json = "0.12" tokio-tungstenite = "0.18" [features] diff --git a/homestar-runtime/src/cli.rs b/homestar-runtime/src/cli.rs index ef9c308c..4bbbc416 100644 --- a/homestar-runtime/src/cli.rs +++ b/homestar-runtime/src/cli.rs @@ -2,9 +2,17 @@ use clap::Parser; +const HELP_TEMPLATE: &str = "{about} {version} + +USAGE: + {usage} + +{all-args} +"; + /// CLI arguments. #[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] +#[command(author, version, about, long_about = None, help_template = HELP_TEMPLATE)] pub struct Args { /// Ipvm-specific [Argument]. #[clap(subcommand)] @@ -14,12 +22,15 @@ pub struct Args { /// CLI Argument types. #[derive(Debug, Parser)] pub enum Argument { - /// TODO: Run [Workflow] given a file. - /// - /// [Workflow]: crate::Workflow + /// Run a workflow given a file. Run { /// Configuration file for *homestar* node settings. - #[arg(short, long)] + #[arg( + short = 'c', + long = "config", + value_name = "CONFIG", + help = "runtime configuration file" + )] runtime_config: Option, }, } diff --git a/homestar-runtime/src/lib.rs b/homestar-runtime/src/lib.rs index c1cd9a94..9a9bb6fd 100644 --- a/homestar-runtime/src/lib.rs +++ b/homestar-runtime/src/lib.rs @@ -36,7 +36,6 @@ pub use receipt::Receipt; pub use runtime::*; pub use settings::Settings; pub use worker::Worker; -pub use workflow::Workflow; /// Test utilities. #[cfg(any(test, feature = "test_utils"))] diff --git a/homestar-runtime/src/main.rs b/homestar-runtime/src/main.rs index 6624b28c..bb5637e9 100644 --- a/homestar-runtime/src/main.rs +++ b/homestar-runtime/src/main.rs @@ -15,6 +15,7 @@ use homestar_runtime::{ }; use std::sync::Arc; +/// TODO: All #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { logger::init()?; diff --git a/homestar-runtime/src/network/eventloop.rs b/homestar-runtime/src/network/eventloop.rs index 8e6d68e8..8e2ae516 100644 --- a/homestar-runtime/src/network/eventloop.rs +++ b/homestar-runtime/src/network/eventloop.rs @@ -1,12 +1,11 @@ //! [EventLoop] implementation for handling network events and messages, as well //! as commands for the running [libp2p] node. -use super::swarm::TopicMessage; #[cfg(feature = "ipfs")] use crate::IpfsCli; use crate::{ db::{Connection, Database, Db}, - network::swarm::{ComposedBehaviour, ComposedEvent}, + network::swarm::{ComposedBehaviour, ComposedEvent, TopicMessage}, settings, workflow, Receipt, }; use anyhow::{anyhow, Result}; @@ -36,6 +35,9 @@ use tokio::sync::mpsc; /// [Receipt]: homestar_core::workflow::receipt pub const RECEIPTS_TOPIC: &str = "receipts"; +const RECEIPT_CODE: &[u8; 16] = b"homestar_receipt"; +const WORKFLOW_INFO_CODE: &[u8; 17] = b"homestar_workflow"; + type WorkerSender = channel::Sender<(Cid, FoundEvent)>; /// Event loop handler for [libp2p] network events and commands. @@ -188,25 +190,31 @@ impl EventLoop { if let Ok(receipt_bytes) = Vec::try_from(invocation_receipt) { let ref_bytes = &receipt_bytes; - let value = veccat!(consts::INVOCATION_VERSION.as_bytes() ref_bytes); + let receipt_value = + veccat!(consts::INVOCATION_VERSION.as_bytes() RECEIPT_CODE ref_bytes); let _id = self .swarm .behaviour_mut() .kademlia - .put_record(Record::new(instruction_bytes, value.to_vec()), quorum) + .put_record( + Record::new(instruction_bytes, receipt_value.to_vec()), + quorum, + ) .map_err(anyhow::Error::msg)?; // Store workflow_receipt join information. let _ = Db::store_workflow_receipt(workflow_info.cid, receipt_cid, conn); workflow_info.increment_progress(receipt_cid); + + let wf_cid_bytes = workflow_info.cid_as_bytes(); + let wf_bytes = &Vec::try_from(workflow_info)?; + let wf_value = veccat!(WORKFLOW_INFO_CODE wf_bytes); + let _id = self .swarm .behaviour_mut() .kademlia - .put_record( - Record::new(workflow_info.cid.to_bytes(), Vec::try_from(workflow_info)?), - quorum, - ) + .put_record(Record::new(wf_cid_bytes, wf_value), quorum) .map_err(anyhow::Error::msg)?; Ok((receipt_cid, receipt_bytes)) @@ -235,12 +243,21 @@ impl EventLoop { fn on_found_record(key_cid: Cid, value: Vec) -> Result { match value { - value if value.starts_with(consts::INVOCATION_VERSION.as_bytes()) => { - let receipt_bytes = &value[consts::INVOCATION_VERSION.as_bytes().len()..]; + value + if value + .starts_with(&veccat!(consts::INVOCATION_VERSION.as_bytes() RECEIPT_CODE)) => + { + let receipt_bytes = + &value[consts::INVOCATION_VERSION.as_bytes().len() + RECEIPT_CODE.len()..]; let invocation_receipt = InvocationReceipt::try_from(receipt_bytes.to_vec())?; let receipt = Receipt::try_with(Pointer::new(key_cid), &invocation_receipt)?; Ok(FoundEvent::Receipt(receipt)) } + value if value.starts_with(WORKFLOW_INFO_CODE) => { + let workflow_info_bytes = &value[WORKFLOW_INFO_CODE.len()..]; + let workflow_info = workflow::Info::try_from(workflow_info_bytes.to_vec())?; + Ok(FoundEvent::Workflow(workflow_info)) + } _ => Err(anyhow!( "record version mismatch, current version: {}", consts::INVOCATION_VERSION @@ -393,7 +410,7 @@ pub enum Event { FindReceipt(Cid, WorkerSender), /// Find a [Workflow], stored as [workflow::Info], in the DHT. /// - /// [Workflow]: crate::Workflow + /// [Workflow]: homestar_core::Workflow FindWorkflow(Cid, WorkerSender), } @@ -409,7 +426,13 @@ pub enum FoundEvent { #[cfg(test)] mod test { use super::*; - use crate::test_utils; + use crate::{test_utils, workflow}; + use homestar_core::{ + test_utils::workflow as workflow_test_utils, + workflow::{config::Resources, instruction::RunInstruction, prf::UcanPrf, Task}, + Workflow, + }; + use homestar_wasm::io::Arg; #[test] fn found_receipt_record() { @@ -417,7 +440,7 @@ mod test { let instruction_bytes = receipt.instruction_cid_as_bytes(); let bytes = Vec::try_from(invocation_receipt).unwrap(); let ref_bytes = &bytes; - let value = veccat!(consts::INVOCATION_VERSION.as_bytes() ref_bytes); + let value = veccat!(consts::INVOCATION_VERSION.as_bytes() RECEIPT_CODE ref_bytes); let record = Record::new(instruction_bytes, value.to_vec()); let record_value = record.value; if let FoundEvent::Receipt(found_receipt) = @@ -429,4 +452,38 @@ mod test { panic!("Incorrect event type") } } + + #[test] + fn found_workflow_record() { + let config = Resources::default(); + let (instruction1, instruction2, _) = + workflow_test_utils::related_wasm_instructions::(); + let task1 = Task::new( + RunInstruction::Expanded(instruction1.clone()), + config.clone().into(), + UcanPrf::default(), + ); + let task2 = Task::new( + RunInstruction::Expanded(instruction2), + config.into(), + UcanPrf::default(), + ); + + let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let workflow_info = + workflow::Info::default(Cid::try_from(workflow.clone()).unwrap(), workflow.len()); + let workflow_cid_bytes = workflow_info.cid_as_bytes(); + let bytes = Vec::try_from(workflow_info.clone()).unwrap(); + let ref_bytes = &bytes; + let value = veccat!(WORKFLOW_INFO_CODE ref_bytes); + let record = Record::new(workflow_cid_bytes, value.to_vec()); + let record_value = record.value; + if let FoundEvent::Workflow(found_workflow) = + EventLoop::on_found_record(Cid::try_from(workflow).unwrap(), record_value).unwrap() + { + assert_eq!(found_workflow, workflow_info); + } else { + panic!("Incorrect event type") + } + } } diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index f1bcf80a..b846942b 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -5,13 +5,16 @@ use crate::{ db::{Connection, Database}, - workflow::{Resource, Vertex}, - Db, Workflow, + workflow::{Builder, Resource, Vertex}, + Db, }; use anyhow::Result; use dagga::Node; use futures::future::BoxFuture; -use homestar_core::workflow::{InstructionResult, LinkMap, Pointer}; +use homestar_core::{ + workflow::{InstructionResult, LinkMap, Pointer}, + Workflow, +}; use homestar_wasm::io::Arg; use indexmap::IndexMap; use libipld::Cid; @@ -78,7 +81,8 @@ impl<'a> TaskScheduler<'a> { where F: FnOnce(Vec) -> BoxFuture<'a, Result>>>, { - let graph = workflow.graph()?; + let builder = Builder::new(workflow); + let graph = builder.graph()?; let mut schedule = graph.schedule; let schedule_length = schedule.len(); let fetched = fetch_fn(graph.resources).await?; diff --git a/homestar-runtime/src/tasks/wasm.rs b/homestar-runtime/src/tasks/wasm.rs index 939ac691..f0583a41 100644 --- a/homestar-runtime/src/tasks/wasm.rs +++ b/homestar-runtime/src/tasks/wasm.rs @@ -1,10 +1,9 @@ use super::FileLoad; -use anyhow::Result; use async_trait::async_trait; use homestar_core::workflow::input::Args; use homestar_wasm::{ io::{Arg, Output}, - wasmtime::{world::Env, State, World}, + wasmtime::{world::Env, Error as WasmRuntimeError, State, World}, }; #[allow(missing_debug_implementations)] @@ -13,7 +12,7 @@ pub(crate) struct WasmContext { } impl WasmContext { - pub(crate) fn new(data: State) -> Result { + pub(crate) fn new(data: State) -> Result { Ok(Self { env: World::default(data)?, }) @@ -25,7 +24,7 @@ impl WasmContext { bytes: Vec, fun_name: &'a str, args: Args, - ) -> Result { + ) -> Result { let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?; env.execute(args).await } diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index cf105198..206d88d7 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -8,15 +8,18 @@ use crate::{ scheduler::TaskScheduler, tasks::{RegisteredTasks, WasmContext}, workflow::{self, Resource}, - Db, Receipt, Workflow, + Db, Receipt, }; use anyhow::{anyhow, bail, Result}; use crossbeam::channel; use futures::FutureExt; #[cfg(feature = "ipfs")] use futures::StreamExt; -use homestar_core::workflow::{ - prf::UcanPrf, InstructionResult, Pointer, Receipt as InvocationReceipt, +use homestar_core::{ + workflow::{ + error::ResolveError, prf::UcanPrf, InstructionResult, Pointer, Receipt as InvocationReceipt, + }, + Workflow, }; use homestar_wasm::{io::Arg, wasmtime::State}; use indexmap::IndexMap; @@ -323,15 +326,22 @@ impl<'a> Worker<'a> { self.event_sender.blocking_send(Event::FindReceipt( cid, sender, - ))?; + )).map_err(|err| ResolveError::TransportError(err.to_string()))?; let found = match receiver.recv_deadline( Instant::now() + Duration::from_secs(settings.p2p_timeout_secs), ) { Ok((found_cid, FoundEvent::Receipt(found))) if found_cid == cid => { found } - Ok(_) => bail!("only one worker channel per worker"), - Err(err) => bail!("error returning invocation receipt for {cid}: {err}"), + Ok(_) => + homestar_core::bail!( + ResolveError::UnresolvedCidError( + "wrong or unexpected event message received".to_string()) + ), + Err(err) => + homestar_core::bail!(ResolveError::UnresolvedCidError( + format!("timeout deadline reached for invocation receipt @ {cid}: {err}")) + ), }; Ok(found.output_as_arg()) diff --git a/homestar-runtime/src/workflow.rs b/homestar-runtime/src/workflow.rs index a94b711d..428b3ed3 100644 --- a/homestar-runtime/src/workflow.rs +++ b/homestar-runtime/src/workflow.rs @@ -6,22 +6,18 @@ use crate::scheduler::ExecutionGraph; use anyhow::{anyhow, bail}; use dagga::{self, dot::DagLegend, Node}; -use homestar_core::workflow::{ - input::{Parse, Parsed}, - instruction::RunInstruction, - Instruction, Invocation, Pointer, Task, +use homestar_core::{ + workflow::{ + input::{Parse, Parsed}, + instruction::RunInstruction, + Instruction, Invocation, Pointer, + }, + Workflow, }; use homestar_wasm::io::Arg; use indexmap::IndexMap; -use libipld::{ - cbor::DagCborCodec, - json::DagJsonCodec, - multihash::{Code, MultihashDigest}, - prelude::Codec, - serde::from_ipld, - Cid, Ipld, -}; -use std::{collections::BTreeMap, path::Path}; +use libipld::Cid; +use std::path::Path; use url::Url; mod info; @@ -33,8 +29,9 @@ pub(crate) use settings::Settings; type Dag<'a> = dagga::Dag, usize>; -const DAG_CBOR: u64 = 0x71; -const TASKS_KEY: &str = "tasks"; +/// A [Workflow] [Builder] wrapper for the runtime. +#[derive(Debug, Clone, PartialEq)] +pub struct Builder<'a>(Workflow<'a, Arg>); /// A resource can refer to a [URI] or [Cid] /// being accessed. @@ -54,6 +51,7 @@ pub enum Resource { /// ahead-of-time. /// /// [Dag]: dagga::Dag +/// [Task]: homestar_core::workflow::Task #[derive(Debug, Clone)] pub struct AOTContext<'a> { dag: Dag<'a>, @@ -100,32 +98,20 @@ impl<'a> Vertex<'a> { } } -/// Workflow composed of [tasks]. -/// -/// [tasks]: Task -#[derive(Debug, Clone, PartialEq)] -pub struct Workflow<'a, T> { - tasks: Vec>, -} - -impl<'a> Workflow<'a, Arg> { - /// Create a new [Workflow] given a set of tasks. - pub fn new(tasks: Vec>) -> Self { - Self { tasks } +impl<'a> Builder<'a> { + /// Create a new [Workflow] [Builder] given a [Workflow]. + pub fn new(workflow: Workflow<'a, Arg>) -> Builder<'a> { + Builder(workflow) } - /// Length of workflow given a series of [tasks]. - /// - /// [tasks]: Task - pub fn len(&self) -> u32 { - self.tasks.len() as u32 + /// Return an owned [Workflow] from the [Builder]. + pub fn into_inner(self) -> Workflow<'a, Arg> { + self.0 } - /// Whether [Workflow] contains [tasks] or not. - /// - /// [tasks]: Task - pub fn is_empty(&self) -> bool { - self.tasks.is_empty() + /// Return a referenced [Workflow] from the [Builder]. + pub fn inner(&self) -> &Workflow<'a, Arg> { + &self.0 } /// Convert the [Workflow] into an batch-separated [ExecutionGraph]. @@ -140,24 +126,16 @@ impl<'a> Workflow<'a, Arg> { } } - /// Return workflow as stringified Json. - pub fn to_json(&self) -> anyhow::Result { - let encoded = DagJsonCodec.encode(&Ipld::from(self.to_owned()))?; - let s = std::str::from_utf8(&encoded) - .map_err(|e| anyhow!("cannot stringify encoded value: {e}"))?; - Ok(s.to_string()) - } - fn aot(self) -> anyhow::Result> { let lookup_table = self.lookup_table()?; let (dag, resources) = - self.tasks.into_iter().enumerate().try_fold( + self.into_inner().tasks().into_iter().enumerate().try_fold( (Dag::default(), vec![]), |(mut dag, mut resources), (i, task)| { + let instr_cid = task.instruction_cid()?.to_string(); // Clone as we're owning the struct going backward. let ptr: Pointer = Invocation::::from(task.clone()).try_into()?; - let instr_cid = task.instruction_cid()?.to_string(); let RunInstruction::Expanded(instr) = task.into_instruction() else { bail!("workflow tasks/instructions must be expanded / inlined") @@ -196,7 +174,8 @@ impl<'a> Workflow<'a, Arg> { /// Generate an [IndexMap] lookup table of task instruction CIDs to a /// unique enumeration. fn lookup_table(&self) -> anyhow::Result> { - self.tasks + self.inner() + .tasks_ref() .iter() .enumerate() .try_fold(IndexMap::new(), |mut acc, (i, t)| { @@ -206,61 +185,12 @@ impl<'a> Workflow<'a, Arg> { } } -impl From> for Ipld { - fn from(workflow: Workflow<'_, Arg>) -> Self { - Ipld::Map(BTreeMap::from([( - TASKS_KEY.into(), - Ipld::List( - workflow - .tasks - .into_iter() - .map(Ipld::from) - .collect::>(), - ), - )])) - } -} - -impl TryFrom for Workflow<'_, Arg> { - type Error = anyhow::Error; - - fn try_from(ipld: Ipld) -> Result { - let map = from_ipld::>(ipld)?; - let ipld = map - .get(TASKS_KEY) - .ok_or_else(|| anyhow!("no `tasks` set"))?; - - let tasks = if let Ipld::List(tasks) = ipld { - let tasks = tasks.iter().fold(vec![], |mut acc, ipld| { - acc.push(ipld.try_into().unwrap()); - acc - }); - tasks - } else { - bail!("unexpected conversion type") - }; - - Ok(Self { tasks }) - } -} - -impl TryFrom> for Cid { - type Error = anyhow::Error; - - fn try_from(workflow: Workflow<'_, Arg>) -> Result { - let ipld: Ipld = workflow.into(); - let bytes = DagCborCodec.encode(&ipld)?; - let hash = Code::Sha3_256.digest(&bytes); - Ok(Cid::new_v1(DAG_CBOR, hash)) - } -} - #[cfg(test)] mod test { use super::*; use homestar_core::{ test_utils, - workflow::{config::Resources, instruction::RunInstruction, prf::UcanPrf}, + workflow::{config::Resources, instruction::RunInstruction, prf::UcanPrf, Task}, }; use std::path::Path; @@ -281,7 +211,8 @@ mod test { ); let workflow = Workflow::new(vec![task1, task2]); - let aot = workflow.aot().unwrap(); + let builder = Builder::new(workflow); + let aot = builder.aot().unwrap(); aot.dot("test", Path::new("test.dot")).unwrap(); assert!(Path::new("test.dot").exists()); @@ -306,7 +237,8 @@ mod test { let tasks = vec![task1.clone(), task2.clone()]; let workflow = Workflow::new(tasks); - let dag = workflow.aot().unwrap().dag; + let builder = Builder::new(workflow); + let dag = builder.aot().unwrap().dag; let instr1 = task1.instruction_cid().unwrap().to_string(); let instr2 = task2.instruction_cid().unwrap().to_string(); @@ -331,7 +263,8 @@ mod test { ); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let dag = workflow.aot().unwrap().dag; + let builder = Builder::new(workflow); + let dag = builder.aot().unwrap().dag; let instr1 = task1.instruction_cid().unwrap().to_string(); let instr2 = task2.instruction_cid().unwrap().to_string(); @@ -376,7 +309,8 @@ mod test { let instr3 = task3.instruction_cid().unwrap().to_string(); let instr4 = task4.instruction_cid().unwrap().to_string(); - let schedule = workflow.graph().unwrap().schedule; + let builder = Builder::new(workflow); + let schedule = builder.graph().unwrap().schedule; let nodes = schedule .into_iter() .fold(vec![], |mut acc: Vec, vec| { @@ -403,50 +337,4 @@ mod test { || nodes == vec![format!("{instr4}, {instr1}"), instr2, instr3] ); } - - #[test] - fn workflow_to_json() { - let config = Resources::default(); - let (instruction1, instruction2, _) = - test_utils::workflow::related_wasm_instructions::(); - let task1 = Task::new( - RunInstruction::Expanded(instruction1), - config.clone().into(), - UcanPrf::default(), - ); - let task2 = Task::new( - RunInstruction::Expanded(instruction2), - config.into(), - UcanPrf::default(), - ); - - let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - - let json_string = workflow.to_json().unwrap(); - - let json_val = json::from(json_string.clone()); - assert_eq!(json_string, json_val.to_string()); - } - - #[test] - fn ipld_roundtrip_workflow() { - let config = Resources::default(); - let (instruction1, instruction2, _) = - test_utils::workflow::related_wasm_instructions::(); - let task1 = Task::new( - RunInstruction::Expanded(instruction1), - config.clone().into(), - UcanPrf::default(), - ); - let task2 = Task::new( - RunInstruction::Expanded(instruction2), - config.into(), - UcanPrf::default(), - ); - - let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let ipld = Ipld::from(workflow.clone()); - - assert_eq!(workflow, ipld.try_into().unwrap()) - } } diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 2a703b9c..2a8dcc26 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -12,7 +12,7 @@ const NUM_TASKS_KEY: &str = "num_tasks"; /// [Workflow] information stored in the database. /// -/// [Workflow]: crate::Workflow +/// [Workflow]: homestar_core::Workflow #[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable, Hash)] #[diesel(table_name = crate::db::schema::workflows, primary_key(cid))] pub struct Stored { @@ -28,7 +28,7 @@ impl Stored { /// [Workflow] information stored in the database, tied to [receipts]. /// -/// [Workflow]: crate::Workflow +/// [Workflow]: homestar_core::Workflow /// [receipts]: crate::Receipt #[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Associations, Hash)] #[diesel(belongs_to(Receipt, foreign_key = receipt_cid))] @@ -52,7 +52,7 @@ impl StoredReceipt { /// to relate to it as a key-value relationship of (workflow) /// cid => [Info]. /// -/// [Workflow]: crate::Workflow +/// [Workflow]: homestar_core::Workflow #[derive(Debug, Clone, PartialEq)] pub struct Info { pub(crate) cid: Cid, @@ -84,13 +84,27 @@ impl Info { } } + /// Get unique identifier, [Cid], of [Workflow]. + /// + /// [Workflow]: homestar_core::Workflow + pub fn cid(&self) -> Cid { + self.cid + } + /// Get the [Cid] of a [Workflow] as a [String]. /// - /// [Workflow]: crate::Workflow - pub fn cid(&self) -> String { + /// [Workflow]: homestar_core::Workflow + pub fn cid_as_string(&self) -> String { self.cid.to_string() } + /// Get the [Cid] of a [Workflow] as bytes. + /// + /// [Workflow]: homestar_core::Workflow + pub fn cid_as_bytes(&self) -> Vec { + self.cid().to_bytes() + } + /// Set the progress / step of the [Info]. pub fn increment_progress(&mut self, new_cid: Cid) { self.progress.push(new_cid); @@ -174,10 +188,10 @@ impl TryFrom> for Info { #[cfg(test)] mod test { use super::*; - use crate::Workflow; use homestar_core::{ test_utils, workflow::{config::Resources, instruction::RunInstruction, prf::UcanPrf, Task}, + Workflow, }; use homestar_wasm::io::Arg; diff --git a/homestar-runtime/src/workflow/settings.rs b/homestar-runtime/src/workflow/settings.rs index 8af94df3..803855cd 100644 --- a/homestar-runtime/src/workflow/settings.rs +++ b/homestar-runtime/src/workflow/settings.rs @@ -1,4 +1,6 @@ -//! Workflow settings for a worker's run/execution. +//! [Workflow] settings for a worker's run/execution. +//! +//! [Workflow]: crate::Workflow /// Workflow settings. #[derive(Debug, Clone, PartialEq)] diff --git a/homestar-wasm/Cargo.toml b/homestar-wasm/Cargo.toml index b39f4535..9633d4f8 100644 --- a/homestar-wasm/Cargo.toml +++ b/homestar-wasm/Cargo.toml @@ -30,9 +30,8 @@ itertools = "0.10" libipld = "0.16" libipld-core = { version = "0.16", features = ["serde-codec", "serde"] } rust_decimal = "1.28" -serde_ipld_dagcbor = "0.2" stacker = "0.1" -thiserror = "1.0" +thiserror = { workspace = true } tracing = { workspace = true } wasi-common = "8.0" wasmparser = "0.104" @@ -44,6 +43,7 @@ wit-component = "0.8" [dev-dependencies] criterion = "0.4" image = "0.24" +serde_ipld_dagcbor = "0.2" tokio = { workspace = true } [features] diff --git a/homestar-wasm/src/error.rs b/homestar-wasm/src/error.rs new file mode 100644 index 00000000..a1b8667c --- /dev/null +++ b/homestar-wasm/src/error.rs @@ -0,0 +1,107 @@ +//! Error types and implementations for `Ipld <=> Wasm` interaction(s). + +/// Error types related for [Ipld] to/from [Wasm value] interpretation. +/// +/// [Ipld]: libipld::Ipld +/// [Wasm value]: wasmtime::component:Val +#[derive(thiserror::Error, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum InterpreterError { + /// Error encoding structure to a [Cid]. + /// + /// [Cid]: libipld::cid::Cid + #[error("failed to encode CID: {0}")] + CidError(#[from] libipld::cid::Error), + /// Error converting from [Decimal] precision to [f64]. + /// + /// [Decimal]: rust_decimal::Decimal + /// [f64]: f64 + #[error("Failed to convert from decimal to f64 float {0}")] + DecimalToFloatError(rust_decimal::Decimal), + /// Error converting from from [f32] to [Decimal]. + /// + /// [Decimal]: rust_decimal::Decimal + #[error("Failed to convert from f32 float {0} to decimal")] + FloatToDecimalError(f32), + /// Error converting from [Ipld] structure. + /// + /// [Ipld]: libipld::Ipld + #[error("cannot convert from Ipld structure: {0}")] + FromIpldError(#[from] libipld::error::SerdeError), + /// Error casting from [Ipld] [i128] structure to a lower precision integer. + /// + /// [Ipld]: libipld::Ipld + #[error("failed to cast Ipld i128 to integer type: {0}")] + IpldToIntError(#[from] std::num::TryFromIntError), + /// Error converting from [Ipld] structure to [Wit] structure. + /// + /// [Ipld]: libipld::Ipld + /// [Wit]: wasmtime::component::Val + #[error("no compatible Ipld type for Wit structure: {0:#?}")] + IpldToWitError(String), + /// Error involving mismatches with [Ipld] maps. + /// + /// [Ipld]: libipld::Ipld + #[error("{0}")] + MapTypeError(String), + /// Failure to match or find [Wit union] discriminant. + /// + /// [Wit union]: wasmtime::component::Union + #[error("no match within : {0}")] + NoDiscriminantMatched(String), + /// Bubble-up [TagsError] errors while executing the interpreter. + #[error(transparent)] + TagsError(#[from] TagsError), + /// Type mismatch error between expected and given types. + #[error("component type mismatch: expected: {expected}, found: {given:#?}")] + TypeMismatchError { + /// Expected type. + expected: String, + /// Given type or lack thereof. + given: Option, + }, + /// Failed to parse, handle, or otherwise convert to/from/between + /// Wit/Wasm types. + /// + /// The underlying error is a [anyhow::Error], per the + /// [wasmtime::component::types::Type] implementation. + #[error(transparent)] + WitTypeError(#[from] anyhow::Error), + /// Error converting from [Wit] structure to [Ipld] structure. + /// + /// [Ipld]: libipld::Ipld + /// [Wit]: wasmtime::component::Val + #[error("no compatible WIT type for Ipld structure: {0:#?}")] + WitToIpldError(libipld::Ipld), +} + +/// Error type for handling [Tags] stack structure. +/// +/// [Tags]: crate::wasmtime::ipld::Tags +#[allow(clippy::enum_variant_names)] +#[derive(thiserror::Error, Debug)] +pub enum TagsError { + /// An error returned by [atomic_refcell::AtomicRefCell::try_borrow]. + #[error("{0}")] + BorrowError(atomic_refcell::BorrowError), + /// An error returned by [atomic_refcell::AtomicRefCell::try_borrow_mut]. + #[error("{0}")] + BorrowMutError(atomic_refcell::BorrowMutError), + /// Working with [Tags] stack structure should never be empty. + /// + /// [Tags]: crate::wasmtime::ipld::Tags + #[error("structure must contain at least one element")] + TagsEmptyError, +} + +impl From for TagsError { + fn from(e: atomic_refcell::BorrowError) -> Self { + TagsError::BorrowError(e) + } +} + +impl From for TagsError { + fn from(e: atomic_refcell::BorrowMutError) -> Self { + TagsError::BorrowMutError(e) + } +} diff --git a/homestar-wasm/src/io.rs b/homestar-wasm/src/io.rs index e6fda776..735e2cab 100644 --- a/homestar-wasm/src/io.rs +++ b/homestar-wasm/src/io.rs @@ -1,23 +1,27 @@ //! IO (input/output) types for the Wasm execution. -use anyhow::anyhow; +use crate::{error::InterpreterError, wasmtime::ipld::RuntimeVal}; use enum_as_inner::EnumAsInner; use homestar_core::workflow::{ + error::InputParseError, input::{self, Args, Parsed}, - Input, + Error as WorkflowError, Input, }; use libipld::{serde::from_ipld, Ipld}; use std::{collections::btree_map::BTreeMap, fmt}; use wasmtime; -use crate::wasmtime::ipld::RuntimeVal; - -/// +/// Argument for Wasm execution, which can either be +/// an [Ipld] structure or a [wasmtime::component::Val]. #[derive(Clone, Debug, PartialEq, EnumAsInner)] pub enum Arg { + /// [Ipld] structure, which can be interpreted into a Wasm [Val]. /// + /// [Val]: wasmtime::component::Val Ipld(Ipld), + /// A direct [Wasm value] as argument input. /// + /// [Wasm value]: wasmtime::component::Val Value(wasmtime::component::Val), } @@ -52,39 +56,43 @@ impl From for Ipld { } impl input::Parse for Input { - fn parse(&self) -> anyhow::Result> { + fn parse(&self) -> Result, InputParseError> { if let Input::Ipld(ref ipld) = self { let map = from_ipld::>(ipld.to_owned())?; - let func = map - .get("func") - .ok_or_else(|| anyhow!("wrong task input format: {ipld:?}"))?; + let func = map.get("func").ok_or_else(|| { + InputParseError::WorkflowError(WorkflowError::MissingFieldError("func".to_string())) + })?; - let wasm_args = map - .get("args") - .ok_or_else(|| anyhow!("wrong task input format: {ipld:?}"))?; + let wasm_args = map.get("args").ok_or_else(|| { + InputParseError::WorkflowError(WorkflowError::MissingFieldError("args".to_string())) + })?; let args: Args = wasm_args.to_owned().try_into()?; Ok(Parsed::with_fn(from_ipld::(func.to_owned())?, args)) } else { - Err(anyhow!("unexpected task input")) + Err(InputParseError::UnexpectedTaskInput(self.clone())) } } } -/// +/// Enumeration of possible outputs from Wasm execution. #[derive(Clone, Debug, PartialEq)] pub enum Output { + /// A singular [Wasm value] as output. /// + /// [Wasm value]: wasmtime::component::Val Value(wasmtime::component::Val), + /// A list of [Wasm values] as output. /// + /// [Wasm value]: wasmtime::component::Val Values(Vec), - /// + /// No output, treated as `void`. Void, } impl TryFrom for Ipld { - type Error = anyhow::Error; + type Error = InterpreterError; fn try_from(output: Output) -> Result { match output { diff --git a/homestar-wasm/src/lib.rs b/homestar-wasm/src/lib.rs index d2b14177..6605766d 100644 --- a/homestar-wasm/src/lib.rs +++ b/homestar-wasm/src/lib.rs @@ -14,6 +14,7 @@ //! [homestar-runtime]: //! [Wasmtime]: +pub mod error; pub mod io; pub mod test_utils; pub mod wasmtime; diff --git a/homestar-wasm/src/wasmtime/error.rs b/homestar-wasm/src/wasmtime/error.rs new file mode 100644 index 00000000..217bedf2 --- /dev/null +++ b/homestar-wasm/src/wasmtime/error.rs @@ -0,0 +1,50 @@ +//! Error types and implementations for Wasm (via [Wasmtime]) execution, +//! instantiation, and runtime interaction(s). +//! +//! [Wasmtime]: + +/// Generic error type for Wasm execution, conversions, instantiations, etc. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Failure to convert from Wasm binary into Wasm component. + #[error("cannot convert from binary structure to Wasm component")] + IntoWasmComponentError(#[source] anyhow::Error), + /// Bubble-up [ResolveError]s for [Cid]s still awaiting resolution. + /// + /// [ResolveError]: homestar_core::workflow::error::ResolveError + /// [Cid]: libipld::Cid + #[error(transparent)] + PromiseError(#[from] homestar_core::workflow::error::ResolveError), + /// Generic unknown error. + #[error("unknown error")] + UnknownError, + /// Failure to instantiate Wasm component and its host bindings. + #[error("bindings not yet instantiated for wasm environment")] + WasmInstantiationError, + /// Failure to parse Wasm binary. + /// + /// Transparently forwards from [wasmparser::BinaryReaderError]'s `source` + /// and `Display` methods through to an underlying error. + #[error(transparent)] + WasmParserError(#[from] wasmparser::BinaryReaderError), + /// Generic [wasmtime] runtime error. + /// + /// Transparently forwards from [anyhow::Error]'s `source` and + /// `Display` methods through to an underlying error. + #[error(transparent)] + WasmRuntimeError(#[from] anyhow::Error), + /// Failure to find Wasm function for execution. + #[error("Wasm function {0} not found")] + WasmFunctionNotFoundError(String), + /// [Wat] as Wasm component error. + /// + /// [Wat]: wat + #[error("{0}")] + WatComponentError(String), + /// [wat]-related error. + /// + /// Transparently forwards from [wat::Error]'s `source` + /// and `Display` methods through to an underlying error. + #[error(transparent)] + WatError(#[from] wat::Error), +} diff --git a/homestar-wasm/src/wasmtime/ipld.rs b/homestar-wasm/src/wasmtime/ipld.rs index 67c5be6c..6ad31767 100644 --- a/homestar-wasm/src/wasmtime/ipld.rs +++ b/homestar-wasm/src/wasmtime/ipld.rs @@ -5,7 +5,7 @@ //! //! [Ipld]: libipld::Ipld -use anyhow::{anyhow, Result}; +use crate::error::{InterpreterError, TagsError}; use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut}; use itertools::{ FoldWhile::{Continue, Done}, @@ -98,8 +98,8 @@ impl Tags { self.0.borrow_mut() } - fn try_borrow_mut(&self) -> Result>> { - self.0.try_borrow_mut().map_err(|e| anyhow!(e)) + fn try_borrow_mut(&self) -> Result>, TagsError> { + Ok(self.0.try_borrow_mut()?) } fn borrow(&self) -> AtomicRef<'_, VecDeque> { @@ -107,19 +107,19 @@ impl Tags { } #[allow(dead_code)] - fn try_borrow(&self) -> Result>> { - self.0.try_borrow().map_err(|e| anyhow!(e)) + fn try_borrow(&self) -> Result>, TagsError> { + Ok(self.0.try_borrow()?) } - fn push(&mut self, tag: String) -> Result<()> { + fn push(&mut self, tag: String) -> Result<(), TagsError> { self.try_borrow_mut()?.push_front(tag); Ok(()) } - fn pop(&self) -> Result { + fn pop(&self) -> Result { self.try_borrow_mut()? .pop_front() - .ok_or_else(|| anyhow!("tags should be > 1")) + .ok_or(TagsError::TagsEmptyError) } fn empty(&self) -> bool { @@ -171,22 +171,30 @@ impl RuntimeVal { /// * Enums /// * Structs / Records /// * Results / Options - pub fn try_from(ipld: Ipld, interface_ty: &InterfaceType<'_>) -> Result { + pub fn try_from( + ipld: Ipld, + interface_ty: &InterfaceType<'_>, + ) -> Result { // TODO: Configure for recursion. stacker::maybe_grow(64 * 1024, 1024 * 1024, || { let dyn_type = match ipld { Ipld::Map(mut v) if matches!(interface_ty.inner(), Some(Type::Union(_))) && v.len() == 1 => { - let inner = interface_ty - .inner() - .ok_or_else(|| anyhow!("component type mismatch: expected "))?; + let inner = interface_ty.inner().ok_or_else(|| { + InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: interface_ty.inner().map(|t| format!("{t:#?}")), + } + })?; // already pattern matched against let union_inst = inner.unwrap_union(); let (key, elem) = v.pop_first().ok_or_else(|| { - anyhow!("ipld map must contain at least one discriminant") + InterpreterError::MapTypeError( + "IPLD map must contain at least one discriminant".to_string(), + ) })?; let (discriminant, mut tags) = union_inst @@ -205,9 +213,11 @@ impl RuntimeVal { Continue(acc) } } - Err(e) => { + Err(err) => { if is_last { - Done(Err(anyhow!("No match within : {e:?}"))) + Done(Err(InterpreterError::NoDiscriminantMatched( + err.to_string(), + ))) } else { Continue(acc) } @@ -221,9 +231,12 @@ impl RuntimeVal { RuntimeVal::new_with_tags(discriminant?, tags) } v if matches!(interface_ty.inner(), Some(Type::Union(_))) => { - let inner = interface_ty - .inner() - .ok_or_else(|| anyhow!("component type mismatch: expected "))?; + let inner = interface_ty.inner().ok_or_else(|| { + InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: interface_ty.inner().map(|t| format!("{t:#?}")), + } + })?; // already pattern matched against let union_inst = inner.unwrap_union(); @@ -244,9 +257,12 @@ impl RuntimeVal { Continue(acc) } } - Err(e) => { + Err(err) => { if is_last { - Done(Err(anyhow!("No match within : {e:?}"))) + Done(Err(InterpreterError::NoDiscriminantMatched( + err.to_string(), + ) + .into())) } else { Continue(acc) } @@ -261,16 +277,13 @@ impl RuntimeVal { InterfaceType::Type(Type::String) | InterfaceType::TypeRef(Type::String) | InterfaceType::Any => RuntimeVal::new(Val::String(Box::from("null"))), - _ => Err(anyhow!("no compatible Wit type for {:?}", Ipld::Null))?, + _ => Err(InterpreterError::WitToIpldError(Ipld::Null))?, }, Ipld::Bool(v) => match interface_ty { InterfaceType::Type(Type::Bool) | InterfaceType::TypeRef(Type::Bool) | InterfaceType::Any => RuntimeVal::new(Val::Bool(v)), - _ => Err(anyhow!( - "no compatible Wit type for Ipld type of {:?}", - v - ))?, + _ => Err(InterpreterError::WitToIpldError(Ipld::Bool(v)))?, }, Ipld::Integer(v) => match interface_ty { InterfaceType::Type(Type::U8) | InterfaceType::TypeRef(Type::U8) => { @@ -297,10 +310,7 @@ impl RuntimeVal { InterfaceType::Any | InterfaceType::Type(Type::S64) | InterfaceType::TypeRef(Type::S64) => RuntimeVal::new(Val::S64(v.try_into()?)), - _ => Err(anyhow!( - "no compatible Wit type for Ipld type of {:?}", - v - ))?, + _ => Err(InterpreterError::WitToIpldError(Ipld::Integer(v)))?, }, Ipld::Float(v) => match interface_ty { InterfaceType::Type(Type::Float32) | InterfaceType::TypeRef(Type::Float32) => { @@ -310,9 +320,12 @@ impl RuntimeVal { }, Ipld::String(v) => RuntimeVal::new(Val::String(Box::from(v))), Ipld::Bytes(v) if matches!(interface_ty.inner(), Some(Type::List(_))) => { - let inner = interface_ty - .inner() - .ok_or_else(|| anyhow!("component type mismatch: expected >"))?; + let inner = interface_ty.inner().ok_or_else(|| { + InterpreterError::TypeMismatchError { + expected: ">".to_string(), + given: interface_ty.inner().map(|t| format!("{t:#?}")), + } + })?; // already pattern matched against let list_inst = inner.unwrap_list(); @@ -335,9 +348,12 @@ impl RuntimeVal { ))), }, Ipld::List(v) if matches!(interface_ty.inner(), Some(Type::List(_))) => { - let inner = interface_ty - .inner() - .ok_or_else(|| anyhow!("component type mismatch: expected "))?; + let inner = interface_ty.inner().ok_or_else(|| { + InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: interface_ty.inner().map(|t| format!("{t:#?}")), + } + })?; // already pattern matched against let list_inst = inner.unwrap_list(); @@ -346,7 +362,7 @@ impl RuntimeVal { let RuntimeVal(value, _tags) = RuntimeVal::try_from(elem, &InterfaceType::Type(list_inst.ty()))?; acc.push(value); - Ok::<_, anyhow::Error>(acc) + Ok::<_, InterpreterError>(acc) })?; RuntimeVal::new(list_inst.new_val(vec.into_boxed_slice())?) @@ -358,29 +374,39 @@ impl RuntimeVal { .fold_while(Ok(RuntimeVal::new(Val::Bool(false))), |_acc, elem| { match RuntimeVal::try_from(elem, interface_ty) { Ok(runtime_val) => Done(Ok(runtime_val)), - Err(e) => Done(Err(anyhow!(e))), + Err(e) => Done(Err(e)), } }) .into_inner()?, Ipld::Map(v) => { - let inner = interface_ty - .inner() - .ok_or_else(|| anyhow!("component type mismatch: expected "))?; + let inner = interface_ty.inner().ok_or_else(|| { + InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: interface_ty.inner().map(|t| format!("{t:#?}")), + } + })?; let list_inst = matches!(inner, Type::List(_)) .then_some(inner.unwrap_list()) - .ok_or_else(|| anyhow!("{inner:?} must be a "))?; + .ok_or_else(|| InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: Some(format!("{inner:#?}")), + })?; let tuple_inst = matches!(list_inst.ty(), Type::Tuple(_)) .then_some(list_inst.ty().unwrap_tuple()) - .ok_or_else(|| anyhow!("{inner:?} must be a "))? + .ok_or_else(|| InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: Some(format!("{inner:#?}")), + })? .to_owned(); - let ty = tuple_inst - .types() - .nth(1) - .ok_or_else(|| anyhow!("a map has tuples of two elements"))?; + let ty = tuple_inst.types().nth(1).ok_or_else(|| { + InterpreterError::MapTypeError( + "IPLD map must have tuples of two elements".to_string(), + ) + })?; let (vec, tags) = v.into_iter().try_fold( (vec![], VecDeque::new()), @@ -393,7 +419,7 @@ impl RuntimeVal { acc_tuples.push(new_tuple); let mut tags = tags.try_borrow_mut()?; (acc_tags).append(&mut tags); - Ok::<_, anyhow::Error>((acc_tuples, acc_tags)) + Ok::<_, InterpreterError>((acc_tuples, acc_tags)) }, )?; RuntimeVal::new_with_tags( @@ -409,7 +435,7 @@ impl RuntimeVal { } impl TryFrom for Ipld { - type Error = anyhow::Error; + type Error = InterpreterError; fn try_from(val: RuntimeVal) -> Result { fn base_64_bytes(s: &str) -> Result, multibase::Error> { @@ -447,10 +473,10 @@ impl TryFrom for Ipld { // Convert to decimal for handling precision issues going from // f32 => f64. let dec = Decimal::from_f32(v) - .ok_or_else(|| anyhow!("failed conversion to decimal"))?; + .ok_or_else(|| InterpreterError::FloatToDecimalError(v))?; Ipld::Float( dec.to_f64() - .ok_or_else(|| anyhow!("failed conversion from decimal"))?, + .ok_or_else(|| InterpreterError::DecimalToFloatError(dec))?, ) } RuntimeVal(Val::Float64(v), _) => Ipld::Float(v), @@ -466,10 +492,16 @@ impl TryFrom for Ipld { acc.insert(s.to_string(), ipld); Ok::<_, Self::Error>(acc) } else { - Err(anyhow!("mismatched types: {:?}", tup_values))? + Err(InterpreterError::TypeMismatchError { + expected: " of (, <&wasmtime::Val>)".to_string(), + given: Some(format!("{tup_values:#?}")), + })? } } else { - Err(anyhow!("mismatched types: {elem:?}"))? + Err(InterpreterError::TypeMismatchError { + expected: "".to_string(), + given: Some(format!("{elem:#?}")), + })? } })?; Ipld::Map(inner) @@ -481,7 +513,10 @@ impl TryFrom for Ipld { acc.push(v.to_owned()); Ok::<_, Self::Error>(acc) } else { - Err(anyhow!("expected all u8 types"))? + Err(InterpreterError::TypeMismatchError { + expected: "all types".to_string(), + given: Some(format!("{elem:#?}")), + })? } })?; @@ -508,7 +543,7 @@ impl TryFrom for Ipld { Ipld::try_from(RuntimeVal::new(u.payload().to_owned()))? } // Rest of Wit types are unhandled going to Ipld. - v => Err(anyhow!("no compatible Ipld type for {:?}", v))?, + v => Err(InterpreterError::IpldToWitError(format!("{v:#?}")))?, }; Ok(ipld) diff --git a/homestar-wasm/src/wasmtime/mod.rs b/homestar-wasm/src/wasmtime/mod.rs index b7e69b7d..dc9df00c 100644 --- a/homestar-wasm/src/wasmtime/mod.rs +++ b/homestar-wasm/src/wasmtime/mod.rs @@ -5,7 +5,9 @@ //! [Ipld]: libipld::Ipld pub mod config; +mod error; pub mod ipld; pub mod world; +pub use error::*; pub use world::{State, World}; diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index 39903972..4780dd39 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -3,11 +3,18 @@ //! //! [Wasmtime]: -use super::ipld::{InterfaceType, RuntimeVal}; -use crate::io::{Arg, Output}; -use anyhow::{anyhow, bail, Result}; +use crate::{ + io::{Arg, Output}, + wasmtime::{ + ipld::{InterfaceType, RuntimeVal}, + Error, + }, +}; use heck::{ToKebabCase, ToSnakeCase}; -use homestar_core::workflow::{input::Args, Input}; +use homestar_core::{ + bail, + workflow::{error::ResolveError, input::Args, Input}, +}; use std::iter; use wasmtime::{ component::{self, Component, Func, Instance, Linker}, @@ -81,20 +88,20 @@ impl Env { /// /// [Wit]: /// [Ipld]: libipld::Ipld - pub async fn execute(&mut self, args: Args) -> Result + pub async fn execute(&mut self, args: Args) -> Result where T: Send, { let param_types = self .bindings .as_mut() - .ok_or_else(|| anyhow!("bindings not yet instantiated for wasm environment"))? + .ok_or(Error::WasmInstantiationError)? .func() .params(&self.store); let result_types = self .bindings .as_mut() - .ok_or_else(|| anyhow!("bindings not yet instantiated for wasm environment"))? + .ok_or(Error::WasmInstantiationError)? .func() .results(&self.store); @@ -103,22 +110,27 @@ impl Env { args.into_inner().into_iter(), ) .try_fold(vec![], |mut acc, (typ, arg)| { + // Remove unwraps let v = match arg { - Input::Ipld(ipld) => RuntimeVal::try_from(ipld, &InterfaceType::from(typ))?.value(), + Input::Ipld(ipld) => RuntimeVal::try_from(ipld, &InterfaceType::from(typ)) + .unwrap() + .value(), Input::Arg(val) => match val.into_inner() { - Arg::Ipld(ipld) => { - RuntimeVal::try_from(ipld, &InterfaceType::from(typ))?.value() - } + Arg::Ipld(ipld) => RuntimeVal::try_from(ipld, &InterfaceType::from(typ)) + .unwrap() + .value(), Arg::Value(v) => v, }, - Input::Deferred(await_promise) => bail!(anyhow!( - "deferred task not yet resolved for {}: {}", - await_promise.result(), - await_promise.instruction_cid() + Input::Deferred(await_promise) => bail!(Error::PromiseError( + ResolveError::UnresolvedCidError(format!( + "deferred task not yet resolved for {}: {}", + await_promise.result(), + await_promise.instruction_cid() + )) )), }; acc.push(v); - Ok::<_, anyhow::Error>(acc) + Ok::<_, Error>(acc) })?; let mut results_alloc: Vec = result_types @@ -128,14 +140,14 @@ impl Env { self.bindings .as_mut() - .ok_or_else(|| anyhow!("bindings not yet instantiated for wasm environment"))? + .ok_or(Error::WasmInstantiationError)? .func() .call_async(&mut self.store, ¶ms, &mut results_alloc) .await?; self.bindings .as_mut() - .ok_or_else(|| anyhow!("bindings not yet instantiated for wasm environment"))? + .ok_or(Error::WasmInstantiationError)? .func() .post_return_async(&mut self.store) .await?; @@ -181,7 +193,7 @@ impl World { /// for a [World], given [State]. /// /// [environment]: Env - pub fn default(data: State) -> Result> { + pub fn default(data: State) -> Result, Error> { let config = Self::configure(); let engine = Engine::new(&config)?; let linker = Self::define_linker(&engine); @@ -203,7 +215,11 @@ impl World { /// for future invocations to use the already-initialized linker, store. /// /// Used when first initiating a module of a workflow. - pub async fn instantiate(bytes: Vec, fun_name: &str, data: State) -> Result> { + pub async fn instantiate( + bytes: Vec, + fun_name: &str, + data: State, + ) -> Result, Error> { let config = Self::configure(); let engine = Engine::new(&config)?; let linker = Self::define_linker(&engine); @@ -236,7 +252,7 @@ impl World { bytes: Vec, fun_name: &'a str, env: &'a mut Env, - ) -> Result<&'a mut Env> + ) -> Result<&'a mut Env, Error> where T: Send, { @@ -291,7 +307,7 @@ impl World { mut store: impl wasmtime::AsContextMut, instance: &Instance, fun_name: &str, - ) -> Result { + ) -> Result { let mut store_ctx = store.as_context_mut(); let mut exports = instance.exports(&mut store_ctx); let mut __exports = exports.root(); @@ -299,14 +315,14 @@ impl World { .func(fun_name) .or_else(|| __exports.func(&fun_name.to_kebab_case())) .or_else(|| __exports.func(&fun_name.to_snake_case())) - .ok_or_else(|| anyhow!("function not found"))?; + .ok_or_else(|| Error::WasmFunctionNotFoundError(fun_name.to_string()))?; Ok(World(func)) } } /// Turn bytes into a Wasm [Component] module. -fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result { +fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result { fn is_component(chunk: wasmparser::Chunk<'_>) -> bool { matches!( chunk, @@ -322,21 +338,23 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result { match wasmparser::Parser::new(0).parse(bytes, true) { Ok(chunk) => { if is_component(chunk) { - Component::from_binary(&engine, bytes) + Component::from_binary(&engine, bytes).map_err(Error::IntoWasmComponentError) } else { let component = ComponentEncoder::default() .module(bytes)? .validate(true) .encode()?; - Component::from_binary(&engine, &component) + Component::from_binary(&engine, &component).map_err(Error::IntoWasmComponentError) } } Err(_) => { let wasm_bytes = wat::parse_bytes(bytes)?; if is_component(wasmparser::Parser::new(0).parse(&wasm_bytes, true)?) { - Component::from_binary(&engine, &wasm_bytes) + Component::from_binary(&engine, &wasm_bytes).map_err(Error::IntoWasmComponentError) } else { - Err(anyhow!("WAT must reference a Wasm component.")) + Err(Error::WatComponentError( + "WAT must reference a Wasm component.".to_string(), + )) } } } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 5d56faf9..292fe499 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly" +channel = "stable"