Skip to content

Commit

Permalink
refactor: add memory limits, update scheduler, progress and info (#144)
Browse files Browse the repository at this point in the history
Includes:

* match found records via "capsule" types use dagcborcodec
* runtime cleanup
* leverage shorter p2p timeout when building scheduler to check network
* remove some clones
  • Loading branch information
Zeeshan Lakhani committed Jun 20, 2023
1 parent fc53a42 commit eaedb38
Show file tree
Hide file tree
Showing 32 changed files with 686 additions and 267 deletions.
3 changes: 2 additions & 1 deletion .envrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use_flake

export RUST_LOG=homestar_runtime=debug,atuin_client=warn,libp2p=info
export RUST_LOG=homestar_runtime=debug,atuin_client=warn,libp2p=info,libp2p_gossipsub::behaviour=debug
export RUST_BACKTRACE=full
export RUSTFLAGS="--cfg tokio_unstable"
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ rust-version = "1.66.0"

[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-trait = "0.1"
byte-unit = { version = "4.0", default-features = false }
enum-assoc = " 1.1"
enum-as-inner = "0.6"
libipld = { version = "0.16", features = ["serde-codec"] }
serde_ipld_dagcbor = "0.3"
thiserror = "1.0"
tokio = { version = "1.26", features = ["fs", "io-util", "io-std", "macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.26", features = ["fs", "io-util", "io-std", "macros", "rt", "rt-multi-thread", "tracing"] }
tracing = "0.1"

# Speedup build on macOS
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
cargo clippy
cargo build --release
nx-test
nx-test-0
'';

db = pkgs.writeScriptBin "db" ''
Expand Down
3 changes: 2 additions & 1 deletion homestar-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ doctest = true
# return to version.workspace = true after the following issue is fixed:
# https://github.com/DevinR528/cargo-sort/issues/47
anyhow = { workspace = true }
byte-unit = { workspace = true }
diesel = { version = "2.0", features = ["sqlite"] }
enum-as-inner = { workspace = true }
enum-assoc = { workspace = true }
generic-array = { version = "0.14", features = ["serde"] }
indexmap = "1.9"
libipld = { version = "0.16", features = ["serde-codec"] }
libipld = { workspace = true }
libsqlite3-sys = { version = "0.26", features = ["bundled"] }
proptest = { version = "1.2", optional = true }
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions homestar-core/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
pub const INVOCATION_VERSION: &str = "0.2.0";
/// DagCbor codec.
pub const DAG_CBOR: u64 = 0x71;
/// 4GiB maximum memory for Wasm.
pub const WASM_MAX_MEMORY: u64 = byte_unit::n_gib_bytes!(4);
33 changes: 30 additions & 3 deletions homestar-core/src/workflow/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@
//! [workflow]: super
//! [Invocations]: super::Invocation

use crate::{workflow, Unit};
use crate::{consts, workflow, Unit};
use libipld::{serde::from_ipld, Ipld};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, default::Default, time::Duration};

const FUEL_KEY: &str = "fuel";
const MEMORY_KEY: &str = "memory";
const TIMEOUT_KEY: &str = "time";

/// Resource configuration for defining fuel quota, timeout, etc.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct Resources {
fuel: Option<u64>,
memory: Option<u64>,
time: Option<Duration>,
}

impl Default for Resources {
fn default() -> Self {
Self {
fuel: Some(u64::MAX),
memory: Some(consts::WASM_MAX_MEMORY),
time: Some(Duration::from_millis(100_000)),
}
}
}

impl Resources {
/// Create new [Resources] configuration.
pub fn new(fuel: u64, time: Duration) -> Self {
pub fn new(fuel: u64, memory: u64, time: Duration) -> Self {
Self {
fuel: Some(fuel),
memory: Some(memory),
time: Some(time),
}
}
Expand All @@ -56,6 +60,16 @@ impl Resources {
pub fn set_time(&mut self, time: Duration) {
self.time = Some(time)
}

/// Get max memory.
pub fn memory(&self) -> Option<u64> {
self.memory
}

/// Set max memory.
pub fn set_memory(&mut self, memory: u64) {
self.memory = Some(memory)
}
}

impl From<Resources> for Ipld {
Expand All @@ -65,6 +79,10 @@ impl From<Resources> for Ipld {
FUEL_KEY.into(),
resources.fuel().map(Ipld::from).unwrap_or(Ipld::Null),
),
(
MEMORY_KEY.into(),
resources.memory().map(Ipld::from).unwrap_or(Ipld::Null),
),
(
TIMEOUT_KEY.into(),
resources
Expand Down Expand Up @@ -95,6 +113,11 @@ impl TryFrom<Ipld> for Resources {
ipld => from_ipld(ipld.to_owned()).ok(),
});

let memory = map.get(MEMORY_KEY).and_then(|ipld| match ipld {
Ipld::Null => None,
ipld => from_ipld(ipld.to_owned()).ok(),
});

let time = map.get(TIMEOUT_KEY).and_then(|ipld| match ipld {
Ipld::Null => None,
ipld => {
Expand All @@ -103,7 +126,7 @@ impl TryFrom<Ipld> for Resources {
}
});

Ok(Resources { fuel, time })
Ok(Resources { fuel, memory, time })
}
}

Expand All @@ -121,6 +144,10 @@ mod test {
ipld,
Ipld::Map(BTreeMap::from([
(FUEL_KEY.into(), Ipld::Integer(u64::MAX.into())),
(
MEMORY_KEY.into(),
Ipld::Integer(consts::WASM_MAX_MEMORY.into())
),
(TIMEOUT_KEY.into(), Ipld::Integer(100_000))
]))
);
Expand Down
2 changes: 2 additions & 0 deletions homestar-core/src/workflow/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::{
use libipld::{self, cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Ipld};
use std::collections::BTreeMap;

pub mod metadata;

const RAN_KEY: &str = "ran";
const OUT_KEY: &str = "out";
const ISSUER_KEY: &str = "iss";
Expand Down
9 changes: 9 additions & 0 deletions homestar-core/src/workflow/receipt/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Metadata related to [Receipt]s.

/// Metadata key for an operation or function name.
pub const OP_KEY: &str = "op";

/// Metadata key for a workflow [Cid].
///
/// [Cid]: libipld::Cid
pub const WORKFLOW_KEY: &str = "workflow";
4 changes: 3 additions & 1 deletion homestar-core/src/workflow/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::{test_utils, workflow::config::Resources, Unit};
use crate::{consts::WASM_MAX_MEMORY, test_utils, workflow::config::Resources, Unit};

#[test]
fn ipld_roundtrip() {
Expand Down Expand Up @@ -207,6 +207,7 @@ mod test {
METADATA_KEY.into(),
Ipld::Map(BTreeMap::from([
("fuel".into(), Ipld::Integer(u64::MAX.into())),
("memory".into(), Ipld::Integer(WASM_MAX_MEMORY.into())),
("time".into(), Ipld::Integer(100_000))
]))
),
Expand Down Expand Up @@ -236,6 +237,7 @@ mod test {
METADATA_KEY.into(),
Ipld::Map(BTreeMap::from([
("fuel".into(), Ipld::Integer(u64::MAX.into())),
("memory".into(), Ipld::Integer(WASM_MAX_MEMORY.into())),
("time".into(), Ipld::Integer(100_000))
]))
),
Expand Down
13 changes: 7 additions & 6 deletions homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ansi_term = { version = "0.12", optional = true, default-features = false }
anyhow = { workspace = true }
async-trait = "0.1"
axum = { version = "0.6", features = ["ws", "headers"] }
byte-unit = { version = "4.0", default-features = false }
byte-unit = { workspace = true }
clap = { version = "4.3", features = ["derive", "color", "help"] }
concat-in-place = "1.1"
config = "0.13"
Expand All @@ -52,18 +52,20 @@ indexmap = "1.9"
ipfs-api = { version = "0.17", optional = true }
ipfs-api-backend-hyper = { version = "0.6", features = ["with-builder", "with-send-sync"], optional = true }
itertools = "0.10"
libipld = "0.16"
libp2p = { version = "0.51", features = ["kad", "request-response", "macros", "identify", "mdns", "gossipsub", "tokio", "dns", "mplex", "tcp", "noise", "yamux", "websocket"] }
libipld = { workspace = true }
libp2p = { version = "0.51", default-features = false, features = ["kad", "request-response", "macros", "identify", "mdns", "gossipsub", "tokio", "dns", "mplex", "tcp", "noise", "yamux", "websocket"] }
libsqlite3-sys = { version = "0.26", features = ["bundled"] }
openssl = { version = "0.10", features = ["vendored"] }
proptest = { version = "1.2", optional = true }
reqwest = { version = "0.11", features = ["json"] }
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_ipld_dagcbor = { workspace = true }
thiserror = "1.0"
tokio = { workspace = true }
tracing = { workspace = true }
tracing-logfmt = { version = "0.3", optional = true }
tracing-appender = "0.2"
tracing-logfmt = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot", "registry"] }
tryhard = "0.5"
url = "2.4"
Expand All @@ -76,11 +78,10 @@ json = "0.12"
tokio-tungstenite = "0.19"

[features]
default = ["console", "ipfs", "logfmt"]
default = ["ipfs"]
ansi-logs = ["ansi_term"]
console = ["console-subscriber"]
ipfs = ["ipfs-api", "ipfs-api-backend-hyper"]
logfmt = ["tracing-logfmt"]
test_utils = ["proptest"]

[package.metadata.docs.rs]
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub trait Database {
/// work across vecs/arrays.
///
/// [Instruction]: homestar_core::workflow::Instruction
fn find_instructions(pointers: Vec<Pointer>, conn: &mut Connection) -> Result<Vec<Receipt>> {
fn find_instructions(pointers: &Vec<Pointer>, conn: &mut Connection) -> Result<Vec<Receipt>> {
let found_receipts = schema::receipts::dsl::receipts
.filter(schema::receipts::instruction.eq_any(pointers))
.load(conn)?;
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub mod workflow;
pub use db::Db;
#[cfg(feature = "ipfs")]
pub use network::ipfs::IpfsCli;
pub use receipt::Receipt;
pub use receipt::{Receipt, RECEIPT_TAG, VERSION_KEY};
pub use runtime::*;
pub use settings::Settings;
pub use worker::Worker;
Expand Down
Loading

0 comments on commit eaedb38

Please sign in to comment.