diff --git a/flake.lock b/flake.lock index 9c04b48d..5c4524f0 100644 --- a/flake.lock +++ b/flake.lock @@ -36,11 +36,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1688523306, - "narHash": "sha256-xcJHmwlw0w05D/c8oPX6COxVLYNbu6lXF2mIV7dFGkc=", + "lastModified": 1688956120, + "narHash": "sha256-7geHGr2aLpQvwGgaZlTLPHMVFxvFzAuB35mZYsKgLpQ=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "023b1df882979a413a3f7e2009424db30d51a0fe", + "rev": "2169d3b0bce0daa64d05abbdf9da552a7b8c22a7", "type": "github" }, "original": { @@ -68,11 +68,11 @@ ] }, "locked": { - "lastModified": 1688524421, - "narHash": "sha256-EFauqXKKjbJKPWv3kbzl1lm1GnXl0+DBK4RcLuFndZQ=", + "lastModified": 1688956505, + "narHash": "sha256-6sa19mHTkdOi867lIolhpiS20trMdo0unk5/37859X4=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "ab050286f18ea354bfe7a49ca8ddcbd633cae1ca", + "rev": "4acc04c26df84e0a718c3efe4e13021222d23b28", "type": "github" }, "original": { diff --git a/homestar-runtime/src/db.rs b/homestar-runtime/src/db.rs index 90c48d22..3fe2a4b9 100644 --- a/homestar-runtime/src/db.rs +++ b/homestar-runtime/src/db.rs @@ -145,6 +145,23 @@ pub trait Database: Send + Sync + Clone { .map_err(Into::into) } + /// Store series of receipts for a workflow [Cid] in the + /// [schema::workflows_receipts] table. + /// + /// NOTE: We cannot do batch inserts with `on_conflict`, so we add + /// each one 1-by-1: + /// + fn store_workflow_receipts( + workflow_cid: Cid, + receipts: &[Cid], + conn: &mut Connection, + ) -> Result { + receipts.iter().try_fold(0, |acc, receipt| { + let res = Self::store_workflow_receipt(workflow_cid, *receipt, conn)?; + Ok::<_, anyhow::Error>(acc + res) + }) + } + /// Select workflow given a [Cid] to the workflow. fn select_workflow(cid: Cid, conn: &mut Connection) -> Result { let wf = schema::workflows::dsl::workflows @@ -154,12 +171,11 @@ pub trait Database: Send + Sync + Clone { Ok(wf) } - /// Join workflow information with number of receipts emitted. + /// Return workflow information with number of receipts emitted. fn get_workflow_info(workflow_cid: Cid, conn: &mut Connection) -> Result { let wf = Self::select_workflow(workflow_cid, conn)?; let associated_receipts = workflow::StoredReceipt::belonging_to(&wf) - .inner_join(schema::receipts::dsl::receipts) - .select(schema::receipts::dsl::cid) + .select(schema::workflows_receipts::receipt_cid) .load(conn)?; let cids = associated_receipts diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index 4b363335..f5f90a47 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -27,15 +27,15 @@ use tokio::{ use tracing::info; /// Type alias for a [DashMap] containing running task information. -pub type RunningSet = DashMap>; +pub type RunningTaskSet = DashMap>; /// Trait for managing a [DashMap] of running task information. pub trait ModifiedSet { - /// Append or insert a new [AbortHandle] into the [RunningSet]. + /// Append or insert a new [AbortHandle] into the [RunningTaskSet]. fn append_or_insert(&mut self, cid: Cid, handles: Vec); } -impl ModifiedSet for RunningSet { +impl ModifiedSet for RunningTaskSet { fn append_or_insert(&mut self, cid: Cid, mut handles: Vec) { self.entry(cid) .and_modify(|prev_handles| { @@ -55,7 +55,7 @@ pub struct Runner { command_sender: oneshot::Sender, command_receiver: oneshot::Receiver, event_sender: Arc>, - running_set: RunningSet, + running_set: RunningTaskSet, #[allow(dead_code)] ws_sender: ws::WsSender, ws_receiver: BoundedChannelReceiver, @@ -71,7 +71,7 @@ pub struct Runner { command_sender: oneshot::Sender, command_receiver: oneshot::Receiver, event_sender: Arc>, - running_set: RunningSet, + running_set: RunningTaskSet, } impl Runner { @@ -162,7 +162,7 @@ impl Runner { Ok(()) } - /// Garbage-collect task [AbortHandle]s in the [RunningSet]. + /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet]. pub fn gc(&mut self) { self.running_set.retain(|_cid, handles| { handles.retain(|handle| !handle.is_finished()); @@ -170,9 +170,8 @@ impl Runner { }); } - /// Garbage-collect task [AbortHandle]s in the [RunningSet] for a specific + /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] for a specific /// workflow [Cid], running on a worker. - /// pub fn gc_worker(&mut self, cid: Cid) { if let Some(mut handles) = self.running_set.get_mut(&cid) { handles.retain(|handle| !handle.is_finished()); @@ -242,15 +241,13 @@ mod test { time::Duration, }; - static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(1338); + static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(444); async fn setup() -> Runner { let mut settings = Settings::load().unwrap(); - settings.node.network.websocket_port = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16; - let db = crate::test_utils::db::MemoryDb::setup_connection_pool( - Settings::load().unwrap().node(), - ) - .unwrap(); + let _ = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16; + settings.node.network.websocket_port = ATOMIC_PORT.load(Ordering::SeqCst) as u16; + let db = crate::test_utils::db::MemoryDb::setup_connection_pool(settings.node()).unwrap(); Runner::start(settings.into(), db).await.unwrap() } diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index fc455ca9..3397e131 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -17,7 +17,7 @@ use crate::{ Event, }, network::swarm::CapsuleTag, - runner::{ModifiedSet, RunningSet}, + runner::{ModifiedSet, RunningTaskSet}, scheduler::TaskScheduler, tasks::{RegisteredTasks, WasmContext}, workflow::{self, Resource}, @@ -138,7 +138,7 @@ impl<'a> Worker<'a> { pub(crate) async fn run( self, db: impl Database + Sync, - running_set: &mut RunningSet, + running_set: &mut RunningTaskSet, ) -> Result<()> { self.run_queue(db, running_set).await } @@ -146,7 +146,7 @@ impl<'a> Worker<'a> { async fn run_queue( mut self, db: impl Database + Sync, - running_set: &mut RunningSet, + running_set: &mut RunningTaskSet, ) -> Result<()> { fn insert_into_map(mut map: Arc>, key: Cid, value: T) where diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 3c0595dc..8963a7a5 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -52,7 +52,9 @@ impl Stored { /// /// [Workflow]: homestar_core::Workflow /// [receipts]: crate::Receipt -#[derive(Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Associations, Hash)] +#[derive( + Debug, Clone, PartialEq, Queryable, Insertable, Identifiable, Selectable, Associations, Hash, +)] #[diesel(belongs_to(Receipt, foreign_key = receipt_cid))] #[diesel(belongs_to(Stored, foreign_key = workflow_cid))] #[diesel(table_name = crate::db::schema::workflows_receipts, primary_key(workflow_cid, receipt_cid))] @@ -171,6 +173,7 @@ impl Info { ) -> Result { let workflow_len = workflow.len(); let workflow_cid = workflow.to_cid()?; + let handle_timeout_fn = |workflow_cid, reused_conn: Option<&'a mut Connection>| { let workflow_info = Self::default(workflow_cid, workflow_len); // store workflow from info @@ -234,6 +237,8 @@ impl Info { ), conn, )?; + + Db::store_workflow_receipts(workflow_cid, &workflow_info.progress, conn)?; } Ok(workflow_info)