From 2d77b0a553663a2398618f1ba350bc1acd13d46d Mon Sep 17 00:00:00 2001
From: Zeeshan Lakhani
Date: Wed, 28 Feb 2024 20:39:22 -0500
Subject: [PATCH] Fix: poll DHT in background when a worker runs a workflow + dual-stack webserver (#590)

Completes:

- Closes https://github.com/ipvm-wg/homestar/issues/278.
- Closes https://github.com/ipvm-wg/homestar/issues/589.

Includes:

* We no longer block on DHT polling for receipts; instead, we purposefully race execution against DHT finding/polling.
* This moves us toward the goal of resolving awaited CIDs from outside the workflow, but we'll need another background process to re-run a workflow once known awaits are resolved (separate ticket). The next set of work will also validate that those outside-of-workflow promises exist, i.e., that they are currently running in another workflow or have already run.
* v4/v6 host address settings for the JSON-RPC web server so that it can handle both v4 and v6 requests on the same port.
* Better, ordered error handling around CIDs that fail to resolve.
* Workflow status field in the DB (+ migration), stored as an SQLite enum with special schema patching.
* Workflow retries field in the DB (in the same migration).
* A new, general-purpose poller implementation.
* Removes `async_trait` except for wasmtime impls (due to Rust 1.73).
---
 .envrc | 2 +-
 .pre-commit-config.yaml | 2 +-
 Cargo.lock | 40 ++-
 diesel.toml | 1 +
 flake.lock | 24 +-
 flake.nix | 13 +-
 homestar-invocation/src/receipt.rs | 2 +-
 .../src/task/instruction/input.rs | 11 +-
 homestar-runtime/Cargo.toml | 4 +-
 homestar-runtime/config/defaults.toml | 4 +-
 homestar-runtime/migrations/.keep | 0
 .../down.sql | 2 +
 .../up.sql | 4 +
 homestar-runtime/src/db.rs | 15 +
 homestar-runtime/src/db/schema.patch | 9 +
 homestar-runtime/src/db/schema.rs | 8 +-
 homestar-runtime/src/event_handler.rs | 2 -
 homestar-runtime/src/event_handler/event.rs | 2 -
 .../src/event_handler/notification.rs | 1 +
 .../src/event_handler/swarm_event.rs | 3 +-
 homestar-runtime/src/ip.rs | 16 +
 homestar-runtime/src/lib.rs | 1 +
 homestar-runtime/src/network/webserver.rs | 184 +++++---
 homestar-runtime/src/runner.rs | 10 +-
 homestar-runtime/src/runner/response.rs | 4 +-
 homestar-runtime/src/scheduler.rs | 184 +++++-----
 homestar-runtime/src/settings.rs | 10 +-
 .../src/settings/libp2p_config.rs | 3 +
 homestar-runtime/src/tasks.rs | 2 -
 homestar-runtime/src/tasks/wasm.rs | 2 -
 homestar-runtime/src/test_utils/db.rs | 2 +-
 .../src/test_utils/proc_macro/src/lib.rs | 2 +
 .../src/test_utils/worker_builder.rs | 8 +-
 homestar-runtime/src/worker.rs | 325 ++++++++----------
 homestar-runtime/src/worker/poller.rs | 140 ++++++++
 homestar-runtime/src/worker/resolver.rs | 169 +++++++++
 homestar-runtime/src/workflow.rs | 88 ++++-
 homestar-runtime/src/workflow/info.rs | 30 ++
 .../test-workflow-add-one-part-one.json | 40 +--
 .../test-workflow-add-one-part-two.json | 50 +--
 homestar-runtime/tests/network/dht.rs | 122 +++---
 homestar-wasm/src/wasmtime/host/helpers.rs | 5 +-
 rust-toolchain.toml | 19 +
 43 files changed, 1080 insertions(+), 485 deletions(-)
 delete mode 100644 homestar-runtime/migrations/.keep
 create mode 100644 homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/down.sql
 create mode 100644 homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/up.sql
 create mode 100644 homestar-runtime/src/db/schema.patch
 create mode 100644 homestar-runtime/src/ip.rs
 create mode 100644 homestar-runtime/src/worker/poller.rs
 create mode 100644 homestar-runtime/src/worker/resolver.rs

diff --git a/.envrc b/.envrc
index
9df87f24..80735bb4 100644 --- a/.envrc +++ b/.envrc @@ -1,4 +1,4 @@ use_flake export RUST_LOG=homestar=debug,homestar_runtime=debug,homestar_wasm=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,jsonrpsee_server=debug,moka=debug -export RUST_BACKTRACE=full +export RUST_BACKTRACE=1 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 615cf8f7..5b2e9db1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -80,7 +80,7 @@ repos: - id: trailing-whitespace exclude: bindings\.rs$ - id: end-of-file-fixer - exclude: \.(txt|json)$ + exclude: \.(txt|json|patch)$ - id: check-yaml - id: check-json - id: check-added-large-files diff --git a/Cargo.lock b/Cargo.lock index 318e3b24..e34288eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1573,6 +1573,18 @@ dependencies = [ "time", ] +[[package]] +name = "diesel-derive-enum" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81c5131a2895ef64741dad1d483f358c2a229a3a2d1b256778cdc5e146db64d4" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.50", +] + [[package]] name = "diesel_derives" version = "2.1.2" @@ -1935,9 +1947,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "faststr" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c176ff74f084f24c4fdc98ac22d11e27da8daffbcbd13f4d71180758a319c2e3" +checksum = "17e546e971d6d404087d4885389651265b1bb03e5128a519a297f90d9e6f6ecb" dependencies = [ "bytes", "serde", @@ -2559,6 +2571,7 @@ dependencies = [ "dashmap", "derive-getters", "diesel", + "diesel-derive-enum", "diesel_migrations", "dotenvy", "dyn-clone", @@ -2577,6 +2590,7 @@ dependencies = [ "http 0.2.11", "http-serde", "humantime", + "hyper", "indexmap 2.2.3", "ipfs-api", "ipfs-api-backend-hyper", @@ -3045,9 +3059,9 @@ dependencies = [ [[package]] name = "image" -version = "0.24.8" +version = "0.24.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "034bbe799d1909622a74d1193aa50147769440040ff36cb2baa947609b0a4e23" +checksum = "5690139d2f55868e080017335e4b94cb7414274c74f1669c84fb5feba2c9f69d" dependencies = [ "bytemuck", "byteorder", @@ -4832,9 +4846,9 @@ checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" [[package]] name = "papergrid" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2ccbe15f2b6db62f9a9871642746427e297b0ceb85f9a7f1ee5ff47d184d0c8" +checksum = "9ad43c07024ef767f9160710b3a6773976194758c7919b17e63b863db0bdf7fb" dependencies = [ "bytecount", "fnv", @@ -6366,12 +6380,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -6640,9 +6654,9 @@ dependencies = [ [[package]] name = "tabled" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfe9c3632da101aba5131ed63f9eed38665f8b3c68703a6bb18124835c1a5d22" +checksum = "4c998b0c8b921495196a48aabaf1901ff28be0760136e31604f7967b0792050e" dependencies = [ "papergrid", "tabled_derive", @@ -6651,9 +6665,9 @@ dependencies = [ [[package]] name = 
"tabled_derive" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99f688a08b54f4f02f0a3c382aefdb7884d3d69609f785bd253dc033243e3fe4" +checksum = "4c138f99377e5d653a371cdad263615634cfc8467685dfe8e73e2b8e98f44b17" dependencies = [ "heck", "proc-macro-error", diff --git a/diesel.toml b/diesel.toml index 72072950..00e8f819 100644 --- a/diesel.toml +++ b/diesel.toml @@ -3,6 +3,7 @@ [print_schema] file = "homestar-runtime/src/db/schema.rs" +patch_file = "homestar-runtime/src/db/schema.patch" [migrations_directory] dir = "homestar-runtime/migrations" diff --git a/flake.lock b/flake.lock index e761576c..7217eee0 100644 --- a/flake.lock +++ b/flake.lock @@ -79,11 +79,11 @@ "rust-analyzer-src": "rust-analyzer-src" }, "locked": { - "lastModified": 1708410168, - "narHash": "sha256-98kCv2PbKfqt+oyyXHqdXPRRGjW+QEy9eBzNtRyCRHs=", + "lastModified": 1708582955, + "narHash": "sha256-OOq0YCdOgCyMpr4TFPJVorU5I8k97G8AQ1mhnktp+ZA=", "owner": "nix-community", "repo": "fenix", - "rev": "846fc5ddb810c36411de6587384bef86c2db5127", + "rev": "244b9af2d0180619adcca5d020d085341c0d28c9", "type": "github" }, "original": { @@ -442,11 +442,11 @@ }, "nixos-unstable": { "locked": { - "lastModified": 1708405701, - "narHash": "sha256-E78TXiZiR9irWdYAVltRxZPJ+pMxXPU5PjHwqq6XLtI=", + "lastModified": 1708583918, + "narHash": "sha256-wcAivqrBghg/rPLBeuChEmSyb5j41zFZbVs6xFYgS7E=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "fa15b53dbea5028db38d6e09b4cef6eba42aeebb", + "rev": "a7fa133a1e973c127e9c83e2c8e3407ae3797099", "type": "github" }, "original": { @@ -537,11 +537,11 @@ }, "nixpkgs_3": { "locked": { - "lastModified": 1708294118, - "narHash": "sha256-evZzmLW7qoHXf76VCepvun1esZDxHfVRFUJtumD7L2M=", + "lastModified": 1708440434, + "narHash": "sha256-XY+B9mbhL/i+Q6fP6gBQ6P76rv9rWtpjQiUJ+DGtaUg=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "e0da498ad77ac8909a980f07eff060862417ccf7", + "rev": "526d051b128b82ae045a70e5ff1adf8e6dafa560", "type": "github" }, "original": { @@ -565,11 +565,11 @@ "rust-analyzer-src": { "flake": false, "locked": { - "lastModified": 1708361159, - "narHash": "sha256-HAZ/pEN0TVFoGMbITliYXbMLPaSy/X+WKY/y6K8iTr0=", + "lastModified": 1708459863, + "narHash": "sha256-PzlnvPOv/1l2lSU0oj+gY+hq5MRqJNU3KWSO+RCwhgs=", "owner": "rust-lang", "repo": "rust-analyzer", - "rev": "2223b4fa71e543ac6dd1abd4770a69fab8dbdec1", + "rev": "543d7e98dbcc0668528dbf3f5b32d752882baa33", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index ea6379e9..3572dcd0 100644 --- a/flake.nix +++ b/flake.nix @@ -47,12 +47,23 @@ pkgs = import nixpkgs {inherit system overlays;}; unstable = import nixos-unstable {inherit system overlays;}; - rust-toolchain = fenix.packages.${system}.fromToolchainFile { + file-toolchain = fenix.packages.${system}.fromToolchainFile { file = ./rust-toolchain.toml; # sha256 = pkgs.lib.fakeSha256; sha256 = "sha256-e4mlaJehWBymYxJGgnbuCObVlqMlQSilZ8FljG9zPHY="; }; + default-toolchain = fenix.packages.${system}.complete.withComponents [ + "cargo" + "clippy" + "llvm-tools-preview" + "rustfmt" + "rust-src" + "rust-std" + ]; + + rust-toolchain = fenix.packages.${system}.combine [file-toolchain default-toolchain]; + rustPlatform = pkgs.makeRustPlatform { cargo = rust-toolchain; rustc = rust-toolchain; diff --git a/homestar-invocation/src/receipt.rs b/homestar-invocation/src/receipt.rs index 2bb54764..19c5b36b 100644 --- a/homestar-invocation/src/receipt.rs +++ b/homestar-invocation/src/receipt.rs @@ -41,7 +41,7 @@ pub struct Receipt { } impl 
Receipt {
-    ///
+    /// Create a new [Receipt].
     pub fn new(
         ran: Pointer,
         result: task::Result<Ipld>,
diff --git a/homestar-invocation/src/task/instruction/input.rs b/homestar-invocation/src/task/instruction/input.rs
index cef7ecfe..18f67213 100644
--- a/homestar-invocation/src/task/instruction/input.rs
+++ b/homestar-invocation/src/task/instruction/input.rs
@@ -81,8 +81,15 @@ where
     F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
     Ipld: From<T>,
 {
-    let inputs = resolve_args(self.0, lookup_fn);
-    Ok(Args(inputs.await))
+    let inputs = resolve_args(self.0, lookup_fn).await;
+    for input in inputs.iter() {
+        if let Input::Deferred(awaiting) = input {
+            return Err(ResolveError::UnresolvedCid(
+                awaiting.instruction_cid().to_string(),
+            ));
+        }
+    }
+    Ok(Args(inputs))
 }
 }
diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml
index 62422d92..e3bb02db 100644
--- a/homestar-runtime/Cargo.toml
+++ b/homestar-runtime/Cargo.toml
@@ -61,6 +61,7 @@ diesel = { version = "2.1", default-features = false, features = [
     "with-deprecated",
     "chrono",
 ] }
+diesel-derive-enum = { version = "2.1", features = ["sqlite"] }
 diesel_migrations = "2.1"
 dotenvy = "0.15"
 dyn-clone = "1.0"
@@ -79,6 +80,7 @@ homestar-workspace-hack = { workspace = true }
 http = "0.2"
 http-serde = "1.1"
 humantime = { workspace = true }
+hyper = { version = "0.14", default-features = false }
 indexmap = { workspace = true }
 ipfs-api = { version = "0.17", optional = true }
 ipfs-api-backend-hyper = { version = "0.6", default-features = false, features = [
@@ -147,7 +149,7 @@ serde_with = { version = "3.5", default-features = false, features = [
 ] }
 stream-cancel = "0.8"
 sysinfo = { version = "0.29", default-features = false, optional = true }
-tabled = { version = "0.14", default-features = false, features = [
+tabled = { version = "0.15", default-features = false, features = [
     "derive",
     "macros",
 ] }
diff --git a/homestar-runtime/config/defaults.toml b/homestar-runtime/config/defaults.toml
index 139267b7..40ea72a5 100644
--- a/homestar-runtime/config/defaults.toml
+++ b/homestar-runtime/config/defaults.toml
@@ -55,6 +55,7 @@
 mesh_n = 2
 mesh_outbound_min = 1
 [node.network.libp2p.dht]
+enable_resolve_receipts_in_background = true
 p2p_receipt_timeout = 500
 p2p_workflow_info_timeout = 500
 p2p_provider_timeout = 10_000
@@ -74,7 +75,8 @@
 max_connections = 10
 server_timeout = 120
 [node.network.webserver]
-host = "127.0.0.1"
+v4_host = "127.0.0.1"
+v6_host = "[::1]"
 port = 1337
 timeout = 120
 websocket_capacity = 2048
diff --git a/homestar-runtime/migrations/.keep b/homestar-runtime/migrations/.keep
deleted file mode 100644
index e69de29b..00000000
diff --git a/homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/down.sql b/homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/down.sql
new file mode 100644
index 00000000..5da51c92
--- /dev/null
+++ b/homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE workflows DROP COLUMN status;
+ALTER TABLE workflows DROP COLUMN retries;
diff --git a/homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/up.sql b/homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/up.sql
new file mode 100644
index 00000000..5a9cc679
--- /dev/null
+++ b/homestar-runtime/migrations/2024-02-13-141620_add_workflows_status/up.sql
@@ -0,0 +1,4 @@
+ALTER TABLE workflows ADD COLUMN status TEXT CHECK(
+    status IN ('pending', 'completed', 'running', 'stuck')) NOT NULL DEFAULT
+    'pending';
+ALTER TABLE workflows ADD COLUMN retries INTEGER NOT NULL DEFAULT 0;
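As context for the migration above: the `status` column is kept in lockstep with a Rust enum through `diesel-derive-enum` (added to Cargo.toml earlier), whose derive emits the `StatusMapping` SQL type that the schema patch below references. A minimal sketch of such an enum — the exact derives and attributes here are illustrative assumptions, not homestar's literal definition:

use diesel_derive_enum::DbEnum;

/// Workflow status, stored as lowercase TEXT in SQLite and guarded by the
/// migration's CHECK constraint. Deriving `DbEnum` generates a companion
/// `StatusMapping` type for use inside diesel's `table!` macro.
#[derive(Debug, Clone, Copy, PartialEq, Eq, DbEnum)]
#[DbValueStyle = "snake_case"]
pub enum Status {
    Pending,
    Completed,
    Running,
    Stuck,
}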
diff --git a/homestar-runtime/src/db.rs b/homestar-runtime/src/db.rs
index 985a77b1..03424b37 100644
--- a/homestar-runtime/src/db.rs
+++ b/homestar-runtime/src/db.rs
@@ -23,6 +23,7 @@ use tokio::fs;
 use tracing::info;
 
 #[allow(missing_docs, unused_imports)]
+#[rustfmt::skip]
 pub mod schema;
 pub(crate) mod utils;
@@ -248,6 +249,20 @@ pub trait Database: Send + Sync + Clone {
         }
     }
 
+    /// Update workflow status given a Cid to the workflow.
+    fn set_workflow_status(
+        workflow_cid: Cid,
+        status: workflow::Status,
+        conn: &mut Connection,
+    ) -> Result<(), diesel::result::Error> {
+        diesel::update(schema::workflows::dsl::workflows)
+            .filter(schema::workflows::cid.eq(Pointer::new(workflow_cid)))
+            .set(schema::workflows::status.eq(status))
+            .execute(conn)?;
+
+        Ok(())
+    }
+
     /// Store workflow Cid and [Receipt] Cid in the database for inner join.
     fn store_workflow_receipt(
         workflow_cid: Cid,
diff --git a/homestar-runtime/src/db/schema.patch b/homestar-runtime/src/db/schema.patch
new file mode 100644
index 00000000..33c4e158
--- /dev/null
+++ b/homestar-runtime/src/db/schema.patch
@@ -0,0 +1,9 @@
+@@ -25,7 +25,7 @@ diesel::table! {
+     cid -> Text,
+     name -> Nullable<Text>,
+     num_tasks -> Integer,
+     resources -> Binary,
+     created_at -> Timestamp,
+     completed_at -> Nullable<Timestamp>,
++    status -> crate::workflow::StatusMapping,
+-    status -> Text,
diff --git a/homestar-runtime/src/db/schema.rs b/homestar-runtime/src/db/schema.rs
index 67b9e5a4..71736e52 100644
--- a/homestar-runtime/src/db/schema.rs
+++ b/homestar-runtime/src/db/schema.rs
@@ -21,6 +21,8 @@ diesel::table! {
         resources -> Binary,
         created_at -> Timestamp,
         completed_at -> Nullable<Timestamp>,
+        status -> crate::workflow::StatusMapping,
+        retries -> Integer,
     }
 }
@@ -34,4 +36,8 @@ diesel::table! {
 diesel::joinable!(workflows_receipts -> receipts (receipt_cid));
 diesel::joinable!(workflows_receipts -> workflows (workflow_cid));
 
-diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts,);
+diesel::allow_tables_to_appear_in_same_query!(
+    receipts,
+    workflows,
+    workflows_receipts,
+);
diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs
index 0e18d786..55189740 100644
--- a/homestar-runtime/src/event_handler.rs
+++ b/homestar-runtime/src/event_handler.rs
@@ -11,7 +11,6 @@ use crate::{
     settings,
 };
 use anyhow::Result;
-use async_trait::async_trait;
 use fnv::FnvHashMap;
 use libp2p::{
     core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie,
@@ -48,7 +47,6 @@ struct Bootstrap {
 }
 
 /// Handler trait for [EventHandler] events.
-#[async_trait] pub(crate) trait Handler where DB: Database, diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index 1094c175..ec8cd168 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -21,7 +21,6 @@ use crate::{ workflow, Db, Receipt, }; use anyhow::Result; -use async_trait::async_trait; #[cfg(feature = "websocket-notify")] use homestar_invocation::Pointer; use homestar_invocation::Receipt as InvocationReceipt; @@ -701,7 +700,6 @@ impl PeerRequest { } } -#[async_trait] impl Handler for Event where DB: Database, diff --git a/homestar-runtime/src/event_handler/notification.rs b/homestar-runtime/src/event_handler/notification.rs index c03df12e..69611a0d 100644 --- a/homestar-runtime/src/event_handler/notification.rs +++ b/homestar-runtime/src/event_handler/notification.rs @@ -40,6 +40,7 @@ pub(crate) fn emit_receipt( subject = "notification.receipt", category = "notification", cid = receipt_cid.to_string(), + instruction_cid = receipt.instruction().cid().to_string(), "emitting receipt to WebSocket" ); if let Some(ipld) = metadata { diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 9e04dfa6..cedcce13 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -23,7 +23,6 @@ use crate::{ Db, Receipt, }; use anyhow::{anyhow, Result}; -use async_trait::async_trait; use libipld::Cid; #[cfg(feature = "websocket-notify")] use libp2p::Multiaddr; @@ -86,7 +85,6 @@ pub(crate) struct WorkflowInfoEvent { pub(crate) workflow_source: notification::WorkflowInfoSource, } -#[async_trait] impl Handler for SwarmEvent where DB: Database + Sync, @@ -648,6 +646,7 @@ async fn handle_swarm_event( subject = "libp2p.kad.get_record", category = "handle_swarm_event", cid = receipt.cid().to_string(), + instruction_cid = receipt.instruction().cid().to_string(), "found receipt record published by {}", match peer_id { Some(peer) => peer.to_string(), diff --git a/homestar-runtime/src/ip.rs b/homestar-runtime/src/ip.rs new file mode 100644 index 00000000..f5229919 --- /dev/null +++ b/homestar-runtime/src/ip.rs @@ -0,0 +1,16 @@ +//! IP address parsing and formatting utilities. + +use std::net::IpAddr; + +/// Parse an IP address from a URI host. 
+pub(crate) fn parse_ip_from_uri_host(host: &str) -> Option<IpAddr> {
+    // Attempt to parse directly as an IP address (IPv4 or IPv6 without brackets)
+    if let Ok(ip_addr) = host.parse::<IpAddr>() {
+        return Some(ip_addr);
+    }
+
+    // If direct parsing fails, check if it's an IPv6 in brackets
+    host.strip_prefix('[')
+        .and_then(|stripped| stripped.strip_suffix(']'))
+        .and_then(|stripped| stripped.parse::<IpAddr>().ok())
+}
diff --git a/homestar-runtime/src/lib.rs b/homestar-runtime/src/lib.rs
index fa01e995..69866849 100644
--- a/homestar-runtime/src/lib.rs
+++ b/homestar-runtime/src/lib.rs
@@ -54,6 +54,7 @@ pub mod cli;
 pub mod daemon;
 pub mod db;
 mod event_handler;
+mod ip;
 mod logger;
 pub mod network;
 mod receipt;
diff --git a/homestar-runtime/src/network/webserver.rs b/homestar-runtime/src/network/webserver.rs
index 983ba709..729c1bf0 100644
--- a/homestar-runtime/src/network/webserver.rs
+++ b/homestar-runtime/src/network/webserver.rs
@@ -2,12 +2,13 @@
 use crate::{
     db::Database,
-    runner,
+    ip, runner,
     runner::{DynamicNodeInfo, StaticNodeInfo, WsSender},
     settings,
 };
 use anyhow::{anyhow, Result};
 use faststr::FastStr;
+use futures::future::{self, Either};
 use homestar_wasm::io::Arg;
 use homestar_workflow::Workflow;
 use http::{
@@ -15,24 +16,25 @@
     method::Method,
 };
 use jsonrpsee::server::{
-    middleware::http::ProxyGetRequestLayer, RandomStringIdProvider, ServerHandle,
+    middleware::http::ProxyGetRequestLayer, stop_channel, RandomStringIdProvider, ServerHandle,
 };
 use libipld::Cid;
 use metrics_exporter_prometheus::PrometheusHandle;
 use std::{
     iter::once,
-    net::{IpAddr, SocketAddr, TcpListener},
+    net::{IpAddr, SocketAddr},
+    pin::Pin,
     str::FromStr,
     time::Duration,
 };
-use tokio::runtime::Handle;
 #[cfg(feature = "websocket-notify")]
 use tokio::sync::broadcast;
+use tokio::{net::TcpListener, runtime::Handle, select};
 use tower_http::{
     cors::{self, CorsLayer},
     sensitive_headers::SetSensitiveRequestHeadersLayer,
 };
-use tracing::info;
+use tracing::{debug, error, info};
 
 pub(crate) mod listener;
 #[cfg(feature = "websocket-notify")]
@@ -54,7 +56,6 @@ use rpc::{Context, JsonRpc};
 #[allow(dead_code)]
 #[derive(Debug)]
 pub(crate) enum Message {
-    /// Error attempting to run a [Workflow].
     RunErr(runner::Error),
     /// Run a workflow, given a tuple of name, and [Workflow].
     RunWorkflow((FastStr, Workflow<'static, Arg>)),
@@ -74,8 +75,10 @@
 #[cfg(feature = "websocket-notify")]
 #[derive(Clone, Debug)]
 pub(crate) struct Server {
-    /// Address of the server.
-    addr: SocketAddr,
+    /// V4 Address of the server.
+    v4_addr: SocketAddr,
+    /// V6 Address of the server.
+    v6_addr: SocketAddr,
     /// Message buffer capacity for the server.
     capacity: usize,
     /// Message sender for broadcasting internal events to clients connected to
@@ -96,8 +99,10 @@
 #[cfg(not(feature = "websocket-notify"))]
 #[derive(Clone, Debug)]
 pub(crate) struct Server {
-    /// Address of the server.
-    addr: SocketAddr,
+    /// V4 Address of the server.
+    v4_addr: SocketAddr,
+    /// V6 Address of the server.
+    v6_addr: SocketAddr,
     /// Message buffer capacity for the server.
     capacity: usize,
     /// Sender timeout for the [Sink] messages.
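A quick illustration (not part of the patch) of how `parse_ip_from_uri_host` behaves for the two webserver host settings — plain IPs parse directly, while the bracketed IPv6 URI form is unwrapped first:

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

// assert-style walkthrough of the helper's branches
assert_eq!(
    parse_ip_from_uri_host("127.0.0.1"),            // plain IPv4
    Some(IpAddr::V4(Ipv4Addr::LOCALHOST))
);
assert_eq!(
    parse_ip_from_uri_host("[::1]"),                // bracketed IPv6, URI form
    Some(IpAddr::V6(Ipv6Addr::LOCALHOST))
);
assert_eq!(parse_ip_from_uri_host("localhost"), None); // not an IP literal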
@@ -127,19 +132,29 @@ impl Server {
     pub(crate) fn new(settings: &settings::Webserver) -> Result<Self> {
         let (evt_sender, _receiver) = Self::setup_channel(settings.websocket_capacity);
         let (msg_sender, _receiver) = Self::setup_channel(settings.websocket_capacity);
-        let host = IpAddr::from_str(&settings.host.to_string())?;
+        let v4_host = IpAddr::from_str(&settings.v4_host.to_string())?;
+        let v6_host = ip::parse_ip_from_uri_host(&settings.v6_host.to_string())
+            .ok_or_else(|| anyhow!("unable to parse URI"))?;
+
         let port_setting = settings.port;
-        let addr = if port_available(host, port_setting) {
-            SocketAddr::from((host, port_setting))
+        let (v4_addr, v6_addr) = if port_available(v4_host, port_setting) {
+            (
+                SocketAddr::from((v4_host, port_setting)),
+                SocketAddr::from((v6_host, port_setting)),
+            )
         } else {
             let port = (port_setting..port_setting + 1000)
-                .find(|port| port_available(host, *port))
+                .find(|port| port_available(v4_host, *port))
                 .ok_or_else(|| anyhow!("no free TCP ports available"))?;
-            SocketAddr::from((host, port))
+            (
+                SocketAddr::from((v4_host, port)),
+                SocketAddr::from((v6_host, port)),
+            )
         };
         Ok(Self {
-            addr,
+            v4_addr,
+            v6_addr,
             capacity: settings.websocket_capacity,
             evt_notifier: Notifier::new(evt_sender),
             workflow_msg_notifier: Notifier::new(msg_sender),
@@ -151,19 +166,29 @@ impl Server {
     /// Set up a new [Server] instance, which only acts as an HTTP server.
     #[cfg(not(feature = "websocket-notify"))]
     pub(crate) fn new(settings: &settings::Webserver) -> Result<Self> {
-        let host = IpAddr::from_str(&settings.host.to_string())?;
+        let v4_host = IpAddr::from_str(&settings.v4_host.to_string())?;
+        let v6_host = ip::parse_ip_from_uri_host(&settings.v6_host.to_string())
+            .ok_or_else(|| anyhow!("unable to parse URI"))?;
+
         let port_setting = settings.port;
-        let addr = if port_available(host, port_setting) {
-            SocketAddr::from((host, port_setting))
+        let (v4_addr, v6_addr) = if port_available(v4_host, port_setting) {
+            (
+                SocketAddr::from((v4_host, port_setting)),
+                SocketAddr::from((v6_host, port_setting)),
+            )
         } else {
             let port = (port_setting..port_setting + 1000)
-                .find(|port| port_available(host, *port))
+                .find(|port| port_available(v4_host, *port))
                 .ok_or_else(|| anyhow!("no free TCP ports available"))?;
-            SocketAddr::from((host, port))
+            (
+                SocketAddr::from((v4_host, port)),
+                SocketAddr::from((v6_host, port)),
+            )
         };
         Ok(Self {
-            addr,
+            v4_addr,
+            v6_addr,
             capacity: settings.websocket_capacity,
             sender_timeout: settings.websocket_sender_timeout,
             webserver_timeout: settings.timeout,
@@ -228,12 +253,12 @@
         &self,
         module: JsonRpc,
     ) -> Result<ServerHandle> {
-        let addr = self.addr;
         info!(
             subject = "webserver.start",
             category = "webserver",
-            "webserver listening on {}",
-            addr
+            "webserver listening on {}/{}",
+            self.v4_addr,
+            self.v6_addr
         );
 
         let cors = CorsLayer::new()
@@ -259,25 +284,70 @@
             .timeout(self.webserver_timeout);
 
         let runtime_hdl = Handle::current();
+        let listener_v4 = TcpListener::bind(&self.v4_addr).await?;
+        let listener_v6 = TcpListener::bind(&self.v6_addr).await?;
+        let (stop_hdl, server_hdl) = stop_channel();
 
-        let server = jsonrpsee::server::Server::builder()
+        let svc = jsonrpsee::server::Server::builder()
             .custom_tokio_runtime(runtime_hdl.clone())
             .set_http_middleware(middleware)
             .set_id_provider(Box::new(RandomStringIdProvider::new(16)))
             .set_message_buffer_capacity(self.capacity as u32)
-            .build(addr)
-            .await
-            .expect("Webserver to startup");
-
-        let hdl = server.start(module.into_inner());
-        runtime_hdl.spawn(hdl.clone().stopped());
+            .to_service_builder()
+            .build(module.into_inner(), stop_hdl.clone());
+
+        runtime_hdl.clone().spawn(async move {
+            loop {
+                let stream = select! {
+                    result = listener_v4.accept() => {
+                        if let Ok((stream, _remote_addr)) = result {
+                            stream
+                        } else {
+                            continue
+                        }
+                    }
+                    result = listener_v6.accept() => {
+                        if let Ok((stream, _remote_addr)) = result {
+                            stream
+                        } else {
+                            continue
+                        }
+                    }
+                    _ = stop_hdl.clone().shutdown() => break,
+                };
+
+                let svc = svc.clone();
+                let stop_hdl2 = stop_hdl.clone();
+                runtime_hdl.spawn(async move {
+                    let conn = hyper::server::conn::Http::new()
+                        .serve_connection(stream, svc)
+                        .with_upgrades();
+
+                    let stopped = stop_hdl2.shutdown();
+                    tokio::pin!(stopped);
+
+                    let res = match future::select(conn, stopped).await {
+                        Either::Left((conn, _)) => conn,
+                        Either::Right((_, mut conn)) => {
+                            debug!("graceful shutdown of HTTP connection");
+                            Pin::new(&mut conn).graceful_shutdown();
+                            conn.await
+                        }
+                    };
+
+                    if let Err(err) = res {
+                        error!(err=?err, "HTTP connection failed");
+                    }
+                });
+            }
+        });
 
-        Ok(hdl)
+        Ok(server_hdl)
     }
 }
 
 fn port_available(host: IpAddr, port: u16) -> bool {
-    TcpListener::bind((host.to_string(), port)).is_ok()
+    std::net::TcpListener::bind((host.to_string(), port)).is_ok()
 }
 
 #[cfg(test)]
@@ -309,17 +379,47 @@ mod test {
     }
 
     #[homestar_runtime_proc_macro::runner_test]
-    fn ws_connect() {
+    fn ws_connect_v4() {
+        let TestRunner { runner, settings } = TestRunner::start();
+        runner.runtime.block_on(async {
+            let server = Server::new(settings.node().network().webserver()).unwrap();
+            let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap();
+            let metrics_hdl = metrics_handle().await;
+            let (runner_tx, _runner_rx) = AsyncChannel::oneshot();
+            let _ws_hdl = server.start(runner_tx, metrics_hdl, db).await.unwrap();
+
+            let ws_url = format!("ws://{}", server.v4_addr);
+            let http_url = format!("http://{}", server.v4_addr);
+
+            tokio_tungstenite::connect_async(ws_url.clone())
+                .await
+                .unwrap();
+
+            let client = WsClientBuilder::default().build(ws_url).await.unwrap();
+            let ws_resp: serde_json::Value = client
+                .request(rpc::HEALTH_ENDPOINT, rpc_params![])
+                .await
+                .unwrap();
+            assert_eq!(ws_resp, serde_json::json!({"healthy": true }));
+            let http_resp = reqwest::get(format!("{}/health", http_url)).await.unwrap();
+            assert_eq!(http_resp.status(), 200);
+            let http_resp = http_resp.json::<serde_json::Value>().await.unwrap();
+            assert_eq!(http_resp, serde_json::json!({"healthy": true }));
+        });
+    }
+
+    #[homestar_runtime_proc_macro::runner_test]
+    fn ws_connect_v6() {
         let TestRunner { runner, settings } = TestRunner::start();
         runner.runtime.block_on(async {
             let server = Server::new(settings.node().network().webserver()).unwrap();
             let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap();
             let metrics_hdl = metrics_handle().await;
             let (runner_tx, _runner_rx) = AsyncChannel::oneshot();
-            server.start(runner_tx, metrics_hdl, db).await.unwrap();
+            let _ws_hdl = server.start(runner_tx, metrics_hdl, db).await.unwrap();
 
-            let ws_url = format!("ws://{}", server.addr);
-            let http_url = format!("http://{}", server.addr);
+            let ws_url = format!("ws://{}", server.v6_addr);
+            let http_url = format!("http://{}", server.v6_addr);
 
             tokio_tungstenite::connect_async(ws_url.clone())
                 .await
@@ -347,9 +447,9 @@ mod test {
             let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap();
             let metrics_hdl = metrics_handle().await;
             let (runner_tx, _runner_rx) = AsyncChannel::oneshot();
-            server.start(runner_tx,
metrics_hdl, db).await.unwrap();
+            let _ws_hdl = server.start(runner_tx, metrics_hdl, db).await.unwrap();
 
-            let ws_url = format!("ws://{}", server.addr);
+            let ws_url = format!("ws://{}", server.v4_addr);
 
             let client1 = WsClientBuilder::default().build(ws_url).await.unwrap();
             let mut sub: Subscription<Vec<u8>> = client1
@@ -423,9 +523,9 @@ mod test {
             let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap();
             let metrics_hdl = metrics_handle().await;
             let (runner_tx, _runner_rx) = AsyncChannel::oneshot();
-            server.start(runner_tx, metrics_hdl, db).await.unwrap();
+            let _ws_hdl = server.start(runner_tx, metrics_hdl, db).await.unwrap();
 
-            let ws_url = format!("ws://{}", server.addr);
+            let ws_url = format!("ws://{}", server.v4_addr);
 
             let client = WsClientBuilder::default().build(ws_url).await.unwrap();
             let sub: Result<Subscription<Vec<u8>>, ClientError> = client
diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs
index a38e5e67..c8e6eec6 100644
--- a/homestar-runtime/src/runner.rs
+++ b/homestar-runtime/src/runner.rs
@@ -571,8 +571,9 @@ impl Runner {
     }
 
     /// Sequence for shutting down a [Runner], including:
-    /// a) RPC and runner-related channels.
-    /// b) Event-handler channels.
-    /// c) Running workers
+    /// a) RPC (CLI)
+    /// b) Webserver
+    /// c) Event-handler channels
+    /// d) Running workers
     async fn shutdown(
         &self,
@@ -590,8 +591,9 @@
             category = "homestar.shutdown",
             "shutting down webserver"
         );
+        let _ = ws_hdl.stop();
 
-        ws_hdl.clone().stopped().await;
+        ws_hdl.stopped().await;
 
         let (shutdown_sender, shutdown_receiver) = AsyncChannel::oneshot();
         let _ = self
@@ -741,6 +743,8 @@
         );
 
         // Provide workflow to network.
+        //
+        // This essentially says: I'm running this workflow Cid.
         self.event_sender
             .send_async(Event::ProvideRecord(
                 worker.workflow_info.cid,
diff --git a/homestar-runtime/src/runner/response.rs b/homestar-runtime/src/runner/response.rs
index 833867f5..5fa552b3 100644
--- a/homestar-runtime/src/runner/response.rs
+++ b/homestar-runtime/src/runner/response.rs
@@ -101,7 +101,7 @@ impl show::ConsoleTable for AckWorkflow {
     }
 
     // If there are no replayed receipts, add a placeholder row.
-    if receipt_table_builder.count_rows() == 1 {
+    if receipt_table_builder.count_records() == 1 {
         receipt_table_builder.push_record([
             "".to_string(),
             "".to_string(),
@@ -192,7 +192,7 @@ impl show::ConsoleTable for AckNodeInfo {
     let mut conns_table_builder = tabled::builder::Builder::from_iter(conns);
 
     // If there are no connections, add a placeholder row.
-    if conns_table_builder.count_rows() == 0 {
+    if conns_table_builder.count_records() == 0 {
         conns_table_builder.push_record([
             "Connections".to_string(),
             "".to_string(),
diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs
index a6d0d30b..ba41aefd 100644
--- a/homestar-runtime/src/scheduler.rs
+++ b/homestar-runtime/src/scheduler.rs
@@ -6,10 +6,10 @@
 use crate::{
     db::{Connection, Database},
-    workflow::{IndexedResources, Resource, Vertex},
+    workflow::{self, IndexedResources, Resource, Vertex},
     Db,
 };
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Result};
 use dagga::Node;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
@@ -18,7 +18,7 @@
 use homestar_wasm::io::Arg;
 use homestar_workflow::LinkMap;
 use indexmap::IndexMap;
 use libipld::Cid;
-use std::{ops::ControlFlow, str::FromStr, sync::Arc};
+use std::{str::FromStr, sync::Arc};
 use tokio::sync::RwLock;
 use tracing::debug;
 
@@ -36,6 +36,8 @@ pub(crate) struct ExecutionGraph<'a> {
     /// A built-up [Dag] [Schedule] of batches.
/// /// [Dag]: dagga::Dag + /// + pub(crate) awaiting: workflow::Promises, pub(crate) schedule: Schedule<'a>, /// Vector of [resources] to fetch for executing functions in [Workflow]. /// @@ -66,6 +68,9 @@ pub(crate) struct TaskScheduler<'a> { /// [Tasks]: homestar_invocation::Task pub(crate) run: Schedule<'a>, + /// Set of Cids to possibly fetch from the DHT. + pub(crate) promises_to_resolve: Arc>, + /// Step/batch to resume from. pub(crate) resume_step: Option, @@ -95,7 +100,6 @@ impl<'a> TaskScheduler<'a> { /// [Receipts]: crate::Receipt /// [Swarm]: crate::network::swarm /// [Workflow]: homestar_workflow::Workflow - #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] pub(crate) async fn init( mut graph: Arc>, conn: &mut Connection, @@ -105,103 +109,105 @@ impl<'a> TaskScheduler<'a> { F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, { let mut_graph = Arc::make_mut(&mut graph); - let schedule: &mut Schedule<'a> = mut_graph.schedule.as_mut(); + let schedule = &mut mut_graph.schedule; let schedule_length = schedule.len(); - let mut resources_to_fetch = vec![]; - let linkmap = LinkMap::>::default(); - let resume = 'resume: { - for (idx, vec) in schedule.iter().enumerate().rev() { - let folded_pointers = vec.iter().try_fold(vec![], |mut ptrs, node| { + // Gather all CIDs to resolve + let mut cids_to_resolve = Vec::new(); + // Gather all resources to fetch + let mut resources_to_fetch = Vec::new(); + let mut linkmap = LinkMap::>::default(); + + let mut last_idx = 0; + for (idx, vec) in schedule.iter().enumerate().rev() { + let pointers: Result, _> = vec + .iter() + .map(|node| { let cid = Cid::from_str(node.name())?; - mut_graph - .indexed_resources - .get(&cid) - .map(|resource| { - resource.iter().for_each(|rsc| { - resources_to_fetch.push((cid, rsc)); - }); - ptrs.push(Pointer::new(cid)); - }) - .ok_or_else(|| anyhow!("resource not found for instruction {cid}"))?; - Ok::<_, anyhow::Error>(ptrs) - }); - - if let Ok(pointers) = folded_pointers { - match Db::find_instruction_pointers(&pointers, conn) { - Ok(found) => { - let linkmap = found.iter().fold(linkmap.clone(), |mut map, receipt| { - if let Some(idx) = resources_to_fetch - .iter() - .position(|(cid, _rsc)| cid == &receipt.instruction().cid()) - { - resources_to_fetch.swap_remove(idx); - } - - map.insert(receipt.instruction().cid(), receipt.output_as_arg()); - map - }); - - if found.len() == vec.len() { - break 'resume ControlFlow::Break((idx + 1, linkmap)); - } else { - continue; - } - } - Err(_) => { - debug!( - subject = "receipt.db.check", - category = "scheduler.run", - "receipt not available in the database" - ); - continue; + if let Some(resource) = mut_graph.indexed_resources.get(&cid) { + for rsc in resource.iter() { + resources_to_fetch.push((cid, rsc.clone())); } + } else { + return Err(anyhow!("Resource not found for instruction {cid}")); + } + Ok(Pointer::new(cid)) + }) + .collect(); + + if let Ok(pointers) = pointers { + if let Ok(found) = Db::find_instruction_pointers(&pointers, conn) { + for receipt in found.iter() { + resources_to_fetch.retain(|(cid, _)| *cid != receipt.instruction().cid()); + linkmap.insert(receipt.instruction().cid(), receipt.output_as_arg()); + } + + if found.len() == vec.len() { + last_idx = idx + 1; + break; } } else { - continue; + debug!("Receipt not available in the database"); } } - ControlFlow::Continue(()) - }; - - let resources_to_fetch: FnvHashSet = resources_to_fetch - .into_iter() - .map(|(_, rsc)| rsc.to_owned()) - .collect(); - - let fetched = fetch_fn(resources_to_fetch) - 
.await - .with_context(|| "unable to fetch resources")?; + } - match resume { - ControlFlow::Break((idx, linkmap)) => { - let pivot = schedule.split_off(idx); - let step = if idx >= schedule_length || idx == 0 { - None - } else { - Some(idx) - }; - - Ok(SchedulerContext { - scheduler: Self { - linkmap: Arc::new(linkmap.into()), - ran: Some(schedule.to_vec()), - run: pivot, - resume_step: step, - resources: Arc::new(fetched.into()), + // Add all CIDs not resolved to the list of CIDs to resolve. + cids_to_resolve.extend(resources_to_fetch.iter().map(|(cid, _)| *cid)); + + // Fetch resources from the DHT as a unique set. + let resources_to_fetch: FnvHashSet = + resources_to_fetch.into_iter().map(|(_, rsc)| rsc).collect(); + let fetched_resources = fetch_fn(resources_to_fetch).await?; + + // Filter out promises/awaits outside of the workflow that + // have been already resolved and store them in our in-memory + // cache (linkmap). + let promises_as_pointers = + mut_graph + .awaiting + .iter() + .fold( + vec![], + |mut acc, (in_or_out_flow, cid)| match in_or_out_flow { + workflow::Origin::InFlow => acc, + workflow::Origin::OutFlow => { + acc.push(Pointer::new(*cid)); + acc + } }, - }) + ); + if let Ok(found) = Db::find_instruction_pointers(&promises_as_pointers, conn) { + for receipt in found.iter() { + cids_to_resolve.retain(|cid| *cid != receipt.instruction().cid()); + linkmap.insert(receipt.instruction().cid(), receipt.output_as_arg()); } - _ => Ok(SchedulerContext { - scheduler: Self { - linkmap: Arc::new(linkmap.into()), - ran: None, - run: schedule.to_vec(), - resume_step: None, - resources: Arc::new(fetched.into()), - }, - }), } + + // Convert the list of CIDs to resolve into a unique set. + let promises_to_resolve: FnvHashSet = cids_to_resolve.into_iter().collect(); + + let (ran, run, resume_step) = if last_idx > 0 { + let pivot = schedule.split_off(last_idx); + if last_idx >= schedule_length || last_idx == 0 { + (Some(schedule.to_vec()), pivot, None) + } else { + (Some(schedule.to_vec()), pivot, Some(last_idx)) + } + } else { + (None, schedule.to_vec(), None) + }; + + Ok(SchedulerContext { + scheduler: Self { + linkmap: Arc::new(RwLock::new(linkmap)), + promises_to_resolve: Arc::new(promises_to_resolve), + ran, + run, + resume_step, + resources: Arc::new(fetched_resources.into()), + }, + }) } /// Get the number of tasks that have already ran in the [Workflow]. diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index e4107694..94a0c03d 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -160,9 +160,12 @@ pub(crate) struct Rpc { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub(crate) struct Webserver { - /// Webserver host address. + /// V4 Webserver host address. #[serde(with = "http_serde::uri")] - pub(crate) host: Uri, + pub(crate) v4_host: Uri, + /// V6 (fallback) Webserver host address. + #[serde(with = "http_serde::uri")] + pub(crate) v6_host: Uri, /// Webserver-server port. pub(crate) port: u16, /// Webserver timeout. 
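To make the dual-stack flow concrete, here is a rough sketch (simplified; the real `Server::new` above also probes a port range for a free port) of how the two host settings become bind addresses:

use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;

// Hypothetical condensation of the v4/v6 address derivation; the helper
// name and signature are illustrative, not from the patch.
fn dual_stack_addrs(v4_host: &str, v6_host: &str, port: u16) -> Option<(SocketAddr, SocketAddr)> {
    let v4 = IpAddr::from_str(v4_host).ok()?;
    // The v6 setting arrives in URI form, e.g. "[::1]", so it goes through
    // the bracket-stripping parser from src/ip.rs.
    let v6 = v6_host
        .strip_prefix('[')
        .and_then(|s| s.strip_suffix(']'))
        .and_then(|s| s.parse::<IpAddr>().ok())?;
    Some((SocketAddr::from((v4, port)), SocketAddr::from((v6, port))))
}

// dual_stack_addrs("127.0.0.1", "[::1]", 1337)
//   => Some((127.0.0.1:1337, [::1]:1337))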
@@ -300,7 +303,8 @@ impl Default for Rpc { impl Default for Webserver { fn default() -> Self { Self { - host: Uri::from_static("127.0.0.1"), + v4_host: Uri::from_static("127.0.0.1"), + v6_host: Uri::from_static("[::1]"), port: 1337, timeout: Duration::new(120, 0), websocket_capacity: 2048, diff --git a/homestar-runtime/src/settings/libp2p_config.rs b/homestar-runtime/src/settings/libp2p_config.rs index 6487a8cc..dc245f73 100644 --- a/homestar-runtime/src/settings/libp2p_config.rs +++ b/homestar-runtime/src/settings/libp2p_config.rs @@ -61,6 +61,8 @@ pub(crate) struct Quic { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub(crate) struct Dht { + /// Enable resolve receipts in background. + pub(crate) enable_resolve_receipts_in_background: bool, /// Timeout for p2p receipt record lookups in milliseconds. #[serde_as(as = "DurationMilliSeconds")] pub(crate) p2p_receipt_timeout: Duration, @@ -175,6 +177,7 @@ impl Libp2p { impl Default for Dht { fn default() -> Self { Self { + enable_resolve_receipts_in_background: true, p2p_receipt_timeout: Duration::from_millis(500), p2p_workflow_info_timeout: Duration::from_millis(500), p2p_provider_timeout: Duration::from_millis(10000), diff --git a/homestar-runtime/src/tasks.rs b/homestar-runtime/src/tasks.rs index f072a681..fcaa114b 100644 --- a/homestar-runtime/src/tasks.rs +++ b/homestar-runtime/src/tasks.rs @@ -1,7 +1,6 @@ //! Module for working with task-types and task-specific functionality. use anyhow::{anyhow, Result}; -use async_trait::async_trait; use enum_assoc::Assoc; use std::path::PathBuf; @@ -24,7 +23,6 @@ pub(crate) enum RegisteredTasks { /// Trait for loading files for different task-types directly. #[allow(dead_code)] -#[async_trait] pub(crate) trait FileLoad { /// Load file asynchronously. async fn load(file: PathBuf) -> Result> { diff --git a/homestar-runtime/src/tasks/wasm.rs b/homestar-runtime/src/tasks/wasm.rs index f8480671..5b51e5df 100644 --- a/homestar-runtime/src/tasks/wasm.rs +++ b/homestar-runtime/src/tasks/wasm.rs @@ -3,7 +3,6 @@ //! 
[tasks]: homestar_invocation::Task use super::FileLoad; -use async_trait::async_trait; use homestar_invocation::task::instruction::Args; use homestar_wasm::{ io::{Arg, Output}, @@ -37,7 +36,6 @@ impl WasmContext { } } -#[async_trait] impl FileLoad for WasmContext {} #[cfg(test)] diff --git a/homestar-runtime/src/test_utils/db.rs b/homestar-runtime/src/test_utils/db.rs index 66223198..2bd06d1e 100644 --- a/homestar-runtime/src/test_utils/db.rs +++ b/homestar-runtime/src/test_utils/db.rs @@ -74,7 +74,7 @@ impl Database for MemoryDb { .and_then(|mut conn| ConnectionCustomizer.on_acquire(&mut conn))?; let pool = r2d2::Pool::builder() - .max_size(3) + .max_size(10) .connection_customizer(Box::new(ConnectionCustomizer)) .build(manager) .expect("DATABASE_URL must be set to an SQLite DB file"); diff --git a/homestar-runtime/src/test_utils/proc_macro/src/lib.rs b/homestar-runtime/src/test_utils/proc_macro/src/lib.rs index 909d826f..0a6734c7 100644 --- a/homestar-runtime/src/test_utils/proc_macro/src/lib.rs +++ b/homestar-runtime/src/test_utils/proc_macro/src/lib.rs @@ -43,6 +43,7 @@ pub fn db_async_test(_attr: TokenStream, item: TokenStream) -> TokenStream { impl TestSettings { fn load() -> crate::Settings { let mut settings = crate::Settings::load().unwrap(); + settings.node.network.libp2p.dht.enable_resolve_receipts_in_background = false; settings.node.db.url = Some(format!("{}.db", #func_name_as_string)); settings } @@ -98,6 +99,7 @@ pub fn runner_test(_attr: TokenStream, item: TokenStream) -> TokenStream { impl TestRunner { fn start() -> TestRunner { let mut settings = crate::Settings::load().unwrap(); + settings.node.network.libp2p.dht.enable_resolve_receipts_in_background = false; settings.node.network.webserver.port = port_selector::random_free_port().unwrap(); settings.node.network.webserver.timeout = std::time::Duration::from_secs(5); settings.node.network.webserver.websocket_sender_timeout = std::time::Duration::from_millis(500); diff --git a/homestar-runtime/src/test_utils/worker_builder.rs b/homestar-runtime/src/test_utils/worker_builder.rs index d4278c33..5dde1e90 100644 --- a/homestar-runtime/src/test_utils/worker_builder.rs +++ b/homestar-runtime/src/test_utils/worker_builder.rs @@ -107,7 +107,7 @@ impl<'a> WorkerBuilder<'a> { name: Some(workflow_cid.to_string()), workflow, workflow_settings: workflow::Settings::default(), - network_settings: settings::Dht::default(), + network_settings: settings.network.libp2p.dht, } } @@ -177,6 +177,12 @@ impl<'a> WorkerBuilder<'a> { self.workflow.len() } + /// Get the [Workflow] from the builder state. + #[allow(dead_code)] + pub(crate) fn workflow(&self) -> Workflow<'a, Arg> { + self.workflow.clone() + } + /// Get the in-memory [db] from the builder state. 
/// /// [db]: MemoryDb diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index b7615f1a..a5f2e31e 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -7,14 +7,9 @@ #[cfg(feature = "websocket-notify")] use crate::event_handler::event::Replay; use crate::{ - channel::{AsyncChannel, AsyncChannelSender}, + channel::AsyncChannelSender, db::Database, - event_handler::{ - event::{Captured, QueryRecord}, - swarm_event::{FoundEvent, ResponseEvent}, - Event, - }, - network::swarm::CapsuleTag, + event_handler::{event::Captured, Event}, receipt::metadata::{REPLAYED_KEY, WORKFLOW_KEY, WORKFLOW_NAME_KEY}, runner::{ModifiedSet, RunningTaskSet}, scheduler::ExecutionGraph, @@ -29,24 +24,27 @@ use faststr::FastStr; use fnv::FnvHashSet; use futures::{future::BoxFuture, FutureExt}; use homestar_invocation::{ - authority::UcanPrf, bail, error::ResolveError, ipld::DagCbor, receipt::metadata::OP_KEY, task, - Pointer, Receipt as InvocationReceipt, + authority::UcanPrf, ipld::DagCbor, receipt::metadata::OP_KEY, task, Pointer, + Receipt as InvocationReceipt, }; use homestar_wasm::{ io::{Arg, Output}, wasmtime::State, }; -use homestar_workflow::{LinkMap, Workflow}; +use homestar_workflow::Workflow; use indexmap::IndexMap; use libipld::{Cid, Ipld}; use std::{collections::BTreeMap, sync::Arc}; -use tokio::{ - sync::RwLock, - task::JoinSet, - time::{timeout_at, Instant}, -}; +use tokio::task::JoinSet; use tracing::{debug, error, info}; +mod poller; +mod resolver; +use poller::Poll; +use resolver::Resolver; + +use self::resolver::DHTResolver; + /// [JoinSet] of tasks run by a [Worker]. #[allow(dead_code)] pub(crate) type TaskSet = JoinSet>; @@ -158,6 +156,7 @@ where /// /// [Instruction]: homestar_invocation::task::Instruction /// [Swarm]: crate::network::swarm + /// [LinkMap]: homestar_workflow::LinkMap pub(crate) async fn run(self, running_tasks: Arc, fetch_fn: F) -> Result<()> where F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, @@ -169,7 +168,50 @@ where ) .await { - Ok(ctx) => self.run_queue(ctx.scheduler, running_tasks).await, + Ok(ctx) => { + let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone(); + let resolver = DHTResolver::new( + promises_to_resolve, + self.network_settings.p2p_receipt_timeout, + self.workflow_info.cid, + ); + if self.network_settings.enable_resolve_receipts_in_background + && self.network_settings.p2p_receipt_timeout.as_millis() > 0 + { + info!( + subject = "worker.resolve_receipts", + category = "worker.run", + workflow_cid = self.workflow_info.cid.to_string(), + "resolving receipts in the background" + ); + poller::poll( + resolver, + self.db.clone(), + self.event_sender.clone(), + Some(ctx.scheduler.linkmap.clone()), + ) + .await; + } + + // Set the workflow status to running. + let conn = &mut self.db.conn()?; + if ctx.scheduler.run_length() > 0 { + Db::set_workflow_status( + self.workflow_info.cid, + workflow::Status::Running, + conn, + )?; + } else { + Db::set_workflow_status( + self.workflow_info.cid, + workflow::Status::Completed, + conn, + )?; + } + + // Run the queue of tasks. 
+ self.run_queue(ctx.scheduler, running_tasks).await + } Err(err) => { error!(subject = "worker.init.err", category = "worker.run", @@ -186,114 +228,6 @@ where mut scheduler: TaskScheduler<'a>, running_tasks: Arc, ) -> Result<()> { - async fn insert_into_map(map: Arc>>, key: Cid, value: T) - where - T: Clone, - { - map.write() - .await - .entry(key) - .or_insert_with(|| value.clone()); - } - - async fn resolve_cid( - cid: Cid, - workflow_cid: Cid, - network_settings: Arc, - linkmap: Arc>>>, - resources: Arc>>>, - db: impl Database, - event_sender: Arc>, - ) -> Result, ResolveError> { - info!( - subject = "worker.resolve_cid", - category = "worker.run", - workflow_cid = workflow_cid.to_string(), - cid = cid.to_string(), - "attempting to resolve cid in workflow" - ); - - if let Some(result) = linkmap.read().await.get(&cid) { - debug!( - subject = "worker.resolve_cid", - category = "worker.run", - cid = cid.to_string(), - "found CID in in-memory linkmap" - ); - - Ok(result.to_owned()) - } else if let Some(bytes) = resources.read().await.get(&Resource::Cid(cid)) { - debug!( - subject = "worker.resolve_cid", - category = "worker.run", - cid = cid.to_string(), - "found CID in map of resources" - ); - - Ok(task::Result::Ok(Arg::Ipld(Ipld::Bytes(bytes.to_vec())))) - } else { - let conn = &mut db.conn()?; - match Db::find_instruction_by_cid(cid, conn) { - Ok(found) => Ok(found.output_as_arg()), - Err(_) => { - debug!( - subject = "worker.resolve_cid", - category = "worker.run", - "no related instruction receipt found in the DB" - ); - - let (tx, rx) = AsyncChannel::oneshot(); - let _ = event_sender - .send_async(Event::FindRecord(QueryRecord::with( - cid, - CapsuleTag::Receipt, - Some(tx), - ))) - .await; - - let found = match timeout_at( - Instant::now() + network_settings.p2p_receipt_timeout, - rx.recv_async(), - ) - .await - { - Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found))))) => found, - Ok(Ok(ResponseEvent::Found(Err(err)))) => { - bail!(ResolveError::UnresolvedCid(format!( - "failure in attempting to find event: {err}" - ))) - } - Ok(Ok(_)) => bail!(ResolveError::UnresolvedCid( - "wrong or unexpected event message received".to_string(), - )), - Ok(Err(err)) => bail!(ResolveError::UnresolvedCid(format!( - "unexpected error while trying to resolve cid: {err}", - ))), - Err(_) => bail!(ResolveError::UnresolvedCid( - "timed out while trying to resolve cid".to_string(), - )), - }; - - let receipt = Db::commit_receipt(workflow_cid, found.clone().receipt, conn) - .unwrap_or(found.clone().receipt); - let found_result = receipt.output_as_arg(); - - // Store the result in the linkmap for use in next iterations. - insert_into_map(linkmap.clone(), cid, found_result.clone()).await; - - // TODO Check this event is sent when we've updated the receipt - // retrieval mechanism. - #[cfg(feature = "websocket-notify")] - let _ = event_sender - .send_async(Event::StoredRecord(FoundEvent::Receipt(found))) - .await; - - Ok(found_result) - } - } - } - } - // Replay previous receipts if subscriptions are on. 
#[cfg(feature = "websocket-notify")] { @@ -377,51 +311,46 @@ where let mut wasm_ctx = WasmContext::new(state)?; let db = self.db.clone(); - let network_settings = self.network_settings.clone(); let linkmap = scheduler.linkmap.clone(); let resources = scheduler.resources.clone(); - let event_sender = self.event_sender.clone(); let workflow_cid = self.workflow_info.cid(); let resolved = args.resolve(move |cid| { - resolve_cid( - cid, - workflow_cid, - network_settings.clone(), - linkmap.clone(), - resources.clone(), - db.clone(), - event_sender.clone(), - ) - .boxed() + info!( + subject = "worker.resolve_cid", + category = "worker.run", + workflow_cid = workflow_cid.to_string(), + cid = cid.to_string(), + "attempting to resolve cid in workflow" + ); + + cid.resolve(linkmap.clone(), resources.clone(), db.clone()) + .boxed() }); let handle = task_set.spawn(async move { - let resolved = match resolved.await { - Ok(inst_result) => inst_result, + match resolved.await { + Ok(inst_result) => { + match wasm_ctx.run(wasm, &fun, inst_result).await { + Ok(output) => Ok(( + output, + instruction_ptr, + invocation_ptr, + receipt_meta, + additional_meta)), + Err(err) => Err( + anyhow!("cannot execute wasm module: {:#?}", err)) + .with_context(|| { + format!("not able to run fn {fun} for cid: {instruction_ptr}, in workflow {workflow_cid}") + }), + } + }, Err(err) => { - error!(subject = "worker.resolve_cid.err", - category = "worker.run", - err=?err, - "error resolving cid"); - return Err(anyhow!("error resolving cid: {err}")) + Err(anyhow!("error resolving cid: {:#?}", err)) .with_context(|| { - format!("could not spawn task for cid: {workflow_cid}") - }); + format!("not able to resolve instruction: {instruction_ptr}, in workflow {workflow_cid}") + }) } - }; - match wasm_ctx.run(wasm, &fun, resolved).await { - Ok(output) => Ok(( - output, - instruction_ptr, - invocation_ptr, - receipt_meta, - additional_meta)), - Err(err) => Err( - anyhow!("cannot execute wasm module: {err}")) - .with_context(|| { - format!("not able to run fn {fun} for cid: {instruction_ptr}, in workflow {workflow_cid}") - }), } }); handles.push(handle); @@ -442,17 +371,21 @@ where { Ok(Ok(data)) => data, Ok(Err(err)) => { - error!(subject = "worker.run.task.err", - category = "worker.run", - err=?err, - "error in running task"); + error!( + subject = "worker.run.task.err", + category = "worker.run", + err = format!("{:#?}", err), + "error in running task" + ); break; } Err(err) => { - error!(subject = "worker.run.task.err", - category = "worker.run", - err=?err, - "error in running task"); + error!( + subject = "worker.run.task.err", + category = "worker.run", + err = format!("{:#?}", err), + "error in running task" + ); break; } }; @@ -483,14 +416,16 @@ where .set_progress_count(std::cmp::max(current_progress_count, step as u32)) }; + let instruction_cid = receipt.instruction().cid(); let stored_receipt = Db::commit_receipt(self.workflow_info.cid, receipt, &mut self.db.conn()?)?; debug!( subject = "db.commit_receipt", category = "worker.run", - cid = self.workflow_info.cid.to_string(), - "commited to database" + workflow_cid = self.workflow_info.cid.to_string(), + instruction_cid = instruction_cid.to_string(), + "committed to database" ); let _ = self @@ -503,6 +438,10 @@ where .await; } } + + // Set the workflow status to `completed` + let conn = &mut self.db.conn()?; + Db::set_workflow_status(self.workflow_info.cid, workflow::Status::Completed, conn)?; Ok(()) } } @@ -522,8 +461,9 @@ where mod test { use super::*; use crate::{ + 
event_handler::event::QueryRecord, test_utils::{self, db::MemoryDb, WorkerBuilder}, - workflow::IndexedResources, + workflow::{IndexedResources, Status}, }; use homestar_invocation::{ task::{instruction::RunInstruction, Resources}, @@ -532,12 +472,22 @@ mod test { #[homestar_runtime_proc_macro::db_async_test] fn initialize_worker() { - let settings = TestSettings::load(); + let mut settings = TestSettings::load(); + + // mod test settings to turn on background resolve + settings + .node + .network + .libp2p + .dht + .enable_resolve_receipts_in_background = true; + settings.node.network.libp2p.dht.p2p_receipt_timeout = std::time::Duration::from_millis(1); let (tx, rx) = test_utils::event::setup_event_channel(settings.clone().node); let builder = WorkerBuilder::new(settings.node).with_event_sender(tx); let fetch_fn = builder.fetch_fn(); + let workflow = builder.workflow(); let db = builder.db(); let worker = builder.build().await; let workflow_cid = worker.workflow_info.cid; @@ -567,12 +517,23 @@ mod test { let mut get_providers = false; let mut captured_receipt = false; let mut receipts_cnt = 0; + let mut find_record_cnt = 0; while let Ok(event) = rx.recv_async().await { match event { + // Find workflow-info and receipt-lookup records Event::FindRecord(QueryRecord { cid, .. }) => { find_record = true; - assert_eq!(cid, worker_workflow_cid) + assert!( + (cid == worker_workflow_cid) + || (workflow + .clone() + .tasks() + .into_iter() + .any(|t| cid == t.instruction_cid().unwrap())) + ); + + find_record_cnt += 1; } Event::GetProviders(QueryRecord { cid, .. }) => { get_providers = true; @@ -593,16 +554,19 @@ mod test { } assert!(find_record); + assert_eq!(find_record_cnt, 2); assert!(get_providers); assert!(captured_receipt); assert_eq!(receipts_cnt, 2); let (_, workflow_info) = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); - assert_eq!(workflow_info.num_tasks, 2); assert_eq!(workflow_info.cid, workflow_cid); assert_eq!(workflow_info.progress.len(), 2); assert_eq!(workflow_info.resources.len(), 2); + + let workflow_stored = MemoryDb::select_workflow(workflow_cid, &mut conn).unwrap(); + assert_eq!(workflow_stored.status, Status::Completed); } #[homestar_runtime_proc_macro::db_async_test] @@ -730,11 +694,23 @@ mod test { assert_eq!(workflow_info.progress.len(), 2); assert_eq!(wf_info.progress_count, 2); assert_eq!(wf_info.progress_count, workflow_info.progress_count); + + let workflow_stored = MemoryDb::select_workflow(workflow_cid, &mut conn).unwrap(); + assert_eq!(workflow_stored.status, Status::Completed); } #[homestar_runtime_proc_macro::db_async_test] fn initialize_worker_with_all_receipted_instruction() { - let settings = TestSettings::load(); + let mut settings = TestSettings::load(); + + // mod test settings to turn on background resolve + settings + .node + .network + .libp2p + .dht + .enable_resolve_receipts_in_background = true; + settings.node.network.libp2p.dht.p2p_receipt_timeout = std::time::Duration::from_millis(1); let config = Resources::default(); let (instruction1, instruction2, _) = @@ -835,7 +811,6 @@ mod test { assert_eq!(workflow_info.num_tasks, 2); assert_eq!(workflow_info.cid, workflow_cid); assert_eq!(workflow_info.progress.len(), 2); - - assert!(rx.try_recv().is_err()) + assert!(rx.try_recv().is_err()); } } diff --git a/homestar-runtime/src/worker/poller.rs b/homestar-runtime/src/worker/poller.rs new file mode 100644 index 00000000..7f428fd8 --- /dev/null +++ b/homestar-runtime/src/worker/poller.rs @@ -0,0 +1,140 @@ +//! 
Poller for workflow execution on pending tasks. + +use crate::{channel::AsyncChannelSender, db::Database, event_handler::Event}; +use anyhow::Result; +use homestar_invocation::task; +use homestar_wasm::io::Arg; +use std::{future::Future, sync::Arc, time::Duration}; +use tokio::{runtime::Handle, sync::RwLock}; + +type LinkMap = Arc<RwLock<homestar_workflow::LinkMap<task::Result<Arg>>>>; + +/// Poller context for working with state. +pub(crate) struct Poller<DB: Database> { + pub(crate) db: DB, + pub(crate) event_sender: Arc<AsyncChannelSender<Event>>, + pub(crate) linkmap: Option<LinkMap>, +} + +/// Poll (once) eagerly when called (in the background). +pub(crate) async fn poll<P: Poll<DB> + Send + 'static, DB: Database + 'static>( + actor: P, + db: DB, + event_sender: Arc<AsyncChannelSender<Event>>, + linkmap: Option<LinkMap>, +) { + let poller = Poller::new(db, event_sender, linkmap); + let handle = Handle::current(); + handle.spawn(async move { + let _ = actor.poll(&poller).await; + }); +} + +/// Start a poller at a given interval which runs in the background. +#[allow(dead_code)] +pub(crate) async fn poll_at_interval< + P: Poll<DB> + Send + Sync + Clone + 'static, + DB: Database + 'static, +>( + actor: P, + db: DB, + event_sender: Arc<AsyncChannelSender<Event>>, + interval: Duration, + linkmap: Option<LinkMap>, +) { + let mut interval = tokio::time::interval(interval); + let poller = Arc::new(Poller::new(db, event_sender, linkmap)); + let handle = Handle::current(); + handle.spawn(async move { + loop { + interval.tick().await; + let poller_clone = Arc::clone(&poller); + let _ = actor.poll(Arc::as_ref(&poller_clone)).await; + } + }); +} + +impl<DB> Poller<DB> +where + DB: Database, +{ + /// Create a new [Poller]. + fn new(db: DB, event_sender: Arc<AsyncChannelSender<Event>>, linkmap: Option<LinkMap>) -> Self { + Self { + db, + event_sender, + linkmap, + } + } +} + +/// Trait for polling a resource. +pub(crate) trait Poll<DB> +where + DB: Database, +{ + /// Poll for work. + fn poll(&self, ctx: &Poller<DB>) -> impl Future<Output = Result<()>> + Send; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + channel::{AsyncChannel, AsyncChannelSender}, + test_utils::db::MemoryDb, + }; + + #[derive(Debug, Clone)] + struct TestResolver(AsyncChannelSender<i32>); + + impl<DB> Poll<DB> for TestResolver + where + DB: Database, + { + async fn poll(&self, _ctx: &Poller<DB>) -> Result<()> { + let _ = self.0.send_async(1).await; + Ok(()) + } + } + + #[homestar_runtime_proc_macro::db_async_test] + async fn polls_once() { + let settings = TestSettings::load(); + let (tx, rx) = AsyncChannel::with(1); + let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap(); + poll( + TestResolver(tx), + db, + Arc::new(AsyncChannel::with(1).0), + None, + ) + .await; + + let msg = rx.recv_async().await.unwrap(); + assert_eq!(msg, 1); + assert!(rx.try_recv().is_err()) + } + + #[homestar_runtime_proc_macro::db_async_test] + async fn polls_at_interval() { + let settings = TestSettings::load(); + let (tx, rx) = AsyncChannel::with(1); + let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap(); + poll_at_interval( + TestResolver(tx), + db, + Arc::new(AsyncChannel::with(1).0), + Duration::from_millis(10), + None, + ) + .await; + + tokio::time::sleep(Duration::from_millis(20)).await; + + let msg1 = rx.recv_async().await.unwrap(); + assert_eq!(msg1, 1); + let msg2 = rx.recv_async().await.unwrap(); + assert_eq!(msg2, 1); + } +}
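To illustrate how the poller API above composes (a minimal sketch, not code from this patch): a worker can hand any `Poll` implementor to `poll` for a single background pass on the current Tokio runtime. The sketch wires up the `DHTResolver` introduced in the next file; `out_flow_cids`, `p2p_receipt_timeout`, `workflow_cid`, `db`, `event_sender`, and `linkmap` are assumed bindings from the surrounding worker, not identifiers defined in this diff.

    // Sketch: race background DHT resolution against local execution.
    // `out_flow_cids` is an assumed FnvHashSet<Cid> of promises awaited
    // from outside this workflow.
    let resolver = DHTResolver::new(
        Arc::new(out_flow_cids),
        p2p_receipt_timeout, // per-receipt DHT lookup timeout
        workflow_cid,
    );
    // `poll` spawns onto the current runtime and returns immediately, so
    // workflow execution proceeds while found receipts are committed to the
    // DB and cached in the shared linkmap.
    poll(resolver, db.clone(), event_sender.clone(), Some(linkmap.clone())).await;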
diff --git a/homestar-runtime/src/worker/resolver.rs b/homestar-runtime/src/worker/resolver.rs new file mode 100644 index 00000000..8629cfa5 --- /dev/null +++ b/homestar-runtime/src/worker/resolver.rs @@ -0,0 +1,169 @@ +use super::{poller::Poller, Poll}; +use crate::{ + channel::AsyncChannel, + db::Database, + event_handler::{ + event::QueryRecord, + swarm_event::{FoundEvent, ResponseEvent}, + Event, + }, + network::swarm::CapsuleTag, + workflow::Resource, + Db, +}; +use anyhow::{bail, Result}; +use fnv::FnvHashSet; +use homestar_invocation::{error::ResolveError, task}; +use homestar_wasm::io::Arg; +use homestar_workflow::LinkMap; +use indexmap::IndexMap; +use libipld::{Cid, Ipld}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + sync::RwLock, + time::{timeout_at, Instant}, +}; +use tracing::debug; + +pub(crate) trait Resolver { + async fn resolve( + self, + linkmap: Arc<RwLock<LinkMap<task::Result<Arg>>>>, + resources: Arc<RwLock<IndexMap<Resource, Vec<u8>>>>, + db: impl Database, + ) -> Result<task::Result<Arg>, ResolveError>; +} + +impl Resolver for Cid { + async fn resolve( + self, + linkmap: Arc<RwLock<LinkMap<task::Result<Arg>>>>, + resources: Arc<RwLock<IndexMap<Resource, Vec<u8>>>>, + db: impl Database, + ) -> Result<task::Result<Arg>, ResolveError> { + if let Some(result) = linkmap.read().await.get(&self) { + debug!( + subject = "worker.resolve_cid", + category = "worker.run", + cid = self.to_string(), + "found CID in in-memory linkmap" + ); + + Ok(result.to_owned()) + } else if let Some(bytes) = resources.read().await.get(&Resource::Cid(self)) { + debug!( + subject = "worker.resolve_cid", + category = "worker.run", + cid = self.to_string(), + "found CID in map of resources" + ); + + Ok(task::Result::Ok(Arg::Ipld(Ipld::Bytes(bytes.to_vec())))) + } else { + let conn = &mut db.conn()?; + match Db::find_instruction_by_cid(self, conn) { + Ok(found) => Ok(found.output_as_arg()), + Err(_) => { + debug!( + subject = "worker.resolve_cid", + category = "worker.run", + cid = self.to_string(), + "no related instruction receipt found in the DB" + ); + Err(ResolveError::UnresolvedCid((self).to_string())) + } + } + } + } +} + +/// A resolver for CIDs that may be available on the DHT. +pub(crate) struct DHTResolver { + cids: Arc<FnvHashSet<Cid>>, + p2p_receipt_timeout: Duration, + workflow_cid: Cid, +} + +impl DHTResolver { + /// Create a new [DHTResolver]. + pub(crate) fn new( + cids: Arc<FnvHashSet<Cid>>, + p2p_receipt_timeout: Duration, + workflow_cid: Cid, + ) -> Self { + Self { + cids, + p2p_receipt_timeout, + workflow_cid, + } + } +} + +impl<DB> Poll<DB> for DHTResolver +where + DB: Database, +{ + async fn poll(&self, ctx: &Poller<DB>) -> Result<()> { + for cid in self.cids.iter() { + let (tx, rx) = AsyncChannel::oneshot(); + + let _ = ctx + .event_sender + .send_async(Event::FindRecord(QueryRecord::with( + *cid, + CapsuleTag::Receipt, + Some(tx), + ))) + .await; + + let found = match timeout_at(Instant::now() + self.p2p_receipt_timeout, rx.recv_async()) + .await + { + Ok(Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found))))) => found, + Ok(Ok(ResponseEvent::Found(Err(err)))) => { + bail!(ResolveError::UnresolvedCid(format!( + "failure in attempting to find event: {err}" + ))) + } + Ok(Ok(_)) => bail!(ResolveError::UnresolvedCid( + "wrong or unexpected event message received".to_string(), + )), + Ok(Err(err)) => bail!(ResolveError::UnresolvedCid(format!( + "unexpected error while trying to resolve cid: {err}", + ))), + Err(_) => bail!(ResolveError::UnresolvedCid( + "timed out while trying to resolve cid".to_string(), + )), + }; + + let conn = &mut ctx.db.conn()?; + + let receipt = Db::commit_receipt(self.workflow_cid, found.clone().receipt, conn) + .unwrap_or(found.clone().receipt); + + debug!( + subject = "db.commit_receipt", + category = "dht.resolver", + cid_resolved = cid.to_string(), + receipt_cid = receipt.cid().to_string(), + "committed to database" + ); + + let found_result = receipt.output_as_arg(); + + // Store the result in the linkmap for use in next iterations.
+ if let Some(ref m) = ctx.linkmap { + m.write().await.entry(*cid).or_insert_with(|| found_result); + } + + // Notify the websocket-notify retrieval mechanism. + #[cfg(feature = "websocket-notify")] + let _ = ctx + .event_sender + .send_async(Event::StoredRecord(FoundEvent::Receipt(found))) + .await; + } + + Ok(()) + } +} diff --git a/homestar-runtime/src/workflow.rs b/homestar-runtime/src/workflow.rs index c06f90a1..59acf081 100644 --- a/homestar-runtime/src/workflow.rs +++ b/homestar-runtime/src/workflow.rs @@ -37,8 +37,8 @@ mod info; pub mod settings; pub(crate) use error::Error; -pub use info::WORKFLOW_TAG; pub(crate) use info::{Info, Stored, StoredReceipt}; +pub use info::{Status, StatusMapping, WORKFLOW_TAG}; #[allow(unused_imports)] pub use settings::Settings; @@ -80,6 +80,7 @@ impl fmt::Display for Resource { #[derive(Debug, Clone)] pub(crate) struct AOTContext<'a> { dag: Dag<'a>, + awaiting: Promises, indexed_resources: IndexedResources, } @@ -110,6 +111,37 @@ pub(crate) struct Vertex<'a> { pub(crate) invocation: Pointer, } +/// [Origin] of a [Cid]: whether it is awaited from within the [Workflow] itself or from outside of it. +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum Origin { + /// [Cid] awaits an instruction/task in the [Workflow]. + InFlow, + /// [Cid] awaits an instruction/task outside of the [Workflow]. + OutFlow, +} + +/// [Workflow] promises being awaited on. +#[derive(Debug, Clone, PartialEq, Default)] +pub(crate) struct Promises { + pub(crate) in_flow: Vec<Cid>, + pub(crate) out_flow: Vec<Cid>, +} + +impl Promises { + /// Create a new [Promises] object from a given pair of + /// in-flow and out-flow [Cid]s. + pub(crate) fn new(in_flow: Vec<Cid>, out_flow: Vec<Cid>) -> Promises { + Promises { in_flow, out_flow } + } + + /// Return an iterator over the [Promises] in-flow and out-flow [Cid]s. + pub(crate) fn iter(&self) -> impl Iterator<Item = (Origin, &Cid)> { + let in_iter = self.in_flow.iter().map(|cid| (Origin::InFlow, cid)); + let out_iter = self.out_flow.iter().map(|cid| (Origin::OutFlow, cid)); + in_iter.chain(out_iter) + } +}
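As a usage sketch of `Promises::iter` (illustrative only; `in_flow_cids` and `out_flow_cids` are assumed `Vec<Cid>` bindings, not identifiers from this diff), the out-of-flow promises that need background resolution can be partitioned off by their `Origin`:

    // Sketch: split awaited CIDs by origin, keeping only those promised by
    // tasks outside this workflow (candidates for background DHT resolution).
    let promises = Promises::new(in_flow_cids, out_flow_cids);
    let outside: Vec<Cid> = promises
        .iter()
        .filter(|(origin, _)| *origin == Origin::OutFlow)
        .map(|(_, cid)| *cid)
        .collect();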
 impl<'a> Vertex<'a> { fn new( instruction: Instruction<'a, Arg>, @@ -150,6 +182,7 @@ impl<'a> Builder<'a> { match aot.dag.build_schedule() { Ok(schedule) => Ok(ExecutionGraph { schedule: schedule.batches, + awaiting: aot.awaiting, indexed_resources: aot.indexed_resources, }), Err(e) => homestar_invocation::bail!(Error::InvalidSchedule(e.to_string())), @@ -158,10 +191,23 @@ impl<'a> Builder<'a> { fn aot(self) -> anyhow::Result<AOTContext<'a>> { let lookup_table = self.lookup_table()?; - let (mut dag, unawaits, awaited, resources) = + let (mut dag, unawaits, awaited, promised_cids, resources) = self.into_inner().tasks().into_iter().enumerate().try_fold( - (Dag::default(), vec![], vec![], IndexMap::new()), - |(mut dag, mut unawaits, mut awaited, mut resources), (i, task)| { + ( + Dag::default(), + vec![], + vec![], + (vec![], vec![]), + IndexMap::new(), + ), + |( + mut dag, + mut unawaits, + mut awaited, + (mut in_flows, mut out_flows), + mut resources, + ), + (i, task)| { let instr_cid = task.instruction_cid()?; debug!( subject = "task.instruction", @@ -181,17 +227,18 @@ impl<'a> Builder<'a> { .entry(instr_cid) .or_insert_with(|| vec![Resource::Url(instr.resource().to_owned())]); let parsed = instr.input().parse()?; - let reads = parsed - .args() - .deferreds() - .fold(vec![], |mut in_flow_reads, cid| { - if let Some(v) = lookup_table.get(&cid) { - in_flow_reads.push(*v) - } - // TODO: else, it's a Promise from another task outside - // of the workflow. - in_flow_reads - }); + let deferred = parsed.args().deferreds(); + let reads = deferred.fold(vec![], |mut in_flow_reads, cid| { + if let Some(v) = lookup_table.get(&cid) { + in_flows.push(cid); + in_flow_reads.push(*v) + } else { + // Otherwise, it's a Promise from another task outside + // of the workflow. + out_flows.push(cid); + } + in_flow_reads + }); parsed.args().links().for_each(|cid| { resources @@ -213,7 +260,13 @@ impl<'a> Builder<'a> { unawaits.push(node); } - Ok::<_, anyhow::Error>((dag, unawaits, awaited, resources)) + Ok::<_, anyhow::Error>(( + dag, + unawaits, + awaited, + (in_flows, out_flows), + resources, + )) }, )?; @@ -229,6 +282,7 @@ impl<'a> Builder<'a> { Ok(AOTContext { dag, + awaiting: Promises::new(promised_cids.0, promised_cids.1), indexed_resources: IndexedResources(resources), }) } @@ -551,7 +605,7 @@ mod test { ("func".into(), Ipld::String("add_two".to_string())), ( "args".into(), - Ipld::List(vec![Ipld::try_from(promise1.clone()).unwrap()]), + Ipld::List(vec![Ipld::from(promise1.clone())]), ), ]))), ); diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 3567eaec..7cbd53fc 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -1,3 +1,4 @@ +#![allow(missing_docs)] use super::IndexedResources; use crate::{ channel::{AsyncChannel, AsyncChannelSender}, @@ -36,6 +37,21 @@ const PROGRESS_KEY: &str = "progress"; const PROGRESS_COUNT_KEY: &str = "progress_count"; const RESOURCES_KEY: &str = "resources"; +/// Status of a [Workflow]. +/// +/// [Workflow]: homestar_workflow::Workflow +#[derive(Debug, Clone, PartialEq, diesel_derive_enum::DbEnum)] +pub enum Status { + /// Workflow is pending - default case. + Pending, + /// Workflow is currently running. + Running, + /// Workflow has been completed. + Completed, + /// Workflow is stuck, awaiting CIDs we can't find on the network. + Stuck, +} + /// [Workflow] information stored in the database. /// /// [Workflow]: homestar_workflow::Workflow pub struct Stored { @@ -66,6 +82,14 @@ pub struct Stored { /// /// [Workflow]: homestar_workflow::Workflow pub(crate) completed_at: Option<NaiveDateTime>, + /// Status of [Workflow]. + /// + /// [Workflow]: homestar_workflow::Workflow + pub(crate) status: Status, + /// Retries of the [Workflow] when checking for a provider.
+ /// + /// [Workflow]: homestar_workflow::Workflow + pub(crate) retries: i32, } impl Stored { @@ -86,6 +110,8 @@ impl Stored { resources, created_at, completed_at: None, + status: Status::Pending, + retries: 0, } } @@ -105,6 +131,8 @@ impl Stored { resources, created_at: Utc::now().naive_utc(), completed_at: None, + status: Status::Pending, + retries: 0, } } @@ -120,6 +148,8 @@ impl Stored { resources: IndexedResources::default(), created_at: Utc::now().naive_utc(), completed_at: None, + status: Status::Pending, + retries: 0, } } } diff --git a/homestar-runtime/tests/fixtures/test-workflow-add-one-part-one.json b/homestar-runtime/tests/fixtures/test-workflow-add-one-part-one.json index 66f62ef6..2718a396 100644 --- a/homestar-runtime/tests/fixtures/test-workflow-add-one-part-one.json +++ b/homestar-runtime/tests/fixtures/test-workflow-add-one-part-one.json @@ -1,22 +1,22 @@ { - "tasks": [ - { - "cause": null, - "meta": { - "fuel": 18446744073709552000, - "memory": 4294967296, - "time": 100000 - }, - "prf": [], - "run": { - "input": { - "args": [1], - "func": "add_one" - }, - "nnc": "", - "op": "wasm/run", - "rsc": "ipfs://bafybeidpmgamv4i6jqrlwbnkrm3kzvvu7hj3jnoolcswub27mkda6p75la" - } - } - ] + "tasks": [ + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [1], + "func": "add_one" + }, + "nnc": "", + "op": "wasm/run", + "rsc": "ipfs://bafybeidpmgamv4i6jqrlwbnkrm3kzvvu7hj3jnoolcswub27mkda6p75la" + } + } + ] } diff --git a/homestar-runtime/tests/fixtures/test-workflow-add-one-part-two.json b/homestar-runtime/tests/fixtures/test-workflow-add-one-part-two.json index 73f606fb..84a2c8b1 100644 --- a/homestar-runtime/tests/fixtures/test-workflow-add-one-part-two.json +++ b/homestar-runtime/tests/fixtures/test-workflow-add-one-part-two.json @@ -1,28 +1,28 @@ { - "tasks": [ - { - "cause": null, - "meta": { - "fuel": 18446744073709552000, - "memory": 4294967296, - "time": 100000 - }, - "prf": [], - "run": { - "input": { - "args": [ - { - "await/ok": { - "/": "bafyrmicffbk5pvhdih4kqeix42irl7djjgzrzbilsj7dyxpgdipl5da5vq" - } + "tasks": [ + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [ + { + "await/ok": { + "/": "bafyrmicffbk5pvhdih4kqeix42irl7djjgzrzbilsj7dyxpgdipl5da5vq" + } + } + ], + "func": "add_one" + }, + "nnc": "", + "op": "wasm/run", + "rsc": "ipfs://bafybeidpmgamv4i6jqrlwbnkrm3kzvvu7hj3jnoolcswub27mkda6p75la" } - ], - "func": "add_one" - }, - "nnc": "", - "op": "wasm/run", - "rsc": "ipfs://bafybeidpmgamv4i6jqrlwbnkrm3kzvvu7hj3jnoolcswub27mkda6p75la" - } - } - ] + } + ] } diff --git a/homestar-runtime/tests/network/dht.rs b/homestar-runtime/tests/network/dht.rs index f37c1907..e5a885e1 100644 --- a/homestar-runtime/tests/network/dht.rs +++ b/homestar-runtime/tests/network/dht.rs @@ -168,6 +168,7 @@ fn test_libp2p_dht_records_integration() -> Result<()> { .output(); // Poll for put receipt and workflow info messages + let mut put_receipt_cid: Cid = Cid::default(); let mut put_receipt = false; let mut put_workflow_info = false; let mut receipt_quorum_success = false; @@ -178,6 +179,9 @@ fn test_libp2p_dht_records_integration() -> Result<()> { serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); if json["put_receipt_dht"].is_object() { + put_receipt_cid = + Cid::from_str(json["put_receipt_dht"]["cid"].as_str().unwrap()) + .expect("Unable to parse put receipt CID."); 
put_receipt = true; } else if json["put_workflow_info_dht"].is_object() { put_workflow_info = true; @@ -210,35 +214,37 @@ } } - // TODO Bring back tests for receipts retrieved from DHT - // both here and below. - - // Run test workflow on node two. - // The task in this workflow awaits the task run on node one, - // which forces it to retrieve the result from the DHT. - // let _ = Command::new(BIN.as_os_str()) - // .arg("run") - // .arg("-p") - // .arg(rpc_port2.to_string()) - // .arg("tests/fixtures/test-workflow-add-one-part-two.json") - // .output(); - - // Poll for got receipt message - // let received_receipt_cid: Cid; - // loop { - // if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(120)).await { - // let json: serde_json::Value = - // serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); - - // if json["got_receipt_dht"].is_object() { - // received_receipt_cid = Cid::from_str(json["got_receipt_dht"]["cid"].as_str().unwrap()) - // .expect("Unable to parse received receipt CID."); - // break; - // } - // } else { - // panic!("Node two did not get receipt in time.") - // } - // } + // TODO: Test full-flow for Receipt pull from DHT + // + // Polling on the workflow results will fail the first time around due + // to the purposeful race condition between grabbing a receipt from the + // DHT (outside the workflow) and running the workflow on the node + // itself. + // + // Step 2: + // a) We've started on the implementation of retries: if a + // Cid (outside the workflow) cannot be resolved, the workflow's + // promises can be picked up again by a background polling mechanism and + // resolved separately, or the worker itself can retry (possibly both + // options), before the runner cancels it. (See the interval-polling + // sketch below.) + // b) This will also involve checking whether a task/promise is even + // running anywhere (if outside the given workflow). + + let _ = Command::new(BIN.as_os_str()) + .arg("run") + .arg("-p") + .arg(rpc_port2.to_string()) + .arg("tests/fixtures/test-workflow-add-one-part-two.json") + .output(); + + // Check database for stored receipt and workflow info + let config_fixture = config2.filename(); + let settings = Settings::load_from_file(PathBuf::from(config_fixture)).unwrap(); + let db = Db::setup_connection_pool( + settings.node(), + Some(proc_info2.db_path.display().to_string()), + ) + .expect("Failed to connect to node two database"); // Run the same workflow run on node one to retrieve // workflow info that should be available on the DHT.
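To make the retry plan in the TODO above concrete (a hypothetical sketch against the poller API added in this patch, not part of the change itself): unresolved out-of-flow CIDs could be re-polled on an interval until their receipts appear or the runner cancels the workflow. The cadence and the `unresolved_cids`, `p2p_receipt_timeout`, `workflow_cid`, `db`, `event_sender`, and `linkmap` bindings are assumptions, and `DHTResolver` would additionally need `#[derive(Clone)]` (it holds only an `Arc`, a `Duration`, and a `Cid`) to satisfy `poll_at_interval`'s bounds.

    // Hypothetical "Step 2" follow-up: re-poll unresolved CIDs on a cadence
    // instead of a single background pass.
    poll_at_interval(
        DHTResolver::new(unresolved_cids, p2p_receipt_timeout, workflow_cid),
        db.clone(),
        event_sender.clone(),
        Duration::from_secs(5), // assumed retry cadence; no such setting exists yet
        Some(linkmap.clone()),
    )
    .await;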
@@ -269,24 +275,6 @@ fn test_libp2p_dht_records_integration() -> Result<()> { } } - // Check database for stored receipt and workflow info - let config_fixture = config2.filename(); - let settings = Settings::load_from_file(PathBuf::from(config_fixture)).unwrap(); - let db = Db::setup_connection_pool( - settings.node(), - Some(proc_info2.db_path.display().to_string()), - ) - .expect("Failed to connect to node two database"); - - // let stored_receipt: Receipt = - // Db::find_receipt_by_cid(received_receipt_cid, &mut db.conn().unwrap()).unwrap_or_else( - // |_| { - // panic!( - // "Failed to find receipt with CID {} in database", - // received_receipt_cid - // ) - // }, - // ); let stored_workflow_info = Db::get_workflow_info(received_workflow_info_cid, &mut db.conn().unwrap()); @@ -315,19 +303,33 @@ fn test_libp2p_dht_records_integration() -> Result<()> { assert!(receipt_quorum_success_logged); assert!(workflow_info_quorum_success_logged); - // // Check node two received a receipt and workflow info from node one - // let retrieved_receipt_logged = check_for_line_with( - // stdout2.clone(), - // vec![ - // "found receipt record", - // ED25519MULTIHASH, - // ], - // ); - let retrieved_workflow_info_logged = - check_for_line_with(stdout2, vec!["found workflow info", ED25519MULTIHASH]); - - // assert!(retrieved_receipt_logged); + let retrieved_workflow_info_logged = check_for_line_with( + stdout2.clone(), + vec!["found workflow info", ED25519MULTIHASH], + ); + + let retrieved_receipt_info_logged = check_for_line_with( + stdout2.clone(), + vec!["found receipt record", ED25519MULTIHASH], + ); + + // This may race with the locally executed receipt on the non-await + // version, but the DHT-resolver commit has its own log line to check for. + let committed_receipt = check_for_line_with( + stdout2, + vec![ + "committed to database", + "dht.resolver", + &put_receipt_cid.to_string(), + ], + ); + assert!(retrieved_workflow_info_logged); + assert!(retrieved_receipt_info_logged); + assert!(committed_receipt); + + let stored_receipt = Db::find_receipt_by_cid(put_receipt_cid, &mut db.conn().unwrap()); + assert!(stored_receipt.is_ok()); }); Ok(()) @@ -1018,8 +1020,6 @@ fn test_libp2p_dht_workflow_info_provider_recursive_integration() -> Result<()> let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); - println!("node1: {json}"); - if json["connection_established"].is_object() { assert_eq!( json["connection_established"]["peerId"], diff --git a/homestar-wasm/src/wasmtime/host/helpers.rs b/homestar-wasm/src/wasmtime/host/helpers.rs index 63394842..2dd9fb74 100644 --- a/homestar-wasm/src/wasmtime/host/helpers.rs +++ b/homestar-wasm/src/wasmtime/host/helpers.rs @@ -4,9 +4,10 @@ use crate::wasmtime::{ world::{homestar::host::helpers, wasi}, State, }; +use async_trait::async_trait; use std::time::Instant; -#[async_trait::async_trait] +#[async_trait] impl helpers::Host for State { /// Get the current time. async fn get_current_time(&mut self) -> wasmtime::Result { @@ -26,7 +27,7 @@ impl helpers::Host for State { } } -#[async_trait::async_trait] +#[async_trait] impl wasi::logging::logging::Host for State { /// Log a message, formatted by the runtime subscriber.
async fn log( diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 292fe499..2f58e287 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,21 @@ [toolchain] channel = "stable" + +components = [ + "cargo", + "clippy", + "llvm-tools-preview", + "rustfmt", + "rust-src", + "rust-std", +] + +targets = [ + "wasm32-unknown-unknown", + "wasm32-wasi", + "x86_64-apple-darwin", + "aarch64-apple-darwin", + "x86_64-unknown-linux-gnu", + "x86_64-unknown-linux-musl", + "aarch64-unknown-linux-musl", +]