From a60b8d8b084cff3e358c895e9d6c839a0d1eb3a1 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 12 Mar 2024 21:24:51 -0400 Subject: [PATCH] refactor: handle nonce as incoming string/arraybuf (#611) Includes: * flexibly handling normcore-json or dag-json, but more flexible for the former. * will cbor cid Nonce as String if it came in that way or bytes. * cbor as input over the websocket listeners (vs just json). * Additional testing across the board, including with cbor. --- .gitignore | 6 +- Cargo.lock | 17 +- homestar-invocation/src/error.rs | 3 + homestar-invocation/src/ipld/dag_cbor.rs | 46 +++++- .../src/task/instruction/nonce.rs | 145 +++++++++++++++--- .../src/test_utils/invocation.rs | 4 +- homestar-runtime/src/db/utils.rs | 5 +- .../src/network/webserver/listener.rs | 87 ++++++++++- homestar-runtime/src/network/webserver/rpc.rs | 94 ++++++++---- homestar-runtime/src/tasks/fetch.rs | 3 + homestar-runtime/src/workflow/info.rs | 17 ++ homestar-runtime/tests/cli.rs | 74 +++++++++ .../test-workflow-add-one-nonced.json | 72 +++++++++ .../test-workflow-image-pipeline.cbor | Bin 0 -> 677 bytes homestar-runtime/tests/utils.rs | 21 ++- homestar-runtime/tests/webserver.rs | 104 ++++++++++++- homestar-workflow/src/workflow.rs | 28 ++++ 17 files changed, 644 insertions(+), 82 deletions(-) create mode 100644 homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json create mode 100644 homestar-runtime/tests/fixtures/test-workflow-image-pipeline.cbor diff --git a/.gitignore b/.gitignore index 5cf5bbbb..1456dd0e 100644 --- a/.gitignore +++ b/.gitignore @@ -21,10 +21,10 @@ private .DS_Store homestar-functions/**/out/*.png homestar-wasm/out -homestar-invocation/test_* -homestar-runtime/fixtures/test_* +homestar-invocation/test* +homestar-runtime/fixtures/test* homestar-runtime/tests/fixtures/*.toml -homestar-workflow/fixtures/test_* +homestar-workflow/fixtures/test* .zed result-alejandra report.json diff --git a/Cargo.lock b/Cargo.lock index e37e8621..9d07a50b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -871,9 +871,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.34" +version = "0.4.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -3064,7 +3064,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core 0.52.0", + "windows-core", ] [[package]] @@ -8382,7 +8382,7 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" dependencies = [ - "windows-core 0.51.1", + "windows-core", "windows-targets 0.48.5", ] @@ -8395,15 +8395,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "windows-core" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets 0.52.3", -] - [[package]] name = "windows-sys" version = "0.48.0" diff --git a/homestar-invocation/src/error.rs b/homestar-invocation/src/error.rs index 2da82469..5f8a1eb6 100644 --- a/homestar-invocation/src/error.rs +++ b/homestar-invocation/src/error.rs @@ -33,6 +33,9 @@ pub enum Error { /// `Display` methods through to an underlying error. #[error("cannot convert from Ipld structure: {0}")] FromIpld(#[from] libipld::error::SerdeError), + /// Error with a [libipld::multibase] encoding/decoding. + #[error("failed to decode/encode structure: {0}")] + FromMultibase(#[from] libipld::multibase::Error), /// Invalid match discriminant or enumeration. #[error("invalid discriminant {0:#?}")] InvalidDiscriminant(T), diff --git a/homestar-invocation/src/ipld/dag_cbor.rs b/homestar-invocation/src/ipld/dag_cbor.rs index 086bcbbd..b87218b8 100644 --- a/homestar-invocation/src/ipld/dag_cbor.rs +++ b/homestar-invocation/src/ipld/dag_cbor.rs @@ -3,10 +3,15 @@ use crate::{consts::DAG_CBOR, Error, Unit}; use libipld::{ cbor::DagCborCodec, + json::DagJsonCodec, multihash::{Code, MultihashDigest}, - prelude::Codec, + prelude::{Codec, Decode}, Cid, Ipld, }; +use std::{ + fs, + io::{Cursor, Write}, +}; /// Trait for DagCbor-related encode/decode. pub trait DagCbor @@ -21,6 +26,45 @@ where let hash = Code::Sha3_256.digest(&bytes); Ok(Cid::new_v1(DAG_CBOR, hash)) } + + /// Serialize `Self` to JSON bytes. + fn to_dag_json(self) -> Result, Error> { + let ipld: Ipld = self.into(); + Ok(DagJsonCodec.encode(&ipld)?) + } + + /// Serialize `Self` to JSON [String]. + fn to_dagjson_string(self) -> Result> { + let encoded = self.to_dag_json()?; + // JSON spec requires UTF-8 support + let s = std::str::from_utf8(&encoded)?; + Ok(s.to_string()) + } + + /// Serialize `Self` to CBOR bytes. + fn to_cbor(self) -> Result, Error> { + let ipld: Ipld = self.into(); + Ok(DagCborCodec.encode(&ipld)?) + } + + /// Deserialize `Self` from CBOR bytes. + fn from_cbor(data: &[u8]) -> Result> + where + Self: TryFrom, + { + let ipld = Ipld::decode(DagCborCodec, &mut Cursor::new(data))?; + let from_ipld = Self::try_from(ipld).map_err(|_err| { + Error::::UnexpectedIpldType(Ipld::String( + "Failed to convert Ipld to expected type".to_string(), + )) + })?; + Ok(from_ipld) + } + + /// Serialize `Self` to a CBOR file. + fn to_cbor_file(self, filename: String) -> Result<(), Error> { + Ok(fs::File::create(filename)?.write_all(&self.to_cbor()?)?) + } } /// Trait for DagCbor-related encode/decode for references. diff --git a/homestar-invocation/src/task/instruction/nonce.rs b/homestar-invocation/src/task/instruction/nonce.rs index 46ffda07..3c45d596 100644 --- a/homestar-invocation/src/task/instruction/nonce.rs +++ b/homestar-invocation/src/task/instruction/nonce.rs @@ -12,7 +12,7 @@ use generic_array::{ use libipld::{multibase::Base::Base32HexLower, Ipld}; use schemars::{ gen::SchemaGenerator, - schema::{InstanceType, Metadata, Schema, SchemaObject, SingleOrVec}, + schema::{InstanceType, Metadata, Schema, SchemaObject, SingleOrVec, StringValidation}, JsonSchema, }; use serde::{Deserialize, Serialize}; @@ -23,13 +23,22 @@ use uuid::Uuid; type Nonce96 = GenericArray; type Nonce128 = GenericArray; +/// Incoming type for nonce conversion. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum IncomingTyp { + /// Nonce incoming as a string. + String, + /// Nonce incoming as bytes. + Bytes, +} + /// Enumeration over allowed `nonce` types. #[derive(Clone, Debug, PartialEq, EnumAsInner, Serialize, Deserialize)] pub enum Nonce { /// 96-bit, 12-byte nonce, e.g. [xid]. - Nonce96(Nonce96), + Nonce96(Nonce96, IncomingTyp), /// 128-bit, 16-byte nonce. - Nonce128(Nonce128), + Nonce128(Nonce128, IncomingTyp), /// No Nonce attributed. Empty, } @@ -38,22 +47,37 @@ impl Nonce { /// Default generator, outputting a [xid] nonce, which is a 96-bit, 12-byte /// nonce. pub fn generate() -> Self { - Nonce::Nonce96(*GenericArray::from_slice(xid::new().as_bytes())) + Nonce::Nonce96( + *GenericArray::from_slice(xid::new().as_bytes()), + IncomingTyp::Bytes, + ) } /// Generate a default, 128-bit, 16-byte nonce via [Uuid::new_v4()]. pub fn generate_128() -> Self { - Nonce::Nonce128(*GenericArray::from_slice(Uuid::new_v4().as_bytes())) + Nonce::Nonce128( + *GenericArray::from_slice(Uuid::new_v4().as_bytes()), + IncomingTyp::Bytes, + ) + } + + /// Convert the nonce to a byte vector. + pub fn to_vec(&self) -> Vec { + match self { + Nonce::Nonce96(nonce, _) => nonce.to_vec(), + Nonce::Nonce128(nonce, _) => nonce.to_vec(), + Nonce::Empty => vec![], + } } } impl fmt::Display for Nonce { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Nonce::Nonce96(nonce) => { + Nonce::Nonce96(nonce, _) => { write!(f, "{}", Base32HexLower.encode(nonce.as_slice())) } - Nonce::Nonce128(nonce) => { + Nonce::Nonce128(nonce, _) => { write!(f, "{}", Base32HexLower.encode(nonce.as_slice())) } Nonce::Empty => write!(f, ""), @@ -64,8 +88,20 @@ impl fmt::Display for Nonce { impl From for Ipld { fn from(nonce: Nonce) -> Self { match nonce { - Nonce::Nonce96(nonce) => Ipld::Bytes(nonce.to_vec()), - Nonce::Nonce128(nonce) => Ipld::Bytes(nonce.to_vec()), + Nonce::Nonce96(nonce, typ) => { + if let IncomingTyp::Bytes = typ { + Ipld::Bytes(nonce.to_vec()) + } else { + Ipld::String(Base32HexLower.encode(nonce.as_slice())) + } + } + Nonce::Nonce128(nonce, typ) => { + if let IncomingTyp::Bytes = typ { + Ipld::Bytes(nonce.to_vec()) + } else { + Ipld::String(Base32HexLower.encode(nonce.as_slice())) + } + } Nonce::Empty => Ipld::String("".to_string()), } } @@ -75,14 +111,37 @@ impl TryFrom for Nonce { type Error = Error; fn try_from(ipld: Ipld) -> Result { - if let Ipld::Bytes(v) = ipld { - match v.len() { - 12 => Ok(Nonce::Nonce96(*GenericArray::from_slice(&v))), - 16 => Ok(Nonce::Nonce128(*GenericArray::from_slice(&v))), - other_ipld => Err(Error::unexpected_ipld(other_ipld.to_owned().into())), + match ipld { + Ipld::String(s) if s.is_empty() => Ok(Nonce::Empty), + Ipld::String(s) => { + let bytes = Base32HexLower.decode(s)?; + match bytes.len() { + 12 => Ok(Nonce::Nonce96( + *GenericArray::from_slice(&bytes), + IncomingTyp::String, + )), + 16 => Ok(Nonce::Nonce128( + *GenericArray::from_slice(&bytes), + IncomingTyp::String, + )), + other => Err(Error::unexpected_ipld(other.to_owned().into())), + } } - } else { - Ok(Nonce::Empty) + Ipld::Bytes(v) => match v.len() { + 12 => Ok(Nonce::Nonce96( + *GenericArray::from_slice(&v), + IncomingTyp::Bytes, + )), + 16 => Ok(Nonce::Nonce128( + *GenericArray::from_slice(&v), + IncomingTyp::Bytes, + )), + other_ipld => { + println!("other_ipld: {:?}", v.len()); + Err(Error::unexpected_ipld(other_ipld.to_owned().into())) + } + }, + _ => Ok(Nonce::Empty), } } } @@ -122,9 +181,23 @@ impl JsonSchema for Nonce { ..Default::default() }; + let non_empty_string = SchemaObject { + instance_type: Some(SingleOrVec::Single(InstanceType::String.into())), + metadata: Some(Box::new(Metadata { + description: Some("A 12-byte or 16-byte nonce encoded as a string, which expects to be decoded with Base32hex lower".to_string()), + ..Default::default() + })), + string: Some(Box::new(StringValidation { + min_length: Some(1), + ..Default::default() + })), + ..Default::default() + }; + schema.subschemas().one_of = Some(vec![ gen.subschema_for::(), Schema::Object(empty_string), + Schema::Object(non_empty_string), ]); schema.into() @@ -141,7 +214,7 @@ mod test { let gen = Nonce::generate(); let ipld = Ipld::from(gen.clone()); - let inner = if let Nonce::Nonce96(nonce) = gen { + let inner = if let Nonce::Nonce96(nonce, _) = gen { Ipld::Bytes(nonce.to_vec()) } else { panic!("No conversion!") @@ -156,7 +229,7 @@ mod test { let gen = Nonce::generate_128(); let ipld = Ipld::from(gen.clone()); - let inner = if let Nonce::Nonce128(nonce) = gen { + let inner = if let Nonce::Nonce128(nonce, _) = gen { Ipld::Bytes(nonce.to_vec()) } else { panic!("No conversion!") @@ -192,11 +265,45 @@ mod test { assert_eq!(bytes, b); assert_eq!(ipld, Ipld::Bytes(b.to_vec())); - assert_eq!(nonce, Nonce::Nonce128(*GenericArray::from_slice(b))); + assert_eq!( + nonce, + Nonce::Nonce128(*GenericArray::from_slice(b), IncomingTyp::Bytes) + ); assert_eq!(nonce, Nonce::try_from(ipld.clone()).unwrap()); let nonce: Nonce = ipld.clone().try_into().unwrap(); let ipld = Ipld::from(nonce.clone()); assert_eq!(ipld, Ipld::Bytes(b.to_vec())); } + + #[test] + fn nonce_as_string_roundtrip() { + let nonce = Nonce::generate(); + let string = nonce.to_string(); + let from_string = Nonce::try_from(Ipld::String(string.clone())).unwrap(); + + assert_eq!(nonce.to_vec(), from_string.to_vec()); + assert_eq!(string, nonce.to_string()); + } + + #[test] + fn json_nonce_string_roundtrip() { + let in_nnc = "1sod60ml6g26mfhsrsa0"; + let json = json!({ + "nnc": in_nnc + }); + + let ipld: Ipld = DagJsonCodec.decode(json.to_string().as_bytes()).unwrap(); + let Ipld::Map(map) = ipld else { + panic!("IPLD is not a map"); + }; + let nnc = map.get("nnc").unwrap(); + let nnc: Nonce = Nonce::try_from(nnc.clone()).unwrap(); + assert_eq!(nnc.to_string(), in_nnc); + let nonce = Nonce::Nonce96( + *GenericArray::from_slice(Base32HexLower.decode(in_nnc).unwrap().as_slice()), + IncomingTyp::String, + ); + assert_eq!(nnc, nonce); + } } diff --git a/homestar-invocation/src/test_utils/invocation.rs b/homestar-invocation/src/test_utils/invocation.rs index 0869604f..0d5898ba 100644 --- a/homestar-invocation/src/test_utils/invocation.rs +++ b/homestar-invocation/src/test_utils/invocation.rs @@ -112,7 +112,7 @@ pub fn wasm_instruction_with_nonce<'a, T>() -> (Instruction<'a, T>, NonceBytes) ]))), nonce.clone(), ), - nonce.as_nonce96().unwrap().to_vec(), + nonce.to_vec(), ) } @@ -139,7 +139,7 @@ pub fn instruction_with_nonce<'a, T>() -> (Instruction<'a, T>, NonceBytes) { Input::Ipld(Ipld::List(vec![Ipld::Bool(true)])), nonce.clone(), ), - nonce.as_nonce96().unwrap().to_vec(), + nonce.to_vec(), ) } diff --git a/homestar-runtime/src/db/utils.rs b/homestar-runtime/src/db/utils.rs index 656744a3..3462d909 100644 --- a/homestar-runtime/src/db/utils.rs +++ b/homestar-runtime/src/db/utils.rs @@ -1,6 +1,6 @@ //! Utility functions Database interaction. -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -14,7 +14,8 @@ impl Timestamp for i64 { fn timestamp_from_nanos(&self) -> Option { let nanos = self % 1_000_000_000; let seconds = (self - nanos) / 1_000_000_000; - NaiveDateTime::from_timestamp_opt(seconds, nanos as u32) + let dt = DateTime::from_timestamp(seconds, nanos as u32); + dt.map(|dt| dt.naive_utc()) } } diff --git a/homestar-runtime/src/network/webserver/listener.rs b/homestar-runtime/src/network/webserver/listener.rs index c49d2f6a..200061c7 100644 --- a/homestar-runtime/src/network/webserver/listener.rs +++ b/homestar-runtime/src/network/webserver/listener.rs @@ -1,19 +1,25 @@ //! Listener for incoming requests types. +use anyhow::anyhow; use faststr::FastStr; -use homestar_invocation::ipld::DagJson; +use homestar_invocation::ipld::{DagCbor, DagJson}; use homestar_wasm::io::Arg; use homestar_workflow::Workflow; +use libipld::{serde::from_ipld, Ipld}; use names::{Generator, Name}; use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::value::RawValue; +use std::collections::BTreeMap; + +const NAME_KEY: &str = "name"; +const WORKFLOW_KEY: &str = "workflow"; /// A [Workflow] run command via a WebSocket channel. /// /// Note: We leverage the [RawValue] type in order to use our DagJson /// implementation, which is not a direct [Deserialize] implementation. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub(crate) struct Run<'a> { +pub(crate) struct JsonRun<'a> { #[serde(default = "default_name")] pub(crate) name: FastStr, #[serde(deserialize_with = "from_raw_value")] @@ -36,6 +42,58 @@ where Workflow::from_json(raw_value.get().as_bytes()).map_err(de::Error::custom) } +#[derive(Debug, Clone, PartialEq, Serialize)] +pub(crate) struct CborRun<'a> { + pub(crate) name: FastStr, + pub(crate) workflow: Workflow<'a, Arg>, +} + +impl<'a> From> for Ipld { + fn from(run: CborRun<'a>) -> Self { + Ipld::Map(BTreeMap::from([ + ("name".into(), Ipld::String(run.name.as_str().to_string())), + ("workflow".into(), run.workflow.into()), + ])) + } +} + +impl<'a> TryFrom for CborRun<'a> { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + let name: String = from_ipld( + map.get(NAME_KEY) + .ok_or_else(|| anyhow!("missing {NAME_KEY}"))? + .to_owned(), + )?; + let workflow = Workflow::try_from( + map.get(WORKFLOW_KEY) + .ok_or_else(|| anyhow!("missing {WORKFLOW_KEY}"))? + .to_owned(), + )?; + Ok(CborRun { + name: FastStr::from(name), + workflow, + }) + } +} + +impl DagCbor for CborRun<'_> {} +impl DagJson for CborRun<'_> {} + +impl<'a, 'de> Deserialize<'de> for CborRun<'a> { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Vec::::deserialize(deserializer)?; + let ipld: Ipld = serde_ipld_dagcbor::from_slice(&value).map_err(de::Error::custom)?; + let run = CborRun::try_from(ipld).map_err(de::Error::custom)?; + Ok(run) + } +} + /// Filter metrics by prefix. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub(crate) struct MetricsPrefix { @@ -50,7 +108,7 @@ mod test { task::{instruction::RunInstruction, Resources}, test_utils, Task, }; - use std::assert_eq; + use std::{fs, path::PathBuf}; #[test] fn run_json() { @@ -70,7 +128,7 @@ mod test { ); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let run = Run { + let run = JsonRun { name: "test".into(), workflow: workflow.clone(), }; @@ -81,7 +139,26 @@ mod test { ); let post_run = serde_json::from_str(&run_str).unwrap(); - assert_eq!(run, post_run); } + + #[test] + fn write_cbor_to_file_and_read() { + let workflow_str = + fs::read_to_string("tests/fixtures/test-workflow-image-pipeline.json").unwrap(); + let json: serde_json::Value = serde_json::from_str(&workflow_str).unwrap(); + let json_string = serde_json::to_string(&json).unwrap(); + let run_str = format!(r#"{{"name": "test","workflow": {}}}"#, json_string); + let run1: CborRun<'_> = DagJson::from_json_string(run_str).unwrap(); + + let path = PathBuf::from("./fixtures/test.cbor"); + assert!(run1 + .clone() + .to_cbor_file(path.display().to_string()) + .is_ok()); + + let cbor_file = fs::read(path).unwrap(); + let run2: CborRun<'_> = DagCbor::from_cbor(&cbor_file).unwrap(); + assert_eq!(run1, run2); + } } diff --git a/homestar-runtime/src/network/webserver/rpc.rs b/homestar-runtime/src/network/webserver/rpc.rs index 4cdc1512..7b1436e2 100644 --- a/homestar-runtime/src/network/webserver/rpc.rs +++ b/homestar-runtime/src/network/webserver/rpc.rs @@ -5,7 +5,7 @@ use super::notifier::{self, Header, Notifier, SubscriptionTyp}; #[allow(unused_imports)] use super::{listener, prom::PrometheusData, Message}; #[cfg(feature = "websocket-notify")] -use crate::channel::AsyncChannel; +use crate::channel::{AsyncChannel, AsyncChannelReceiver}; use crate::{ db::Database, runner::{NodeInfo, WsSender}, @@ -21,12 +21,19 @@ use faststr::FastStr; use futures::StreamExt; #[cfg(feature = "websocket-notify")] use homestar_invocation::ipld::DagCbor; +#[cfg(feature = "websocket-notify")] +use homestar_wasm::io::Arg; +#[cfg(feature = "websocket-notify")] +use homestar_workflow::Workflow; use jsonrpsee::{ server::RpcModule, types::error::{ErrorCode, ErrorObject}, }; #[cfg(feature = "websocket-notify")] -use jsonrpsee::{types::SubscriptionId, SendTimeoutError, SubscriptionMessage, SubscriptionSink}; +use jsonrpsee::{ + types::SubscriptionId, PendingSubscriptionSink, SendTimeoutError, SubscriptionMessage, + SubscriptionSink, +}; #[cfg(feature = "websocket-notify")] use libipld::Cid; use metrics_exporter_prometheus::PrometheusHandle; @@ -238,8 +245,8 @@ where SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, |params, pending, ctx| async move { - match params.one::>() { - Ok(listener::Run { name, workflow }) => { + match params.one::>() { + Ok(listener::JsonRun { name, workflow }) => { let (tx, rx) = AsyncChannel::oneshot(); ctx.runner_sender .send_async(( @@ -248,36 +255,29 @@ where )) .await?; - if let Ok(Message::AckWorkflow((cid, name))) = rx.recv_async().await { - let sink = pending.accept().await?; - ctx.workflow_listeners - .insert(sink.subscription_id(), (cid, name)); - let rx = ctx.workflow_msg_notifier.inner().subscribe(); - let stream = BroadcastStream::new(rx); - Self::handle_workflow_subscription(sink, stream, ctx).await?; - } else { - error!( - subject = "subscription.workflow.err", - category = "jsonrpc.subscription", - sub = SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, - workflow_name = name.to_string(), - "did not acknowledge message in time" - ); - let _ = pending - .reject(busy_err(format!( - "not able to run workflow {}", - workflow.to_cid()? - ))) - .await; - } + Self::handle_run_workflow(name, workflow, rx, ctx, pending).await?; } - Err(err) => { - warn!(subject = "subscription.workflow.err", + + Err(_err) => match params.one::>() { + Ok(listener::CborRun { name, workflow }) => { + let (tx, rx) = AsyncChannel::oneshot(); + ctx.runner_sender + .send_async(( + Message::RunWorkflow((name.clone(), workflow.clone())), + Some(tx), + )) + .await?; + + Self::handle_run_workflow(name, workflow, rx, ctx, pending).await?; + } + Err(err) => { + warn!(subject = "subscription.workflow.err", category = "jsonrpc.subscription", err=?err, "failed to parse run workflow params"); - let _ = pending.reject(err).await; - } + let _ = pending.reject(err).await; + } + }, } Ok(()) }, @@ -286,6 +286,40 @@ where Ok(module) } + #[cfg(feature = "websocket-notify")] + async fn handle_run_workflow( + name: FastStr, + workflow: Workflow<'_, Arg>, + rx: AsyncChannelReceiver, + ctx: Arc>, + pending: PendingSubscriptionSink, + ) -> Result<()> { + if let Ok(Message::AckWorkflow((cid, name))) = rx.recv_async().await { + let sink = pending.accept().await?; + ctx.workflow_listeners + .insert(sink.subscription_id(), (cid, name)); + let rx = ctx.workflow_msg_notifier.inner().subscribe(); + let stream = BroadcastStream::new(rx); + Self::handle_workflow_subscription(sink, stream, ctx).await?; + } else { + error!( + subject = "subscription.workflow.err", + category = "jsonrpc.subscription", + sub = SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + workflow_name = name.to_string(), + "did not acknowledge message in time" + ); + let _ = pending + .reject(busy_err(format!( + "not able to run workflow {}", + workflow.to_cid()? + ))) + .await; + } + + Ok(()) + } + #[cfg(feature = "websocket-notify")] async fn handle_event_subscription( sink: SubscriptionSink, diff --git a/homestar-runtime/src/tasks/fetch.rs b/homestar-runtime/src/tasks/fetch.rs index 5df0245c..c06915cb 100644 --- a/homestar-runtime/src/tasks/fetch.rs +++ b/homestar-runtime/src/tasks/fetch.rs @@ -11,6 +11,9 @@ use fnv::FnvHashSet; use indexmap::IndexMap; use std::sync::Arc; +/// Fetch module for gathering data over the network related to [Task]. +/// +/// [Task]: homestar_invocation::Task pub(crate) struct Fetch; #[cfg(any(test, feature = "test-utils"))] diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 7cbd53fc..fe7b4653 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -235,6 +235,23 @@ impl Info { } } + /// Get workflow progress as a vector of Cids. + pub fn progress(&self) -> &Vec { + &self.progress + } + + /// Get workflow progress as a number of receipts completed. + pub fn progress_count(&self) -> u32 { + self.progress_count + } + + /// Get the number of tasks in the [Workflow]. + /// + /// [Workflow]: homestar_workflow::Workflow + pub fn num_tasks(&self) -> u32 { + self.num_tasks + } + /// Get unique identifier, Cid, of [Workflow]. /// /// [Workflow]: homestar_workflow::Workflow diff --git a/homestar-runtime/tests/cli.rs b/homestar-runtime/tests/cli.rs index e61a73c6..98507319 100644 --- a/homestar-runtime/tests/cli.rs +++ b/homestar-runtime/tests/cli.rs @@ -1,5 +1,7 @@ #[cfg(not(windows))] use crate::utils::kill_homestar_daemon; +#[cfg(feature = "test-utils")] +use crate::utils::wait_for_asserts; use crate::{ make_config, utils::{ @@ -10,8 +12,14 @@ use crate::{ use anyhow::Result; use assert_cmd::prelude::*; use homestar_runtime::Settings; +#[cfg(feature = "test-utils")] +use homestar_runtime::{db::Database, Db}; +#[cfg(feature = "test-utils")] +use libipld::Cid; use once_cell::sync::Lazy; use predicates::prelude::*; +#[cfg(feature = "test-utils")] +use std::str::FromStr; use std::{ path::PathBuf, process::{Command, Stdio}, @@ -257,6 +265,72 @@ fn test_workflow_run_integration() -> Result<()> { Ok(()) } +#[test] +#[serial_test::parallel] +#[cfg(feature = "test-utils")] +fn test_workflow_run_integration_nonced() -> Result<()> { + let proc_info = ProcInfo::new().unwrap(); + let rpc_port = proc_info.rpc_port; + let metrics_port = proc_info.metrics_port; + let ws_port = proc_info.ws_port; + let workflow_cid = "bafyrmid4ev2l44lgbazmgg36rui3eirzp5tg5ebnaexyogdnzv4hmvvtay"; + let toml = format!( + r#" + [node] + [node.network.libp2p.mdns] + enable = false + [node.network.metrics] + port = {metrics_port} + [node.network.rpc] + port = {rpc_port} + [node.network.webserver] + port = {ws_port} + "# + ); + let config = make_config!(toml); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg(config.filename()) + .arg("--db") + .arg(&proc_info.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard = ChildGuard::new(homestar_proc); + + if wait_for_socket_connection_v6(rpc_port, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + Command::new(BIN.as_os_str()) + .arg("run") + .arg("-p") + .arg(rpc_port.to_string()) + .arg("tests/fixtures/test-workflow-add-one-nonced.json") + .assert() + .success(); + + let settings = Settings::load_from_file(PathBuf::from(config.filename())).unwrap(); + let cid = Cid::from_str(workflow_cid).unwrap(); + let db = Db::setup_connection_pool( + settings.node(), + Some(proc_info.db_path.display().to_string()), + ) + .expect("Failed to connect to node two database"); + + wait_for_asserts(500, || { + let (name, info) = Db::get_workflow_info(cid, &mut db.conn().unwrap()).unwrap(); + name.unwrap().as_str() == workflow_cid + && info.progress().len() == 3 + && info.progress_count() == 3 + }) + .unwrap(); + + Ok(()) +} + #[test] #[serial_test::parallel] #[cfg(not(windows))] diff --git a/homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json b/homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json new file mode 100644 index 00000000..773b08a2 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json @@ -0,0 +1,72 @@ +{ + "tasks": [ + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [1], + "func": "add_one" + }, + "nnc": "1sod60ml6g26mfhsrsa0", + "op": "wasm/run", + "rsc": "ipfs://bafybeia32q3oy6u47x624rmsmgrrlpn7ulruissmz5z2ap6alv7goe7h3q" + } + }, + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [ + { + "await/ok": { + "/": "bafyrmig5x46c6nzk74adlo6ffwo4bi7fr42jkv54zqa5kzazfirb47ninm" + } + } + ], + "func": "add_one" + }, + "nnc": { + "/": { "bytes": "oVI8XiFiec+c2XHS" } + }, + "op": "wasm/run", + "rsc": "ipfs://bafybeia32q3oy6u47x624rmsmgrrlpn7ulruissmz5z2ap6alv7goe7h3q" + } + }, + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [ + { + "await/ok": { + "/": "bafyrmiaa2pbgwydezutitiqq6j6qd73o3gqpeycp4g6dxnx466xdxjfnvq" + } + } + ], + "func": "add_one" + }, + "nnc": "", + "op": "wasm/run", + "rsc": "ipfs://bafybeia32q3oy6u47x624rmsmgrrlpn7ulruissmz5z2ap6alv7goe7h3q" + } + } + ] +} diff --git a/homestar-runtime/tests/fixtures/test-workflow-image-pipeline.cbor b/homestar-runtime/tests/fixtures/test-workflow-image-pipeline.cbor new file mode 100644 index 0000000000000000000000000000000000000000..551628d256441e99b11b3393d83298dea7a852c2 GIT binary patch literal 677 zcmZ3Kl9!m9no^QlT#`|qUzDAelV83twIs1PySRBtazRmALvm4R-jbyJf{gOS;#_?o zCpj-KIU%{IIJv?pvmmY5N?$)IF|9HwH8auJsL(jS(yY|Pyu!@Lq$syIH@&DRry$R~ zG^eOEv$#07%CyQTvA`@br_4M(Kh-?LxG*&{ub{MKQA%P_dU5Lwtq4^H#sVRQPStR) zvqx4=eQ%ro=5I-IMwq7e>{8=yuRToxH4c0#lO?7}GR8?VegUGCG@y%8l8f>SQgVUr zUX)UjnVTxbz}U7REjKkczo=50fq{V$h#69o6HALzztP)QjSDjp%M&w8^z*YJ{wowy z*r$46nS)JuiWfbL?B$lLF8Vn(ieckl? yHkZM=y8ow(w;j2dqnPpO>yHH*Q Result<(), ()> { let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let result = retry(Exponential::from_millis(exp_retry_base).take(10), || { @@ -374,7 +374,7 @@ pub(crate) fn wait_for_socket_connection(port: u16, exp_retry_base: u64) -> Resu result.map_or_else(|_| Err(()), |_| Ok(())) } -/// Wait for socket connection or timeout (ipv6) +/// Wait for socket connection or timeout (ipv6). pub(crate) fn wait_for_socket_connection_v6(port: u16, exp_retry_base: u64) -> Result<(), ()> { let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); let result = retry(Exponential::from_millis(exp_retry_base).take(10), || { @@ -384,6 +384,23 @@ pub(crate) fn wait_for_socket_connection_v6(port: u16, exp_retry_base: u64) -> R result.map_or_else(|_| Err(()), |_| Ok(())) } +/// Wait for asserts to pass or timeout expires. +#[allow(dead_code)] +pub(crate) fn wait_for_asserts( + exp_retry_base: u64, + assertion: impl Fn() -> bool, +) -> Result<(), ()> { + let result = retry(Exponential::from_millis(exp_retry_base).take(10), || { + if assertion() { + Ok(()) + } else { + Err(()) + } + }); + + result.map_or_else(|_| Err(()), |_| Ok(())) +} + /// Client and subscription. #[cfg(feature = "websocket-notify")] pub(crate) struct WsClientSub { diff --git a/homestar-runtime/tests/webserver.rs b/homestar-runtime/tests/webserver.rs index c9237555..fc07a160 100644 --- a/homestar-runtime/tests/webserver.rs +++ b/homestar-runtime/tests/webserver.rs @@ -90,7 +90,7 @@ fn test_workflow_run_integration() -> Result<()> { // we have 3 operations let mut received_cids = 0; loop { - if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -118,7 +118,7 @@ fn test_workflow_run_integration() -> Result<()> { .unwrap(); loop { - if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -135,7 +135,10 @@ fn test_workflow_run_integration() -> Result<()> { } } - let client2 = WsClientBuilder::default().build(ws_url).await.unwrap(); + let client2 = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); let mut sub3: Subscription> = client2 .subscribe( SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, @@ -152,7 +155,7 @@ fn test_workflow_run_integration() -> Result<()> { .is_err(); loop { - if let Ok(msg) = sub3.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub3.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -187,7 +190,7 @@ fn test_workflow_run_integration() -> Result<()> { .unwrap(); loop { - if let Ok(msg) = sub4.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub4.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -202,6 +205,12 @@ fn test_workflow_run_integration() -> Result<()> { break; } } + + let _ = sub4 + .next() + .with_timeout(Duration::from_secs(10)) + .await + .is_err(); }); // Collect logs then kill proceses. @@ -221,3 +230,88 @@ fn test_workflow_run_integration() -> Result<()> { Ok(()) } + +#[test] +#[serial_test::parallel] +fn test_workflow_run_integration_cbor() -> Result<()> { + let proc_info = ProcInfo::new().unwrap(); + let rpc_port = proc_info.rpc_port; + let metrics_port = proc_info.metrics_port; + let ws_port = proc_info.ws_port; + let toml = format!( + r#" + [node] + [node.network.libp2p.mdns] + enable = false + [node.network.metrics] + port = {metrics_port} + [node.network.rpc] + port = {rpc_port} + [node.network.webserver] + port = {ws_port} + "# + ); + + let config = make_config!(toml); + let homestar_proc = Command::new(BIN.as_os_str()) + .env("RUST_BACKTRACE", "0") + .arg("start") + .arg("-c") + .arg(config.filename()) + .arg("--db") + .arg(&proc_info.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard = ChildGuard::new(homestar_proc); + + if wait_for_socket_connection(ws_port, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + let ws_url = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port); + + tokio_test::block_on(async { + let run_cbor = fs::read("tests/fixtures/test-workflow-image-pipeline.cbor").unwrap(); + let client = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); + + let mut sub: Subscription> = client + .subscribe( + SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + rpc_params![run_cbor], + UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + ) + .await + .unwrap(); + + // we have 3 operations + let mut received_cids = 0; + loop { + if let Ok(msg) = sub.next().with_timeout(Duration::from_secs(45)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + let check = json.get("metadata").unwrap(); + let expected = serde_json::json!({"name": "test", "replayed": false, "workflow": {"/": format!("{AWAIT_CID}")}}); + assert_eq!(check, &expected); + received_cids += 1; + } else { + panic!("Node one did not publish receipt in time.") + } + + if received_cids == 3 { + break; + } + } + + let _ = sub + .next() + .with_timeout(Duration::from_secs(10)) + .await + .is_err(); + }); + + Ok(()) +} diff --git a/homestar-workflow/src/workflow.rs b/homestar-workflow/src/workflow.rs index 7168ba4c..0bd6c672 100644 --- a/homestar-workflow/src/workflow.rs +++ b/homestar-workflow/src/workflow.rs @@ -157,6 +157,34 @@ mod test { assert_eq!(workflow, wf_from_json2); } + #[test] + fn workflow_to_cbor_to_json_roundtrip() { + let config = Resources::default(); + let instruction1 = test_utils::instruction::(); + let (instruction2, _) = test_utils::wasm_instruction_with_nonce::(); + + let task1 = Task::new( + RunInstruction::Expanded(instruction1), + config.clone().into(), + UcanPrf::default(), + ); + let task2 = Task::new( + RunInstruction::Expanded(instruction2), + config.into(), + UcanPrf::default(), + ); + + let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let cbor_bytes = workflow.clone().to_cbor().unwrap(); + let workflow_from_cbor = Workflow::::from_cbor(&cbor_bytes).unwrap(); + assert_eq!(workflow, workflow_from_cbor); + + let json_from_cbor_string = workflow_from_cbor.clone().to_dagjson_string().unwrap(); + let json_string = workflow.to_json_string().unwrap(); + + assert_eq!(json_from_cbor_string, json_string); + } + #[test] fn ipld_roundtrip_workflow() { let config = Resources::default();