Skip to content

Commit

Permalink
Fix: poll DHT in background when worker runs up a workflow + dual-sta…
Browse files Browse the repository at this point in the history
…ck webserver (#590)

Completes:

- Closes #278. 
- Closes #589.

Includes:
* We no longer block DHT polling of receipts, and, instead, purposefully
race execution and DHT finding/polling.
* This moves us toward the goal of resolving awaited CIDs that live
outside of the workflow, but we'll need another background process to
re-run a workflow once its known awaits are resolved (separate ticket).
The next set of work will also validate that those outside-of-workflow
promises exist, i.e., that they are currently running in another workflow
or have already run.
* v4/v6 host address settings for the JSON-RPC web server so that the
web server can handle dual v4/v6 requests on the same port.
  * Better / ordered error handling around CIDs that fail to resolve.
* Workflow status field in the DB (+ migration) (SQLite enum, with
special patching).
  * Workflow retries field in the DB (in same migration).
  * A new poller implementation that's general purpose.
  * Removes `async_trait` except for wasmtime impls (due to 1.73).
  • Loading branch information
Zeeshan Lakhani committed Feb 29, 2024
1 parent 913278b commit 2d77b0a
Show file tree
Hide file tree
Showing 43 changed files with 1,080 additions and 485 deletions.
2 changes: 1 addition & 1 deletion .envrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use_flake

export RUST_LOG=homestar=debug,homestar_runtime=debug,homestar_wasm=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,jsonrpsee_server=debug,moka=debug
export RUST_BACKTRACE=full
export RUST_BACKTRACE=1
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ repos:
- id: trailing-whitespace
exclude: bindings\.rs$
- id: end-of-file-fixer
exclude: \.(txt|json)$
exclude: \.(txt|json|patch)$
- id: check-yaml
- id: check-json
- id: check-added-large-files
Expand Down
40 changes: 27 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions diesel.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

[print_schema]
file = "homestar-runtime/src/db/schema.rs"
patch_file = "homestar-runtime/src/db/schema.patch"

[migrations_directory]
dir = "homestar-runtime/migrations"
24 changes: 12 additions & 12 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,23 @@
pkgs = import nixpkgs {inherit system overlays;};
unstable = import nixos-unstable {inherit system overlays;};

rust-toolchain = fenix.packages.${system}.fromToolchainFile {
file-toolchain = fenix.packages.${system}.fromToolchainFile {
file = ./rust-toolchain.toml;
# sha256 = pkgs.lib.fakeSha256;
sha256 = "sha256-e4mlaJehWBymYxJGgnbuCObVlqMlQSilZ8FljG9zPHY=";
};

default-toolchain = fenix.packages.${system}.complete.withComponents [
"cargo"
"clippy"
"llvm-tools-preview"
"rustfmt"
"rust-src"
"rust-std"
];

rust-toolchain = fenix.packages.${system}.combine [file-toolchain default-toolchain];

rustPlatform = pkgs.makeRustPlatform {
cargo = rust-toolchain;
rustc = rust-toolchain;
Expand Down
2 changes: 1 addition & 1 deletion homestar-invocation/src/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct Receipt<T> {
}

impl<T> Receipt<T> {
///
/// Create a new [Receipt].
pub fn new(
ran: Pointer,
result: task::Result<T>,
Expand Down
11 changes: 9 additions & 2 deletions homestar-invocation/src/task/instruction/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,15 @@ where
F: Fn(Cid) -> BoxFuture<'a, Result<task::Result<T>, ResolveError>> + Clone + Send + Sync,
Ipld: From<T>,
{
let inputs = resolve_args(self.0, lookup_fn);
Ok(Args(inputs.await))
let inputs = resolve_args(self.0, lookup_fn).await;
for input in inputs.iter() {
if let Input::Deferred(awaiting) = input {
return Err(ResolveError::UnresolvedCid(
awaiting.instruction_cid().to_string(),
));
}
}
Ok(Args(inputs))
}
}

Expand Down
4 changes: 3 additions & 1 deletion homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ diesel = { version = "2.1", default-features = false, features = [
"with-deprecated",
"chrono",
] }
diesel-derive-enum = { version = "2.1", features = ["sqlite"] }
diesel_migrations = "2.1"
dotenvy = "0.15"
dyn-clone = "1.0"
Expand All @@ -79,6 +80,7 @@ homestar-workspace-hack = { workspace = true }
http = "0.2"
http-serde = "1.1"
humantime = { workspace = true }
hyper = { version = "0.14", default-features = false }
indexmap = { workspace = true }
ipfs-api = { version = "0.17", optional = true }
ipfs-api-backend-hyper = { version = "0.6", default-features = false, features = [
Expand Down Expand Up @@ -147,7 +149,7 @@ serde_with = { version = "3.5", default-features = false, features = [
] }
stream-cancel = "0.8"
sysinfo = { version = "0.29", default-features = false, optional = true }
tabled = { version = "0.14", default-features = false, features = [
tabled = { version = "0.15", default-features = false, features = [
"derive",
"macros",
] }
Expand Down
4 changes: 3 additions & 1 deletion homestar-runtime/config/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ mesh_n = 2
mesh_outbound_min = 1

[node.network.libp2p.dht]
enable_resolve_receipts_in_background = true
p2p_receipt_timeout = 500
p2p_workflow_info_timeout = 500
p2p_provider_timeout = 10_000
Expand All @@ -74,7 +75,8 @@ max_connections = 10
server_timeout = 120

[node.network.webserver]
host = "127.0.0.1"
v4_host = "127.0.0.1"
v6_host = "[::1]"
port = 1337
timeout = 120
websocket_capacity = 2048
Expand Down
Empty file removed homestar-runtime/migrations/.keep
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE workflows DROP COLUMN status;
ALTER TABLE workflows DROP COLUMN retries;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE workflows ADD COLUMN status TEXT CHECK(
status IN ('pending', 'completed', 'running', 'stuck')) NOT NULL DEFAULT
'pending';
ALTER TABLE workflows ADD COLUMN retries INTEGER NOT NULL DEFAULT 0;
15 changes: 15 additions & 0 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::fs;
use tracing::info;

#[allow(missing_docs, unused_imports)]
#[rustfmt::skip]
pub mod schema;
pub(crate) mod utils;

Expand Down Expand Up @@ -248,6 +249,20 @@ pub trait Database: Send + Sync + Clone {
}
}

/// Set the `status` column for the workflow identified by `workflow_cid`.
///
/// The workflow row is selected by comparing its `cid` column against a
/// [Pointer] built from `workflow_cid`. The affected-row count reported by
/// the statement is discarded, so callers only learn whether the statement
/// itself succeeded.
///
/// # Errors
///
/// Returns the [diesel::result::Error] produced while executing the update.
fn set_workflow_status(
    workflow_cid: Cid,
    status: workflow::Status,
    conn: &mut Connection,
) -> Result<(), diesel::result::Error> {
    let workflow_pointer = Pointer::new(workflow_cid);
    let new_status = schema::workflows::status.eq(status);

    diesel::update(schema::workflows::dsl::workflows)
        .filter(schema::workflows::cid.eq(workflow_pointer))
        .set(new_status)
        .execute(conn)
        // Only success/failure matters here; drop the affected-row count.
        .map(|_rows_affected| ())
}

/// Store workflow Cid and [Receipt] Cid in the database for inner join.
fn store_workflow_receipt(
workflow_cid: Cid,
Expand Down
9 changes: 9 additions & 0 deletions homestar-runtime/src/db/schema.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
@@ -25,7 +25,7 @@ diesel::table! {
cid -> Text,
name -> Nullable<Text>,
num_tasks -> Integer,
resources -> Binary,
created_at -> Timestamp,
completed_at -> Nullable<Timestamp>,
+ status -> crate::workflow::StatusMapping,
- status -> Text,
8 changes: 7 additions & 1 deletion homestar-runtime/src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ diesel::table! {
resources -> Binary,
created_at -> Timestamp,
completed_at -> Nullable<Timestamp>,
status -> crate::workflow::StatusMapping,
retries -> Integer,
}
}

Expand All @@ -34,4 +36,8 @@ diesel::table! {
diesel::joinable!(workflows_receipts -> receipts (receipt_cid));
diesel::joinable!(workflows_receipts -> workflows (workflow_cid));

diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts,);
diesel::allow_tables_to_appear_in_same_query!(
receipts,
workflows,
workflows_receipts,
);
2 changes: 0 additions & 2 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
settings,
};
use anyhow::Result;
use async_trait::async_trait;
use fnv::FnvHashMap;
use libp2p::{
core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie,
Expand Down Expand Up @@ -48,7 +47,6 @@ struct Bootstrap {
}

/// Handler trait for [EventHandler] events.
#[async_trait]
pub(crate) trait Handler<DB>
where
DB: Database,
Expand Down
2 changes: 0 additions & 2 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{
workflow, Db, Receipt,
};
use anyhow::Result;
use async_trait::async_trait;
#[cfg(feature = "websocket-notify")]
use homestar_invocation::Pointer;
use homestar_invocation::Receipt as InvocationReceipt;
Expand Down Expand Up @@ -701,7 +700,6 @@ impl PeerRequest {
}
}

#[async_trait]
impl<DB> Handler<DB> for Event
where
DB: Database,
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/event_handler/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub(crate) fn emit_receipt(
subject = "notification.receipt",
category = "notification",
cid = receipt_cid.to_string(),
instruction_cid = receipt.instruction().cid().to_string(),
"emitting receipt to WebSocket"
);
if let Some(ipld) = metadata {
Expand Down
Loading

0 comments on commit 2d77b0a

Please sign in to comment.