diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8046dd084..1b13d1c73 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -185,7 +185,7 @@ jobs: run: cargo fmt --manifest-path top-crates/Cargo.toml --check - name: Build backend run: |- - mkdir -p ui/target; docker run --rm -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' + mkdir -p ui/target; docker run --rm -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup -v $PWD/compiler/base/orchestrator:/compiler/base/orchestrator -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' apk add musl-dev openssl-dev openssl-libs-static # Adding -C relocation-model=static due to diff --git a/ci/workflows.yml b/ci/workflows.yml index 3e94d569a..0a06222ab 100644 --- a/ci/workflows.yml +++ b/ci/workflows.yml @@ -288,6 +288,9 @@ workflows: docker run --rm + -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup + -v $PWD/compiler/base/orchestrator:/compiler/base/orchestrator + -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry diff --git a/compiler/base/.dockerignore b/compiler/base/.dockerignore index ddfc04b3b..a8072fb0b 100644 --- a/compiler/base/.dockerignore +++ b/compiler/base/.dockerignore @@ -1 +1,3 @@ +asm-cleanup/target modify-cargo-toml/target +orchestrator/target diff --git a/compiler/base/Dockerfile b/compiler/base/Dockerfile index b75bbc1ed..a572c3da9 100644 --- a/compiler/base/Dockerfile +++ b/compiler/base/Dockerfile @@ -61,11 +61,41 @@ FROM bare-sources as munge ADD --chown=playground modify-cargo-toml /playground/modify-cargo-toml RUN cargo build --release --manifest-path=/playground/modify-cargo-toml/Cargo.toml +# Set up cargo-chef 
for faster builds + +FROM bare-sources as chef-available + +RUN cargo install cargo-chef + +WORKDIR /orchestrator + +# Prepare the orchestrator's dependencies + +FROM chef-available as prepare-orchestrator + +COPY --chown=playground asm-cleanup /asm-cleanup +COPY --chown=playground modify-cargo-toml /modify-cargo-toml +COPY --chown=playground orchestrator /orchestrator +RUN cargo chef prepare + +# Build the orchestrator + +FROM chef-available as build-orchestrator + +COPY --chown=playground asm-cleanup /asm-cleanup +COPY --chown=playground modify-cargo-toml /modify-cargo-toml +COPY --chown=playground --from=prepare-orchestrator /orchestrator/recipe.json /orchestrator/recipe.json +RUN cargo chef cook --release + +COPY --chown=playground orchestrator /orchestrator +RUN cargo install --path . + # Compiler and sources FROM bare-sources as sources COPY --from=munge /playground/modify-cargo-toml/target/release/modify-cargo-toml /playground/.cargo/bin +COPY --from=build-orchestrator /playground/.cargo/bin/worker /playground/.cargo/bin/worker # Compiler and pre-compiled crates diff --git a/compiler/base/asm-cleanup/.gitignore b/compiler/base/asm-cleanup/.gitignore new file mode 100644 index 000000000..1e7caa9ea --- /dev/null +++ b/compiler/base/asm-cleanup/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target/ diff --git a/compiler/base/asm-cleanup/Cargo.toml b/compiler/base/asm-cleanup/Cargo.toml new file mode 100644 index 000000000..98645dd49 --- /dev/null +++ b/compiler/base/asm-cleanup/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "asm-cleanup" +version = "0.1.0" +edition = "2018" + +[workspace] + +[dependencies] +lazy_static = "1.0.0" +petgraph = "0.6.0" +regex = "1.0.0" +rustc-demangle = "0.1.5" diff --git a/ui/src/asm_cleanup.rs b/compiler/base/asm-cleanup/src/lib.rs similarity index 100% rename from ui/src/asm_cleanup.rs rename to compiler/base/asm-cleanup/src/lib.rs diff --git a/compiler/base/cargo-wasm b/compiler/base/cargo-wasm index b9107833d..f6d4b3b88 100755 --- 
a/compiler/base/cargo-wasm +++ b/compiler/base/cargo-wasm @@ -41,4 +41,9 @@ for wasm in $(find target/ -name '*wasm' -not -path '*/deps/*'); do # wasm2wat spits out an error that we don't care about, so hide it # https://github.com/WebAssembly/wabt/issues/842 # https://stackoverflow.com/a/15936384/155423 + + # The streaming playground expects the file to be without the + # extension while the original playground expects it to be with + # the extension. Support both for now. + cp "${output}.wat" "${output}" done diff --git a/compiler/base/entrypoint.sh b/compiler/base/entrypoint.sh index bd3da4e03..47b790eea 100755 --- a/compiler/base/entrypoint.sh +++ b/compiler/base/entrypoint.sh @@ -2,10 +2,14 @@ set -eu -timeout=${PLAYGROUND_TIMEOUT:-10} +if [[ -z "${PLAYGROUND_ORCHESTRATOR:-}" ]]; then + timeout=${PLAYGROUND_TIMEOUT:-10} -modify-cargo-toml + modify-cargo-toml -# Don't use `exec` here. The shell is what prints out the useful -# "Killed" message -timeout --signal=KILL ${timeout} "$@" + # Don't use `exec` here. 
The shell is what prints out the useful + # "Killed" message + timeout --signal=KILL ${timeout} "$@" +else + exec "$@" +fi diff --git a/compiler/base/modify-cargo-toml/src/lib.rs b/compiler/base/modify-cargo-toml/src/lib.rs new file mode 100644 index 000000000..8b886a0a5 --- /dev/null +++ b/compiler/base/modify-cargo-toml/src/lib.rs @@ -0,0 +1,131 @@ +extern crate serde; +#[macro_use] +extern crate serde_derive; +extern crate toml; + +use std::collections::BTreeMap; +use toml::Value; + +type Other = BTreeMap; + +fn modify(cargo_toml: Value, f: F) -> Value +where + F: FnOnce(T) -> T, + T: serde::Serialize + for<'de> serde::Deserialize<'de>, +{ + let cargo_toml = cargo_toml.try_into().unwrap(); + + let cargo_toml = f(cargo_toml); + + Value::try_from(cargo_toml).unwrap() +} + +fn ensure_string_in_vec(values: &mut Vec, val: &str) { + if !values.iter().any(|f| f == val) { + values.push(val.into()); + } +} + +pub fn set_edition(cargo_toml: Value, edition: &str) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + package: Package, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Package { + #[serde(default)] + edition: String, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + cargo_toml.package.edition = edition.into(); + cargo_toml + }) +} + +pub fn remove_dependencies(cargo_toml: Value) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + dependencies: BTreeMap, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + cargo_toml.dependencies.clear(); + cargo_toml + }) +} + +pub fn set_crate_type(cargo_toml: Value, crate_type: &str) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + #[serde(default)] + lib: Lib, + 
#[serde(flatten)] + other: Other, + } + + #[derive(Debug, Default, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Lib { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + crate_type: Vec, + #[serde(default)] + proc_macro: bool, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + if crate_type == "proc-macro" { + cargo_toml.lib.proc_macro = true; + } else { + ensure_string_in_vec(&mut cargo_toml.lib.crate_type, crate_type); + } + cargo_toml + }) +} + +pub fn set_release_lto(cargo_toml: Value, lto: bool) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + #[serde(default)] + profile: Profiles, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Default, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Profiles { + #[serde(default)] + release: Profile, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Default, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Profile { + #[serde(default)] + lto: bool, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + cargo_toml.profile.release.lto = lto; + cargo_toml + }) +} diff --git a/compiler/base/modify-cargo-toml/src/main.rs b/compiler/base/modify-cargo-toml/src/main.rs index e5732d583..94177c884 100644 --- a/compiler/base/modify-cargo-toml/src/main.rs +++ b/compiler/base/modify-cargo-toml/src/main.rs @@ -1,9 +1,8 @@ -extern crate serde; -#[macro_use] -extern crate serde_derive; +extern crate modify_cargo_toml; extern crate toml; -use std::{collections::BTreeMap, env, ffi::OsString, fs, path::PathBuf}; +use modify_cargo_toml::*; +use std::{env, ffi::OsString, fs, path::PathBuf}; use toml::Value; fn main() { @@ -41,127 +40,3 @@ fn main() { fs::write(&output_filename, output) .unwrap_or_else(|e| panic!("Cannot write to {}: {}", output_filename.display(), e)); } - -type Other = BTreeMap; 
- -fn modify(cargo_toml: Value, f: F) -> Value -where - F: FnOnce(T) -> T, - T: serde::Serialize + for<'de> serde::Deserialize<'de>, -{ - let cargo_toml = cargo_toml.try_into().unwrap(); - - let cargo_toml = f(cargo_toml); - - Value::try_from(cargo_toml).unwrap() -} - -fn ensure_string_in_vec(values: &mut Vec, val: &str) { - if !values.iter().any(|f| f == val) { - values.push(val.into()); - } -} - -fn set_edition(cargo_toml: Value, edition: &str) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - package: Package, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Package { - #[serde(default)] - edition: String, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - cargo_toml.package.edition = edition.into(); - cargo_toml - }) -} - -fn remove_dependencies(cargo_toml: Value) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - dependencies: BTreeMap, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - cargo_toml.dependencies.clear(); - cargo_toml - }) -} - -fn set_crate_type(cargo_toml: Value, crate_type: &str) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - #[serde(default)] - lib: Lib, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Default, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Lib { - #[serde(default, skip_serializing_if = "Vec::is_empty")] - crate_type: Vec, - #[serde(default)] - proc_macro: bool, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - if crate_type == "proc-macro" { - cargo_toml.lib.proc_macro = true; - } else { - ensure_string_in_vec(&mut cargo_toml.lib.crate_type, crate_type); - } - cargo_toml - }) -} - -fn 
set_release_lto(cargo_toml: Value, lto: bool) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - #[serde(default)] - profile: Profiles, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Default, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Profiles { - #[serde(default)] - release: Profile, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Default, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Profile { - #[serde(default)] - lto: bool, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - cargo_toml.profile.release.lto = lto; - cargo_toml - }) -} diff --git a/compiler/base/orchestrator/.gitignore b/compiler/base/orchestrator/.gitignore new file mode 100644 index 000000000..eb5a316cb --- /dev/null +++ b/compiler/base/orchestrator/.gitignore @@ -0,0 +1 @@ +target diff --git a/compiler/base/orchestrator/Cargo.lock b/compiler/base/orchestrator/Cargo.lock new file mode 100644 index 000000000..3db923433 --- /dev/null +++ b/compiler/base/orchestrator/Cargo.lock @@ -0,0 +1,648 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. 
+version = 3 + +[[package]] +name = "aho-corasick" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +dependencies = [ + "memchr", +] + +[[package]] +name = "asm-cleanup" +version = "0.1.0" +dependencies = [ + "lazy_static", + "petgraph", + "regex", + "rustc-demangle", +] + +[[package]] +name = "assertables" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c24e9d990669fbd16806bff449e4ac644fd9b1fca014760087732fe4102f131" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "hashbrown" +version = 
"0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.146" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "modify-cargo-toml" +version = "0.1.0" +dependencies = [ + "serde", + "serde_derive", + "toml", +] + +[[package]] +name = "orchestrator" +version = "0.1.0" +dependencies = [ + "asm-cleanup", + "assertables", + "bincode", + "futures", + "modify-cargo-toml", + "serde", + "snafu", + "tempdir", + "tokio", + "tokio-stream", + "tokio-util", + "toml", +] + +[[package]] +name = "petgraph" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + +[[package]] +name = "regex" +version = "1.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" + +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "serde" +version = "1.0.164" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.164" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + +[[package]] +name = "serde_spanned" 
+version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +dependencies = [ + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" +dependencies = [ + "autocfg", +] + +[[package]] +name = "snafu" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045" +dependencies = [ + "doc-comment", + "futures-core", + "pin-project", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand", + "remove_dir_all", 
+] + +[[package]] +name = "tokio" +version = "1.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +dependencies = [ + "autocfg", + "bytes", + "libc", + "mio", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "toml" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.19.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] 
+ +[[package]] +name = "unicode-ident" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" 
+ +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "winnow" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699" +dependencies = [ + "memchr", +] diff --git a/compiler/base/orchestrator/Cargo.toml b/compiler/base/orchestrator/Cargo.toml new file mode 100644 index 000000000..dd6083935 --- /dev/null +++ b/compiler/base/orchestrator/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "orchestrator" +version = "0.1.0" +edition = "2021" + +[workspace] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +asm-cleanup = { path = "../asm-cleanup" } +bincode = { version = "1.3", default-features = 
false } +futures = { version = "0.3.28", default-features = false, features = ["executor"] } +modify-cargo-toml = { path = "../modify-cargo-toml", default-features = false } +serde = { version = "1.0", default-features = false, features = ["derive"] } +snafu = { version = "0.7.4", default-features = false, features = ["futures", "std"] } +tokio = { version = "1.28", default-features = false, features = ["fs", "io-std", "io-util", "macros", "process", "rt", "time", "sync"] } +tokio-stream = { version = "0.1.14", default-features = false } +tokio-util = { version = "0.7.8", default-features = false, features = ["io", "io-util"] } +toml = { version = "0.7.3", default-features = false, features = ["parse", "display"] } + +[dev-dependencies] +assertables = "7.0.1" +tempdir = "0.3.7" diff --git a/compiler/base/orchestrator/src/bin/worker.rs b/compiler/base/orchestrator/src/bin/worker.rs new file mode 100644 index 000000000..504e26060 --- /dev/null +++ b/compiler/base/orchestrator/src/bin/worker.rs @@ -0,0 +1,12 @@ +use orchestrator::worker::{listen, Error}; +use std::env; + +#[tokio::main(flavor = "current_thread")] +#[snafu::report] +pub async fn main() -> Result<(), Error> { + let project_dir = env::args_os() + .nth(1) + .expect("Please specify project directory as the first argument"); + + listen(project_dir).await +} diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs new file mode 100644 index 000000000..f0ef2bd28 --- /dev/null +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -0,0 +1,1311 @@ +use snafu::prelude::*; +use std::{ + collections::HashMap, + mem, + process::Stdio, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::{ + join, + process::{Child, ChildStdin, ChildStdout, Command}, + select, + sync::{mpsc, oneshot}, + task::{JoinHandle, JoinSet}, + time::{self, MissedTickBehavior}, +}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use 
tokio_util::{io::SyncIoBridge, sync::CancellationToken}; + +use crate::{ + bincode_input_closed, + message::{ + CoordinatorMessage, ExecuteCommandRequest, JobId, Multiplexed, OneToOneResponse, + ReadFileRequest, ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, + }, + DropErrorDetailsExt, +}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum AssemblyFlavor { + Att, + Intel, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum DemangleAssembly { + Demangle, + Mangle, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ProcessAssembly { + Filter, + Raw, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CompileTarget { + Assembly(AssemblyFlavor, DemangleAssembly, ProcessAssembly), + Hir, + LlvmIr, + Mir, + Wasm, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Channel { + Stable, + Beta, + Nightly, +} + +impl Channel { + pub(crate) const ALL: [Self; 3] = [Self::Stable, Self::Beta, Self::Nightly]; + + #[cfg(test)] + pub(crate) fn to_str(self) -> &'static str { + match self { + Channel::Stable => "stable", + Channel::Beta => "beta", + Channel::Nightly => "nightly", + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Mode { + Debug, + Release, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Edition { + Rust2015, + Rust2018, + Rust2021, +} + +impl Edition { + #[cfg(test)] + pub(crate) const ALL: [Self; 3] = [Self::Rust2015, Self::Rust2018, Self::Rust2021]; + + pub(crate) fn to_str(self) -> &'static str { + match self { + Edition::Rust2015 => "2015", + Edition::Rust2018 => "2018", + Edition::Rust2021 => "2021", + } + } + + pub(crate) fn to_cargo_toml_key(self) -> &'static str { + self.to_str() + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CrateType { + Binary, + Library(LibraryType), +} + +impl CrateType { + pub(crate) fn to_cargo_toml_key(self) -> &'static str { + use {CrateType::*, LibraryType::*}; + + match self { + Binary => "bin", + Library(Lib) => "lib", 
+ Library(Dylib) => "dylib", + Library(Rlib) => "rlib", + Library(Staticlib) => "staticlib", + Library(Cdylib) => "cdylib", + Library(ProcMacro) => "proc-macro", + } + } + + pub(crate) fn to_library_cargo_toml_key(self) -> Option<&'static str> { + if self == Self::Binary { + None + } else { + Some(self.to_cargo_toml_key()) + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum LibraryType { + Lib, + Dylib, + Rlib, + Staticlib, + Cdylib, + ProcMacro, +} + +#[derive(Debug, Clone)] +pub struct CompileRequest { + pub target: CompileTarget, + pub channel: Channel, + pub crate_type: CrateType, + pub mode: Mode, + pub edition: Edition, + pub tests: bool, + pub backtrace: bool, + pub code: String, +} + +impl CompileRequest { + pub(crate) fn write_main_request(&self) -> WriteFileRequest { + let path = match self.crate_type { + CrateType::Binary => "src/main.rs", + CrateType::Library(_) => "src/lib.rs", + }; + + WriteFileRequest { + path: path.to_owned(), + content: self.code.clone().into(), + } + } + + pub(crate) fn execute_cargo_request(&self, output_path: &str) -> ExecuteCommandRequest { + use CompileTarget::*; + + let mut args = if let Wasm = self.target { + vec!["wasm"] + } else { + vec!["rustc"] + }; + if let Mode::Release = self.mode { + args.push("--release"); + } + + match self.target { + Assembly(flavor, _, _) => { + args.extend(&["--", "--emit", "asm=compilation"]); + + // Enable extra assembly comments for nightly builds + if let Channel::Nightly = self.channel { + args.push("-Z"); + args.push("asm-comments"); + } + + args.push("-C"); + match flavor { + AssemblyFlavor::Att => args.push("llvm-args=-x86-asm-syntax=att"), + AssemblyFlavor::Intel => args.push("llvm-args=-x86-asm-syntax=intel"), + } + } + LlvmIr => args.extend(&["--", "--emit", "llvm-ir=compilation"]), + Mir => args.extend(&["--", "--emit", "mir=compilation"]), + Hir => args.extend(&["--", "-Zunpretty=hir", "-o", output_path]), + Wasm => args.extend(&["-o", output_path]), + } + let mut 
envs = HashMap::new(); + if self.backtrace { + envs.insert("RUST_BACKTRACE".to_owned(), "1".to_owned()); + } + + ExecuteCommandRequest { + cmd: "cargo".to_owned(), + args: args.into_iter().map(|s| s.to_owned()).collect(), + envs, + cwd: None, + } + } + + pub(crate) fn modify_cargo_toml(&self, mut cargo_toml: toml::Value) -> toml::Value { + cargo_toml = modify_cargo_toml::set_edition(cargo_toml, self.edition.to_cargo_toml_key()); + + if let Some(crate_type) = self.crate_type.to_library_cargo_toml_key() { + cargo_toml = modify_cargo_toml::set_crate_type(cargo_toml, crate_type); + } + + if CompileTarget::Wasm == self.target { + cargo_toml = modify_cargo_toml::remove_dependencies(cargo_toml); + cargo_toml = modify_cargo_toml::set_release_lto(cargo_toml, true); + } + + cargo_toml + } + + pub(crate) fn postprocess_result(&self, mut code: String) -> String { + if let CompileTarget::Assembly(_, demangle, process) = self.target { + if demangle == DemangleAssembly::Demangle { + code = asm_cleanup::demangle_asm(&code); + } + + if process == ProcessAssembly::Filter { + code = asm_cleanup::filter_asm(&code); + } + } + + code + } +} + +#[derive(Debug, Clone)] +pub struct CompileResponseWithOutput { + pub success: bool, + pub code: String, + pub stdout: String, + pub stderr: String, +} + +#[derive(Debug, Clone)] +pub struct CompileResponse { + pub success: bool, + pub code: String, +} + +#[derive(Debug)] +enum DemultiplexCommand { + Listen(JobId, mpsc::Sender), + ListenOnce(JobId, oneshot::Sender), +} + +#[derive(Debug)] +pub struct Coordinator { + backend: B, + // Consider making these lazily-created and/or idly time out + stable: Container, + beta: Container, + nightly: Container, + token: CancellationToken, +} + +impl Coordinator +where + B: Backend, +{ + pub async fn new(backend: B) -> Result { + let token = CancellationToken::new(); + + let [stable, beta, nightly] = + Channel::ALL.map(|channel| Container::new(channel, token.clone(), &backend)); + + let (stable, beta, 
nightly) = join!(stable, beta, nightly); + + let stable = stable?; + let beta = beta?; + let nightly = nightly?; + + Ok(Self { + backend, + stable, + beta, + nightly, + token, + }) + } + + pub async fn compile( + &self, + request: CompileRequest, + ) -> Result { + self.select_channel(request.channel).compile(request).await + } + + pub async fn begin_compile( + &self, + request: CompileRequest, + ) -> Result { + self.select_channel(request.channel) + .begin_compile(request) + .await + } + + pub async fn shutdown(self) -> Result { + let Self { + backend, + stable, + beta, + nightly, + token, + } = self; + token.cancel(); + + let (stable, beta, nightly) = join!(stable.shutdown(), beta.shutdown(), nightly.shutdown()); + + stable?; + beta?; + nightly?; + + Ok(backend) + } + + fn select_channel(&self, channel: Channel) -> &Container { + match channel { + Channel::Stable => &self.stable, + Channel::Beta => &self.beta, + Channel::Nightly => &self.nightly, + } + } +} + +impl Coordinator { + pub async fn new_docker() -> Result { + Self::new(DockerBackend(())).await + } +} + +#[derive(Debug)] +struct Container { + task: JoinHandle>, + modify_cargo_toml: ModifyCargoToml, + commander: Commander, +} + +impl Container { + async fn new( + channel: Channel, + token: CancellationToken, + backend: &impl Backend, + ) -> Result { + let (mut child, stdin, stdout) = backend.run_worker_in_background(channel)?; + let IoQueue { + mut tasks, + to_worker_tx, + from_worker_rx, + } = spawn_io_queue(stdin, stdout, token); + + let (command_tx, command_rx) = mpsc::channel(8); + let demultiplex_task = tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx)); + + let task = tokio::spawn(async move { + let (c, d, t) = join!(child.wait(), demultiplex_task, tasks.join_next()); + c.context(JoinWorkerSnafu)?; + d.context(DemultiplexerTaskPanickedSnafu)? 
+ .context(DemultiplexerTaskFailedSnafu)?; + if let Some(t) = t { + t.context(IoQueuePanickedSnafu)??; + } + + Ok(()) + }); + + let commander = Commander { + to_worker_tx, + to_demultiplexer_tx: command_tx, + id: Default::default(), + }; + + let modify_cargo_toml = ModifyCargoToml::new(commander.clone()) + .await + .context(CouldNotLoadCargoTomlSnafu)?; + + Ok(Container { + task, + modify_cargo_toml, + commander, + }) + } + + async fn compile( + &self, + request: CompileRequest, + ) -> Result { + use compile_error::*; + + let ActiveCompilation { + task, + stdout_rx, + stderr_rx, + } = self.begin_compile(request).await?; + + let stdout = ReceiverStream::new(stdout_rx).collect(); + let stderr = ReceiverStream::new(stderr_rx).collect(); + + let (result, stdout, stderr) = join!(task, stdout, stderr); + + let CompileResponse { success, code } = result.context(CompilationTaskPanickedSnafu)??; + Ok(CompileResponseWithOutput { + success, + code, + stdout, + stderr, + }) + } + + async fn begin_compile( + &self, + request: CompileRequest, + ) -> Result { + use compile_error::*; + + let output_path: &str = "compilation"; + + let write_main = request.write_main_request(); + let execute_cargo = request.execute_cargo_request(output_path); + let read_output = ReadFileRequest { + path: output_path.to_owned(), + }; + + let write_main = self.commander.one(write_main); + let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); + + let (write_main, modify_cargo_toml) = join!(write_main, modify_cargo_toml); + + write_main.context(CouldNotWriteCodeSnafu)?; + modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; + + let (stdout_tx, stdout_rx) = mpsc::channel(8); + let (stderr_tx, stderr_rx) = mpsc::channel(8); + + let mut from_worker_rx = self + .commander + .many(execute_cargo) + .await + .context(CouldNotStartCompilerSnafu)?; + + let task = tokio::spawn({ + let commander = self.commander.clone(); + async move { + let mut success = false; + + while let 
Some(container_msg) = from_worker_rx.recv().await { + match container_msg { + WorkerMessage::ExecuteCommand(resp) => { + success = resp.success; + break; + } + WorkerMessage::StdoutPacket(packet) => { + stdout_tx.send(packet).await.ok(/* Receiver gone, that's OK */); + } + WorkerMessage::StderrPacket(packet) => { + stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */); + } + _ => return UnexpectedMessageSnafu.fail(), + } + } + + let code = if success { + let file: ReadFileResponse = commander + .one(read_output) + .await + .context(CouldNotReadCodeSnafu)?; + String::from_utf8(file.0).context(CodeNotUtf8Snafu)? + } else { + String::new() + }; + + // TODO: This is synchronous... + let code = request.postprocess_result(code); + + Ok(CompileResponse { success, code }) + } + }); + + Ok(ActiveCompilation { + task, + stdout_rx, + stderr_rx, + }) + } + + async fn shutdown(self) -> Result<()> { + let Self { + task, + modify_cargo_toml, + commander, + } = self; + drop(commander); + drop(modify_cargo_toml); + task.await.context(ContainerTaskPanickedSnafu)? 
+ } +} + +#[derive(Debug)] +pub struct ActiveCompilation { + pub task: JoinHandle>, + pub stdout_rx: mpsc::Receiver, + pub stderr_rx: mpsc::Receiver, +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CompileError { + #[snafu(display("The compilation task panicked"))] + CompilationTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("Could not modify Cargo.toml"))] + CouldNotModifyCargoToml { source: ModifyCargoTomlError }, + + #[snafu(display("Could not write source code"))] + CouldNotWriteCode { source: CommanderError }, + + #[snafu(display("Could not start compiler"))] + CouldNotStartCompiler { source: CommanderError }, + + #[snafu(display("Received an unexpected message"))] + UnexpectedMessage, + + #[snafu(display("Could not read the compilation output"))] + CouldNotReadCode { source: CommanderError }, + + #[snafu(display("The compilation output was not UTF-8"))] + CodeNotUtf8 { source: std::string::FromUtf8Error }, +} + +#[derive(Debug, Clone)] +struct Commander { + to_worker_tx: mpsc::Sender>, + to_demultiplexer_tx: mpsc::Sender, + id: Arc, +} + +#[derive(Debug)] +struct ModifyCargoToml { + commander: Commander, + cargo_toml: toml::Value, +} + +impl ModifyCargoToml { + const PATH: &str = "Cargo.toml"; + + async fn new(commander: Commander) -> Result { + let cargo_toml = Self::read(&commander).await?; + Ok(Self { + commander, + cargo_toml, + }) + } + + async fn modify_for(&self, request: &CompileRequest) -> Result<(), ModifyCargoTomlError> { + let cargo_toml = self.cargo_toml.clone(); + let cargo_toml = request.modify_cargo_toml(cargo_toml); + Self::write(&self.commander, cargo_toml).await + } + + async fn read(commander: &Commander) -> Result { + use modify_cargo_toml_error::*; + + let path = Self::PATH.to_owned(); + let cargo_toml = commander + .one(ReadFileRequest { path }) + .await + .context(CouldNotReadSnafu)?; + + let cargo_toml = String::from_utf8(cargo_toml.0)?; + let cargo_toml = toml::from_str(&cargo_toml)?; + + Ok(cargo_toml) 
+ } + + async fn write( + commander: &Commander, + cargo_toml: toml::Value, + ) -> Result<(), ModifyCargoTomlError> { + use modify_cargo_toml_error::*; + + let cargo_toml = toml::to_string(&cargo_toml)?; + let content = cargo_toml.into_bytes(); + + let path = Self::PATH.to_owned(); + commander + .one(WriteFileRequest { path, content }) + .await + .context(CouldNotWriteSnafu)?; + + Ok(()) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum ModifyCargoTomlError { + #[snafu(display("Could not read the file"))] + CouldNotRead { source: CommanderError }, + + #[snafu(display("Could not parse the file as UTF-8"))] + #[snafu(context(false))] + InvalidUtf8 { source: std::string::FromUtf8Error }, + + #[snafu(display("Could not deserialize the file as TOML"))] + #[snafu(context(false))] + CouldNotDeserialize { source: toml::de::Error }, + + #[snafu(display("Could not serialize the file as TOML"))] + #[snafu(context(false))] + CouldNotSerialize { source: toml::ser::Error }, + + #[snafu(display("Could not write the file"))] + CouldNotWrite { source: CommanderError }, +} + +impl Commander { + const GC_PERIOD: Duration = Duration::from_secs(30); + + async fn demultiplex( + mut command_rx: mpsc::Receiver, + mut from_worker_rx: mpsc::Receiver>, + ) -> Result<(), CommanderError> { + use commander_error::*; + + let mut waiting = HashMap::new(); + let mut waiting_once = HashMap::new(); + + let mut gc_interval = time::interval(Self::GC_PERIOD); + gc_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + select! 
{ + command = command_rx.recv() => { + let Some(command) = command else { break }; + + match command { + DemultiplexCommand::Listen(job_id, waiter) => { + let old = waiting.insert(job_id, waiter); + ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id }); + } + + DemultiplexCommand::ListenOnce(job_id, waiter) => { + let old = waiting_once.insert(job_id, waiter); + ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id }); + } + } + }, + + msg = from_worker_rx.recv() => { + let Some(Multiplexed(job_id, msg)) = msg else { break }; + + if let Some(waiter) = waiting_once.remove(&job_id) { + waiter.send(msg).ok(/* Don't care about it */); + continue; + } + + if let Some(waiter) = waiting.get(&job_id) { + waiter.send(msg).await.ok(/* Don't care about it */); + continue; + } + + // Should we log messages that didn't have a receiver? + } + + // Find any channels where the receivers have been + // dropped and clear out the sending halves. + _ = gc_interval.tick() => { + waiting = mem::take(&mut waiting) + .into_iter() + .filter(|(_job_id, tx)| !tx.is_closed()) + .collect(); + + waiting_once = mem::take(&mut waiting_once) + .into_iter() + .filter(|(_job_id, tx)| !tx.is_closed()) + .collect(); + } + } + } + + Ok(()) + } + + fn next_id(&self) -> JobId { + self.id.fetch_add(1, Ordering::SeqCst) + } + + async fn send_to_demultiplexer( + &self, + command: DemultiplexCommand, + ) -> Result<(), CommanderError> { + use commander_error::*; + + self.to_demultiplexer_tx + .send(command) + .await + .drop_error_details() + .context(UnableToSendToDemultiplexerSnafu) + } + + async fn send_to_worker( + &self, + message: Multiplexed, + ) -> Result<(), CommanderError> { + use commander_error::*; + + self.to_worker_tx + .send(message) + .await + .drop_error_details() + .context(UnableToSendToWorkerSnafu) + } + + async fn one(&self, message: M) -> Result + where + M: Into, + M: OneToOneResponse, + Result: TryFrom, + { + use commander_error::*; + + let id = 
self.next_id(); + let (from_demultiplexer_tx, from_demultiplexer_rx) = oneshot::channel(); + + self.send_to_demultiplexer(DemultiplexCommand::ListenOnce(id, from_demultiplexer_tx)) + .await?; + self.send_to_worker(Multiplexed(id, message.into())).await?; + let msg = from_demultiplexer_rx + .await + .context(UnableToReceiveFromDemultiplexerSnafu)?; + + match msg.try_into() { + Ok(Ok(v)) => Ok(v), + Ok(Err(e)) => WorkerOperationFailedSnafu { text: e.0 }.fail(), + Err(_) => UnexpectedResponseTypeSnafu.fail(), + } + } + + async fn many(&self, message: M) -> Result, CommanderError> + where + M: Into, + { + let id = self.next_id(); + let (from_worker_tx, from_worker_rx) = mpsc::channel(8); + + self.send_to_demultiplexer(DemultiplexCommand::Listen(id, from_worker_tx)) + .await?; + self.send_to_worker(Multiplexed(id, message.into())).await?; + + Ok(from_worker_rx) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CommanderError { + #[snafu(display("Two listeners subscribed to job {job_id}"))] + DuplicateDemultiplexerClient { job_id: JobId }, + + #[snafu(display("Could not send a message to the demultiplexer"))] + UnableToSendToDemultiplexer { source: mpsc::error::SendError<()> }, + + #[snafu(display("Did not receive a response from the demultiplexer"))] + UnableToReceiveFromDemultiplexer { source: oneshot::error::RecvError }, + + #[snafu(display("Could not send a message to the worker"))] + UnableToSendToWorker { source: mpsc::error::SendError<()> }, + + #[snafu(display("Did not receive the expected response type from the worker"))] + UnexpectedResponseType, + + #[snafu(display("The worker operation failed: {text}"))] + WorkerOperationFailed { text: String }, +} + +pub trait Backend { + fn run_worker_in_background( + &self, + channel: Channel, + ) -> Result<(Child, ChildStdin, ChildStdout)> { + let mut child = self + .prepare_worker_command(channel) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + 
.context(SpawnWorkerSnafu)?; + let stdin = child.stdin.take().context(WorkerStdinCaptureSnafu)?; + let stdout = child.stdout.take().context(WorkerStdoutCaptureSnafu)?; + Ok((child, stdin, stdout)) + } + + fn prepare_worker_command(&self, channel: Channel) -> Command; +} + +impl Backend for &B +where + B: Backend, +{ + fn prepare_worker_command(&self, channel: Channel) -> Command { + B::prepare_worker_command(self, channel) + } +} + +macro_rules! docker_command { + ($($arg:expr),* $(,)?) => ({ + let mut cmd = Command::new("docker"); + $( cmd.arg($arg); )* + cmd + }); +} + +#[cfg(target_arch = "x86_64")] +const DOCKER_ARCH: &str = "linux/amd64"; + +#[cfg(target_arch = "aarch64")] +const DOCKER_ARCH: &str = "linux/arm64"; + +fn basic_secure_docker_command() -> Command { + docker_command!( + "run", + "--platform", + DOCKER_ARCH, + "--cap-drop=ALL", + "--net", + "none", + "--memory", + "512m", + "--memory-swap", + "640m", + "--pids-limit", + "512", + ) +} + +pub struct DockerBackend(()); + +impl Backend for DockerBackend { + fn prepare_worker_command(&self, channel: Channel) -> Command { + let mut command = basic_secure_docker_command(); + command + .arg("-i") + .args(["-a", "stdin", "-a", "stdout", "-a", "stderr"]) + .arg("--rm") + .arg(channel.to_container_name()) + .arg("worker") + .arg("/playground"); + command + } +} + +impl Channel { + fn to_container_name(self) -> &'static str { + match self { + Channel::Stable => "rust-stable", + Channel::Beta => "rust-beta", + Channel::Nightly => "rust-nightly", + } + } +} + +pub type Result = ::std::result::Result; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Reached system process limit"))] + SpawnWorker { source: std::io::Error }, + + #[snafu(display("Unable to join child process"))] + JoinWorker { source: std::io::Error }, + + #[snafu(display("The demultiplexer task panicked"))] + DemultiplexerTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The demultiplexer task failed"))] + 
DemultiplexerTaskFailed { source: CommanderError }, + + #[snafu(display("The IO queue task panicked"))] + IoQueuePanicked { source: tokio::task::JoinError }, + + #[snafu(display("The container task panicked"))] + ContainerTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("Worker process's stdin not captured"))] + WorkerStdinCapture, + + #[snafu(display("Worker process's stdout not captured"))] + WorkerStdoutCapture, + + #[snafu(display("Failed to flush child stdin"))] + WorkerStdinFlush { source: std::io::Error }, + + #[snafu(display("Failed to deserialize worker message"))] + WorkerMessageDeserialization { source: bincode::Error }, + + #[snafu(display("Failed to serialize coordinator message"))] + CoordinatorMessageSerialization { source: bincode::Error }, + + #[snafu(display("Failed to send worker message through channel"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, + + #[snafu(display("Unable to load original Cargo.toml"))] + CouldNotLoadCargoToml { source: ModifyCargoTomlError }, +} + +struct IoQueue { + tasks: JoinSet>, + to_worker_tx: mpsc::Sender>, + from_worker_rx: mpsc::Receiver>, +} + +// Child stdin/out <--> messages. 
+fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationToken) -> IoQueue { + use std::io::{prelude::*, BufReader, BufWriter}; + + let mut tasks = JoinSet::new(); + + let (tx, from_worker_rx) = mpsc::channel(8); + tasks.spawn_blocking(move || { + let stdout = SyncIoBridge::new(stdout); + let mut stdout = BufReader::new(stdout); + + loop { + let worker_msg = bincode::deserialize_from(&mut stdout); + + if bincode_input_closed(&worker_msg) { + break; + }; + + let worker_msg = worker_msg.context(WorkerMessageDeserializationSnafu)?; + + tx.blocking_send(worker_msg) + .drop_error_details() + .context(UnableToSendWorkerMessageSnafu)?; + } + + Ok(()) + }); + + let (to_worker_tx, mut rx) = mpsc::channel(8); + tasks.spawn_blocking(move || { + let stdin = SyncIoBridge::new(stdin); + let mut stdin = BufWriter::new(stdin); + + loop { + let coordinator_msg = futures::executor::block_on(async { + select! { + () = token.cancelled() => None, + msg = rx.recv() => msg, + } + }); + + let Some(coordinator_msg) = coordinator_msg else { break }; + + bincode::serialize_into(&mut stdin, &coordinator_msg) + .context(CoordinatorMessageSerializationSnafu)?; + + stdin.flush().context(WorkerStdinFlushSnafu)?; + } + + Ok(()) + }); + + IoQueue { + tasks, + to_worker_tx, + from_worker_rx, + } +} + +#[cfg(test)] +mod tests { + use assertables::*; + use futures::{Future, FutureExt}; + use std::{sync::Once, time::Duration}; + use tempdir::TempDir; + use tokio::join; + use tokio_stream::{wrappers::ReceiverStream, StreamExt}; + + use super::*; + + #[derive(Debug)] + struct TestBackend { + project_dir: TempDir, + } + + impl TestBackend { + fn new() -> Self { + static COMPILE_WORKER_ONCE: Once = Once::new(); + + COMPILE_WORKER_ONCE.call_once(|| { + let output = std::process::Command::new("cargo") + .arg("build") + .output() + .expect("Build failed"); + assert!(output.status.success(), "Build failed"); + }); + + let project_dir = + TempDir::new("playground").expect("Failed to create 
temporary project directory"); + + let output = std::process::Command::new("cargo") + .arg("init") + .args(["--name", "playground"]) + .arg(project_dir.path()) + .output() + .expect("Build failed"); + assert!(output.status.success(), "Cargo initialization failed"); + + let main = project_dir.path().join("src").join("main.rs"); + std::fs::remove_file(main).expect("Could not delete main.rs"); + + Self { project_dir } + } + } + + impl Backend for TestBackend { + fn prepare_worker_command(&self, channel: Channel) -> Command { + let toolchain_file = format!(r#"[toolchain]\nchannel = "{}""#, channel.to_str()); + let path = self.project_dir.path().join("rust-toolchain.toml"); + std::fs::write(path, toolchain_file).expect("Couldn't write toolchain file"); + + let mut command = Command::new("./target/debug/worker"); + command.arg(self.project_dir.path()); + command + } + } + + async fn new_coordinator() -> Result> { + Coordinator::new(TestBackend::new()).await + //Coordinator::new_docker().await + } + + fn new_compile_request() -> CompileRequest { + new_compile_mir_request() + } + + fn new_compile_assembly_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::Assembly( + AssemblyFlavor::Intel, + DemangleAssembly::Demangle, + ProcessAssembly::Filter, + ), + channel: Channel::Beta, + crate_type: CrateType::Library(LibraryType::Lib), + mode: Mode::Release, + edition: Edition::Rust2018, + tests: false, + backtrace: false, + code: r#"pub fn add(a: u8, b: u8) -> u8 { a + b }"#.to_owned(), + } + } + + fn new_compile_hir_request() -> CompileRequest { + new_compile_hir_request_for(Edition::Rust2021) + } + + fn new_compile_hir_request_for(edition: Edition) -> CompileRequest { + CompileRequest { + target: CompileTarget::Hir, + channel: Channel::Nightly, + crate_type: CrateType::Library(LibraryType::Lib), + mode: Mode::Release, + edition, + tests: false, + backtrace: false, + code: r#"pub fn sub(a: u8, b: u8) -> u8 { a - b }"#.to_owned(), + } + } + + fn 
new_compile_llvm_ir_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::LlvmIr, + channel: Channel::Stable, + crate_type: CrateType::Library(LibraryType::Lib), + mode: Mode::Debug, + edition: Edition::Rust2015, + tests: false, + backtrace: false, + code: r#"pub fn mul(a: u8, b: u8) -> u8 { a * b }"#.to_owned(), + } + } + + fn new_compile_mir_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::Mir, + channel: Channel::Stable, + crate_type: CrateType::Binary, + mode: Mode::Release, + edition: Edition::Rust2021, + tests: false, + backtrace: false, + code: r#"fn main() { println!("Hello World!"); }"#.to_owned(), + } + } + + fn new_compile_wasm_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::Wasm, + channel: Channel::Nightly, // TODO: Can we run this on all channels now? + crate_type: CrateType::Library(LibraryType::Cdylib), + mode: Mode::Release, + edition: Edition::Rust2021, + tests: false, + backtrace: false, + code: r#"#[export_name = "inc"] pub fn inc(a: u8) -> u8 { a + 1 }"#.to_owned(), + } + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_response() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.stderr, "Compiling"); + assert_contains!(response.stderr, "Finished"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_streaming() -> Result<()> { + let coordinator = new_coordinator().await?; + + let ActiveCompilation { + task, + stdout_rx, + stderr_rx, + } = coordinator + .begin_compile(new_compile_request()) + .await + .unwrap(); + + let stdout = ReceiverStream::new(stdout_rx); + let stdout = stdout.collect::(); + + let stderr = ReceiverStream::new(stderr_rx); + let stderr = stderr.collect::(); + + let (complete, 
_stdout, stderr) = + async { join!(task, stdout, stderr) }.with_timeout().await; + + let response = complete.unwrap().unwrap(); + + assert!(response.success, "stderr: {}", stderr); + assert_contains!(stderr, "Compiling"); + assert_contains!(stderr, "Finished"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_edition() -> Result<()> { + for edition in Edition::ALL { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_hir_request_for(edition)) + .with_timeout() + .await + .unwrap(); + + let prelude = format!("std::prelude::rust_{}", edition.to_str()); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, &prelude); + + coordinator.shutdown().await?; + } + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_assembly() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_assembly_request()) + .with_timeout() + .await + .unwrap(); + + //#[cfg(target_arch = "x86_64")] + //let asm = ""; + + #[cfg(target_arch = "aarch64")] + let asm = "w0, w1, w0"; + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, asm); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_hir() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_hir_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, "extern crate std"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_llvm_ir() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_llvm_ir_request()) + .with_timeout() + .await + .unwrap(); 
+ + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, "@llvm.umul.with.overflow.i8(i8, i8)"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_wasm() -> Result<()> { + // cargo-wasm only exists inside the container + let coordinator = Coordinator::new_docker().await?; + + let response = coordinator + .compile(new_compile_wasm_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!( + response.code, + r#"(func $inc (export "inc") (type $t0) (param $p0 i32) (result i32)"# + ); + + coordinator.shutdown().await?; + + Ok(()) + } + + trait TimeoutExt: Future + Sized { + #[allow(clippy::type_complexity)] + fn with_timeout( + self, + ) -> futures::future::Map< + tokio::time::Timeout, + fn(Result) -> Self::Output, + > { + tokio::time::timeout(Duration::from_millis(5000), self) + .map(|v| v.expect("The operation timed out")) + } + } + + impl TimeoutExt for F {} +} diff --git a/compiler/base/orchestrator/src/lib.rs b/compiler/base/orchestrator/src/lib.rs new file mode 100644 index 000000000..775862939 --- /dev/null +++ b/compiler/base/orchestrator/src/lib.rs @@ -0,0 +1,25 @@ +#![deny(rust_2018_idioms)] + +pub mod coordinator; +mod message; +pub mod worker; + +trait DropErrorDetailsExt { + fn drop_error_details(self) -> Result>; +} + +impl DropErrorDetailsExt for Result> { + fn drop_error_details(self) -> Result> { + self.map_err(|_| tokio::sync::mpsc::error::SendError(())) + } +} + +fn bincode_input_closed(coordinator_msg: &bincode::Result) -> bool { + if let Err(e) = coordinator_msg { + if let bincode::ErrorKind::Io(e) = &**e { + return e.kind() == std::io::ErrorKind::UnexpectedEof; + } + } + + false +} diff --git a/compiler/base/orchestrator/src/message.rs b/compiler/base/orchestrator/src/message.rs new file mode 100644 index 000000000..1421fafb5 --- /dev/null +++ 
b/compiler/base/orchestrator/src/message.rs @@ -0,0 +1,132 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +pub type JobId = u64; +pub type Path = String; + +macro_rules! impl_narrow_to_broad { + ($enum_type:ident, $($variant_name:ident => $variant_type:ident),* $(,)?) => { + $( + impl From<$variant_type> for $enum_type { + fn from(other: $variant_type) -> Self { + $enum_type::$variant_name(other) + } + } + )* + }; +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Multiplexed(pub JobId, pub T); + +#[derive(Debug, Serialize, Deserialize)] +pub enum CoordinatorMessage { + WriteFile(WriteFileRequest), + ReadFile(ReadFileRequest), + ExecuteCommand(ExecuteCommandRequest), + StdinPacket(String), +} + +impl_narrow_to_broad!( + CoordinatorMessage, + WriteFile => WriteFileRequest, + ReadFile => ReadFileRequest, + ExecuteCommand => ExecuteCommandRequest, +); + +#[derive(Debug, Serialize, Deserialize)] +pub enum WorkerMessage { + WriteFile(WriteFileResponse), + ReadFile(ReadFileResponse), + ExecuteCommand(ExecuteCommandResponse), + StdoutPacket(String), + StderrPacket(String), + Error(SerializedError), +} + +macro_rules! impl_broad_to_narrow_with_error { + ($enum_type:ident, $($variant_name:ident => $variant_type:ty),* $(,)?) 
=> { + $( + impl TryFrom<$enum_type> for Result<$variant_type, SerializedError> { + type Error = $enum_type; + + fn try_from(other: $enum_type) -> Result { + match other { + $enum_type::$variant_name(x) => Ok(Ok(x)), + $enum_type::Error(e) => Ok(Err(e)), + o => Err(o) + } + } + } + )* + }; +} + +impl_narrow_to_broad!( + WorkerMessage, + WriteFile => WriteFileResponse, + ReadFile => ReadFileResponse, + ExecuteCommand => ExecuteCommandResponse, +); + +impl_broad_to_narrow_with_error!( + WorkerMessage, + WriteFile => WriteFileResponse, + ReadFile => ReadFileResponse, + ExecuteCommand => ExecuteCommandResponse, +); + +#[derive(Debug, Serialize, Deserialize)] +pub struct WriteFileRequest { + pub path: Path, + pub content: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WriteFileResponse(pub ()); + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadFileRequest { + pub path: Path, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadFileResponse(pub Vec); + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteCommandRequest { + pub cmd: String, + pub args: Vec, + pub envs: HashMap, + pub cwd: Option, // None means in project directory. 
+} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteCommandResponse { + pub success: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SerializedError(pub String); + +impl SerializedError { + pub fn new(e: impl snafu::Error) -> Self { + Self(snafu::Report::from_error(e).to_string()) + } +} + +pub trait OneToOneResponse { + type Response; +} + +impl OneToOneResponse for WriteFileRequest { + type Response = WriteFileResponse; +} + +impl OneToOneResponse for ReadFileRequest { + type Response = ReadFileResponse; +} + +impl OneToOneResponse for ExecuteCommandRequest { + type Response = ExecuteCommandResponse; +} diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs new file mode 100644 index 000000000..428d0e1b0 --- /dev/null +++ b/compiler/base/orchestrator/src/worker.rs @@ -0,0 +1,683 @@ +//! # Task information +//! +//! ## Hierarchy +//! +//! ```text +//! listen +//! ├── stdin +//! ├── stdout +//! ├── handle_coordinator_message +//! │ └── background (N) +//! └── manage_processes +//! └── process (N) +//! ├── process stdin +//! ├── process stdout +//! └── process stderr +//! ``` +//! +//! ## Notable resources +//! +//! - stdin +//! - [`std::io::Stdin`][] +//! - stdout +//! - [`std::io::Stdout`][] +//! - process +//! - [`tokio::process::Child`][] +//! - process stdin +//! - [`tokio::process::ChildStdin`][] +//! - process stdout +//! - [`tokio::process::ChildStdout`][] +//! - process stderr +//! - [`tokio::process::ChildStderr`][] +//! 
+ +use snafu::prelude::*; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + process::Stdio, +}; +use tokio::{ + fs, + io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, + process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}, + select, + sync::mpsc, + task::JoinSet, +}; + +use crate::{ + bincode_input_closed, + message::{ + CoordinatorMessage, ExecuteCommandRequest, ExecuteCommandResponse, JobId, Multiplexed, + ReadFileRequest, ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, + WriteFileResponse, + }, + DropErrorDetailsExt, +}; + +type CommandRequest = (Multiplexed, MultiplexingSender); + +pub async fn listen(project_dir: impl Into) -> Result<(), Error> { + let project_dir = project_dir.into(); + + let (coordinator_msg_tx, coordinator_msg_rx) = mpsc::channel(8); + let (worker_msg_tx, worker_msg_rx) = mpsc::channel(8); + let mut io_tasks = spawn_io_queue(coordinator_msg_tx, worker_msg_rx); + + let (cmd_tx, cmd_rx) = mpsc::channel(8); + let (stdin_tx, stdin_rx) = mpsc::channel(8); + let process_task = tokio::spawn(manage_processes(stdin_rx, cmd_rx, project_dir.clone())); + + let handler_task = tokio::spawn(handle_coordinator_message( + coordinator_msg_rx, + worker_msg_tx, + project_dir, + cmd_tx, + stdin_tx, + )); + + select! 
{ + Some(io_task) = io_tasks.join_next() => { + io_task.context(IoTaskPanickedSnafu)?.context(IoTaskFailedSnafu)?; + } + + process_task = process_task => { + process_task.context(ProcessTaskPanickedSnafu)?.context(ProcessTaskFailedSnafu)?; + } + + handler_task = handler_task => { + handler_task.context(HandlerTaskPanickedSnafu)?.context(HandlerTaskFailedSnafu)?; + } + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("The IO queue task panicked"))] + IoTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The IO queue task failed"))] + IoTaskFailed { source: IoQueueError }, + + #[snafu(display("The process task panicked"))] + ProcessTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The process task failed"))] + ProcessTaskFailed { source: ProcessError }, + + #[snafu(display("The coordinator message handler task panicked"))] + HandlerTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The coordinator message handler task failed"))] + HandlerTaskFailed { + source: HandleCoordinatorMessageError, + }, +} + +async fn handle_coordinator_message( + mut coordinator_msg_rx: mpsc::Receiver>, + worker_msg_tx: mpsc::Sender>, + project_dir: PathBuf, + cmd_tx: mpsc::Sender, + stdin_tx: mpsc::Sender>, +) -> Result<(), HandleCoordinatorMessageError> { + use handle_coordinator_message_error::*; + + let mut tasks = JoinSet::new(); + + loop { + select! 
{ + coordinator_msg = coordinator_msg_rx.recv() => { + let Some(Multiplexed(job_id, coordinator_msg)) = coordinator_msg else { break }; + + let worker_msg_tx = || MultiplexingSender { + job_id, + tx: worker_msg_tx.clone(), + }; + + match coordinator_msg { + CoordinatorMessage::WriteFile(req) => { + let project_dir = project_dir.clone(); + let worker_msg_tx = worker_msg_tx(); + + tasks.spawn(async move { + worker_msg_tx + .send(handle_write_file(req, project_dir).await) + .await + .context(UnableToSendWriteFileResponseSnafu) + }); + } + + CoordinatorMessage::ReadFile(req) => { + let project_dir = project_dir.clone(); + let worker_msg_tx = worker_msg_tx(); + + tasks.spawn(async move { + worker_msg_tx + .send(handle_read_file(req, project_dir).await) + .await + .context(UnableToSendReadFileResponseSnafu) + }); + } + + CoordinatorMessage::ExecuteCommand(req) => { + cmd_tx + .send((Multiplexed(job_id, req), worker_msg_tx())) + .await + .drop_error_details() + .context(UnableToSendCommandExecutionRequestSnafu)?; + } + + CoordinatorMessage::StdinPacket(data) => { + stdin_tx + .send(Multiplexed(job_id, data)) + .await + .drop_error_details() + .context(UnableToSendStdinPacketSnafu)?; + } + } + } + + Some(task) = tasks.join_next() => { + task.context(TaskPanickedSnafu)??; + } + } + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum HandleCoordinatorMessageError { + #[snafu(display("Could not send the write command response to the coordinator"))] + UnableToSendWriteFileResponse { source: MultiplexingSenderError }, + + #[snafu(display("Could not send the read command response to the coordinator"))] + UnableToSendReadFileResponse { source: MultiplexingSenderError }, + + #[snafu(display("Failed to send command execution request to the command task"))] + UnableToSendCommandExecutionRequest { source: mpsc::error::SendError<()> }, + + #[snafu(display("Failed to send stdin packet to the command task"))] + UnableToSendStdinPacket { source: 
mpsc::error::SendError<()> }, + + #[snafu(display("A coordinator command handler background task panicked"))] + TaskPanicked { source: tokio::task::JoinError }, +} + +#[derive(Debug, Clone)] +struct MultiplexingSender { + job_id: JobId, + tx: mpsc::Sender>, +} + +impl MultiplexingSender { + async fn send( + &self, + message: Result, impl std::error::Error>, + ) -> Result<(), MultiplexingSenderError> { + match message { + Ok(v) => self.send_ok(v).await, + Err(e) => self.send_err(e).await, + } + } + + async fn send_ok( + &self, + message: impl Into, + ) -> Result<(), MultiplexingSenderError> { + self.send_raw(message.into()).await + } + + async fn send_err(&self, e: impl std::error::Error) -> Result<(), MultiplexingSenderError> { + self.send_raw(WorkerMessage::Error(SerializedError::new(e))) + .await + } + + async fn send_raw(&self, message: WorkerMessage) -> Result<(), MultiplexingSenderError> { + use multiplexing_sender_error::*; + + self.tx + .send(Multiplexed(self.job_id, message)) + .await + .drop_error_details() + .context(UnableToSendWorkerMessageSnafu) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum MultiplexingSenderError { + #[snafu(display("Failed to send worker message to the serialization task"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, +} + +async fn handle_write_file( + req: WriteFileRequest, + project_dir: PathBuf, +) -> Result { + use write_file_error::*; + + let path = parse_working_dir(Some(req.path), project_dir); + + // Create intermediate directories. 
+ if let Some(parent_dir) = path.parent() { + fs::create_dir_all(parent_dir) + .await + .context(UnableToCreateDirSnafu { parent_dir })?; + } + + fs::write(&path, req.content) + .await + .context(UnableToWriteFileSnafu { path })?; + + Ok(WriteFileResponse(())) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum WriteFileError { + #[snafu(display("Failed to create parent directory {}", parent_dir.display()))] + UnableToCreateDir { + source: std::io::Error, + parent_dir: PathBuf, + }, + + #[snafu(display("Failed to write file {}", path.display()))] + UnableToWriteFile { + source: std::io::Error, + path: PathBuf, + }, + + #[snafu(display("Failed to send worker message to serialization task"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, +} + +async fn handle_read_file( + req: ReadFileRequest, + project_dir: PathBuf, +) -> Result { + use read_file_error::*; + + let path = parse_working_dir(Some(req.path), project_dir); + + let content = fs::read(&path) + .await + .context(UnableToReadFileSnafu { path })?; + + Ok(ReadFileResponse(content)) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum ReadFileError { + #[snafu(display("Failed to read file {}", path.display()))] + UnableToReadFile { + source: std::io::Error, + path: PathBuf, + }, + + #[snafu(display("Failed to send worker message to serialization task"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, +} + +// Current working directory defaults to project dir unless specified otherwise. +fn parse_working_dir(cwd: Option, project_path: impl Into) -> PathBuf { + let mut final_path = project_path.into(); + if let Some(path) = cwd { + // Absolute path will replace final_path. 
+ final_path.push(path) + } + final_path +} + +async fn manage_processes( + mut stdin_rx: mpsc::Receiver>, + mut cmd_rx: mpsc::Receiver, + project_path: PathBuf, +) -> Result<(), ProcessError> { + use process_error::*; + + let mut processes = JoinSet::new(); + let mut stdin_senders = HashMap::new(); + let (stdin_shutdown_tx, mut stdin_shutdown_rx) = mpsc::channel(8); + + loop { + select! { + cmd_req = cmd_rx.recv() => { + let Some((Multiplexed(job_id, req), worker_msg_tx)) = cmd_req else { break }; + + let RunningChild { child, stdin_rx, stdin, stdout, stderr } = match process_begin(req, &project_path, &mut stdin_senders, job_id) { + Ok(v) => v, + Err(e) => { + // Should we add a message for process started + // in addition to the current message which + // indicates that the process has ended? + worker_msg_tx.send_err(e).await.context(UnableToSendExecuteCommandStartedResponseSnafu)?; + continue; + } + }; + + let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr); + + processes.spawn({ + let stdin_shutdown_tx = stdin_shutdown_tx.clone(); + async move { + worker_msg_tx + .send(process_end(child, task_set, stdin_shutdown_tx, job_id).await) + .await + .context(UnableToSendExecuteCommandResponseSnafu) + } + }); + } + + stdin_packet = stdin_rx.recv() => { + // Dispatch stdin packet to different child by attached command id. + let Some(Multiplexed(job_id, packet)) = stdin_packet else { break }; + + if let Some(stdin_tx) = stdin_senders.get(&job_id) { + stdin_tx.send(packet).await.drop_error_details().context(UnableToSendStdinDataSnafu)?; + } + } + + job_id = stdin_shutdown_rx.recv() => { + let job_id = job_id.context(StdinShutdownReceiverEndedSnafu)?; + stdin_senders.remove(&job_id); + // Should we care if we remove a sender that's already removed? 
+ } + + Some(process) = processes.join_next() => { + process.context(ProcessTaskPanickedSnafu)??; + } + } + } + + Ok(()) +} + +struct RunningChild { + child: Child, + stdin_rx: mpsc::Receiver, + stdin: ChildStdin, + stdout: ChildStdout, + stderr: ChildStderr, +} + +fn process_begin( + req: ExecuteCommandRequest, + project_path: &Path, + stdin_senders: &mut HashMap>, + job_id: JobId, +) -> Result { + use process_error::*; + + let ExecuteCommandRequest { + cmd, + args, + envs, + cwd, + } = req; + let mut child = Command::new(&cmd) + .args(args) + .envs(envs) + .current_dir(parse_working_dir(cwd, project_path)) + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context(UnableToSpawnProcessSnafu { cmd })?; + + let stdin = child.stdin.take().context(UnableToCaptureStdinSnafu)?; + let stdout = child.stdout.take().context(UnableToCaptureStdoutSnafu)?; + let stderr = child.stderr.take().context(UnableToCaptureStderrSnafu)?; + + // Preparing for receiving stdin packet. + let (stdin_tx, stdin_rx) = mpsc::channel(8); + stdin_senders.insert(job_id, stdin_tx); + + Ok(RunningChild { + child, + stdin_rx, + stdin, + stdout, + stderr, + }) +} + +async fn process_end( + mut child: Child, + mut task_set: JoinSet>, + stdin_shutdown_tx: mpsc::Sender, + job_id: JobId, +) -> Result { + use process_error::*; + + let status = child.wait().await.context(WaitChildSnafu)?; + + stdin_shutdown_tx + .send(job_id) + .await + .drop_error_details() + .context(UnableToSendStdinShutdownSnafu)?; + + while let Some(task) = task_set.join_next().await { + task.context(StdioTaskPanickedSnafu)? 
+ .context(StdioTaskFailedSnafu)?; + } + + let success = status.success(); + Ok(ExecuteCommandResponse { success }) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum ProcessError { + #[snafu(display("Failed to spawn child process {cmd}"))] + UnableToSpawnProcess { source: std::io::Error, cmd: String }, + + #[snafu(display("Failed to capture child process stdin"))] + UnableToCaptureStdin, + + #[snafu(display("Failed to capture child process stdout"))] + UnableToCaptureStdout, + + #[snafu(display("Failed to capture child process stderr"))] + UnableToCaptureStderr, + + #[snafu(display("Failed to send stdin data"))] + UnableToSendStdinData { source: mpsc::error::SendError<()> }, + + #[snafu(display("Failed to wait for child process exiting"))] + WaitChild { source: std::io::Error }, + + #[snafu(display("Failed to send the stdin shutdown request"))] + UnableToSendStdinShutdown { source: mpsc::error::SendError<()> }, + + #[snafu(display("The command's stdio task panicked"))] + StdioTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The command's stdio task failed"))] + StdioTaskFailed { source: StdioError }, + + #[snafu(display("Failed to send the command started response to the coordinator"))] + UnableToSendExecuteCommandStartedResponse { source: MultiplexingSenderError }, + + #[snafu(display("Failed to send the command completed response to the coordinator"))] + UnableToSendExecuteCommandResponse { source: MultiplexingSenderError }, + + #[snafu(display("The stdin shutdown receiver ended prematurely"))] + StdinShutdownReceiverEnded, + + #[snafu(display("The process task panicked"))] + ProcessTaskPanicked { source: tokio::task::JoinError }, +} + +fn stream_stdio( + coordinator_tx: MultiplexingSender, + mut stdin_rx: mpsc::Receiver, + mut stdin: ChildStdin, + stdout: ChildStdout, + stderr: ChildStderr, +) -> JoinSet> { + use stdio_error::*; + + let mut set = JoinSet::new(); + + set.spawn(async move { + loop { + let Some(data) = 
stdin_rx.recv().await else { break }; + stdin + .write_all(data.as_bytes()) + .await + .context(UnableToWriteStdinSnafu)?; + stdin.flush().await.context(UnableToFlushStdinSnafu)?; + } + + Ok(()) + }); + + set.spawn({ + copy_child_output(stdout, coordinator_tx.clone(), WorkerMessage::StdoutPacket) + .context(CopyStdoutSnafu) + }); + + set.spawn({ + copy_child_output(stderr, coordinator_tx, WorkerMessage::StderrPacket) + .context(CopyStderrSnafu) + }); + + set +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum StdioError { + #[snafu(display("Failed to write stdin data"))] + UnableToWriteStdin { source: std::io::Error }, + + #[snafu(display("Failed to flush stdin data"))] + UnableToFlushStdin { source: std::io::Error }, + + #[snafu(display("Failed to copy child stdout"))] + CopyStdout { source: CopyChildOutputError }, + + #[snafu(display("Failed to copy child stderr"))] + CopyStderr { source: CopyChildOutputError }, +} + +async fn copy_child_output( + output: impl AsyncRead + Unpin, + coordinator_tx: MultiplexingSender, + mut xform: impl FnMut(String) -> WorkerMessage, +) -> Result<(), CopyChildOutputError> { + use copy_child_output_error::*; + + let mut buf = BufReader::new(output); + + loop { + // Must be valid UTF-8. + let mut buffer = String::new(); + + let n = buf + .read_line(&mut buffer) + .await + .context(UnableToReadSnafu)?; + + if n == 0 { + break; + } + + coordinator_tx + .send_ok(xform(buffer)) + .await + .context(UnableToSendSnafu)?; + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CopyChildOutputError { + #[snafu(display("Failed to read child output"))] + UnableToRead { source: std::io::Error }, + + #[snafu(display("Failed to send output packet"))] + UnableToSend { source: MultiplexingSenderError }, +} + +// stdin/out <--> messages. 
+fn spawn_io_queue( + coordinator_msg_tx: mpsc::Sender>, + mut worker_msg_rx: mpsc::Receiver>, +) -> JoinSet> { + use io_queue_error::*; + use std::io::{prelude::*, BufReader, BufWriter}; + + let mut tasks = JoinSet::new(); + + tasks.spawn_blocking(move || { + let stdin = std::io::stdin(); + let mut stdin = BufReader::new(stdin); + + loop { + let coordinator_msg = bincode::deserialize_from(&mut stdin); + + if bincode_input_closed(&coordinator_msg) { + break; + }; + + let coordinator_msg = + coordinator_msg.context(UnableToDeserializeCoordinatorMessageSnafu)?; + + coordinator_msg_tx + .blocking_send(coordinator_msg) + .drop_error_details() + .context(UnableToSendCoordinatorMessageSnafu)?; + } + + Ok(()) + }); + + tasks.spawn_blocking(move || { + let stdout = std::io::stdout(); + let mut stdout = BufWriter::new(stdout); + + loop { + let worker_msg = worker_msg_rx + .blocking_recv() + .context(UnableToReceiveWorkerMessageSnafu)?; + + bincode::serialize_into(&mut stdout, &worker_msg) + .context(UnableToSerializeWorkerMessageSnafu)?; + + stdout.flush().context(UnableToFlushStdoutSnafu)?; + } + }); + + tasks +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum IoQueueError { + #[snafu(display("Failed to deserialize coordinator message"))] + UnableToDeserializeCoordinatorMessage { source: bincode::Error }, + + #[snafu(display("Failed to serialize worker message"))] + UnableToSerializeWorkerMessage { source: bincode::Error }, + + #[snafu(display("Failed to send coordinator message from deserialization task"))] + UnableToSendCoordinatorMessage { source: mpsc::error::SendError<()> }, + + #[snafu(display("Failed to receive worker message"))] + UnableToReceiveWorkerMessage, + + #[snafu(display("Failed to flush stdout"))] + UnableToFlushStdout { source: std::io::Error }, +} diff --git a/tests/spec/features/sharing_with_others_spec.rb b/tests/spec/features/sharing_with_others_spec.rb index 34abb8b5e..80fe0d7f5 100644 --- a/tests/spec/features/sharing_with_others_spec.rb +++ 
b/tests/spec/features/sharing_with_others_spec.rb @@ -3,7 +3,7 @@ require 'support/matchers/be_at_url' require 'support/playground_actions' -RSpec.feature "Sharing the code with others", type: :feature, js: true do +RSpec.feature "Sharing the code with others", :external, type: :feature, js: true do include PlaygroundActions before { visit '/' } diff --git a/ui/Cargo.lock b/ui/Cargo.lock index f391f6ea4..b2164ff24 100644 --- a/ui/Cargo.lock +++ b/ui/Cargo.lock @@ -47,6 +47,16 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "asm-cleanup" +version = "0.1.0" +dependencies = [ + "lazy_static", + "petgraph", + "regex", + "rustc-demangle", +] + [[package]] name = "async-trait" version = "0.1.68" @@ -144,6 +154,15 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -748,6 +767,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "modify-cargo-toml" +version = "0.1.0" +dependencies = [ + "serde", + "serde_derive", + "toml", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -856,6 +884,22 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "orchestrator" +version = "0.1.0" +dependencies = [ + "asm-cleanup", + "bincode", + "futures", + "modify-cargo-toml", + "serde", + "snafu", + "tokio", + "tokio-stream", + "tokio-util", + "toml", +] + [[package]] name = "overload" version = "0.1.1" @@ -1230,6 
+1274,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1306,6 +1359,8 @@ checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045" dependencies = [ "backtrace", "doc-comment", + "futures-core", + "pin-project", "snafu-derive", ] @@ -1522,6 +1577,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.18.0" @@ -1547,6 +1613,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.19.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -1695,6 +1795,7 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" name = "ui" version = "0.1.0" dependencies = [ + "asm-cleanup", "async-trait", 
"axum", "dotenv", @@ -1702,10 +1803,9 @@ dependencies = [ "lazy_static", "octocrab", "openssl-probe", - "petgraph", + "orchestrator", "prometheus", "regex", - "rustc-demangle", "serde", "serde_derive", "serde_json", @@ -2017,6 +2117,15 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "winnow" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699" +dependencies = [ + "memchr", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/ui/Cargo.toml b/ui/Cargo.toml index f01f8b758..4e89afa69 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -9,6 +9,7 @@ default = ['fork-bomb-prevention'] fork-bomb-prevention = [] [dependencies] +asm-cleanup = { path = "../compiler/base/asm-cleanup" } async-trait = "0.1.52" axum = { version = "0.6", features = ["headers", "ws"] } dotenv = "0.15.0" @@ -16,10 +17,9 @@ futures = "0.3.21" lazy_static = "1.0.0" octocrab = "0.25" openssl-probe = "0.1.2" -petgraph = "0.6.0" +orchestrator = { path = "../compiler/base/orchestrator" } prometheus = "0.13.0" regex = "1.0.0" -rustc-demangle = "0.1.5" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = "1.0" diff --git a/ui/src/main.rs b/ui/src/main.rs index d80699cb7..5bec44bf8 100644 --- a/ui/src/main.rs +++ b/ui/src/main.rs @@ -14,7 +14,6 @@ use tracing::{error, info, warn}; const DEFAULT_ADDRESS: &str = "127.0.0.1"; const DEFAULT_PORT: u16 = 5000; -mod asm_cleanup; mod env; mod gist; mod metrics; @@ -38,6 +37,7 @@ struct Config { cors_enabled: bool, gh_token: Option, metrics_token: Option, + orchestrator_enabled: bool, port: u16, root: PathBuf, } @@ -91,11 +91,14 @@ impl Config { let cors_enabled = env::var_os("PLAYGROUND_CORS_ENABLED").is_some(); + let orchestrator_enabled = 
env::var_os("PLAYGROUND_ORCHESTRATOR_ENABLED").is_some(); + Self { address, cors_enabled, gh_token, metrics_token, + orchestrator_enabled, port, root, } @@ -113,6 +116,10 @@ impl Config { self.cors_enabled } + fn use_orchestrator(&self) -> bool { + self.orchestrator_enabled + } + fn metrics_token(&self) -> Option { self.metrics_token.as_deref().map(MetricsToken::new) } @@ -204,6 +211,24 @@ pub enum Error { CachePoisoned, #[snafu(display("The WebSocket worker panicked: {}", text))] WebSocketTaskPanic { text: String }, + + #[snafu(display("Unable to create the coordinator"))] + CreateCoordinator { + source: orchestrator::coordinator::Error, + }, + + #[snafu(display("Unable to shutdown the coordinator"))] + ShutdownCoordinator { + source: orchestrator::coordinator::Error, + }, + + #[snafu(display("Unable to convert the compile request"))] + Compile { + source: orchestrator::coordinator::CompileError, + }, + + #[snafu(display("The operation timed out"))] + Timeout { source: tokio::time::error::Elapsed }, } type Result = ::std::result::Result; diff --git a/ui/src/metrics.rs b/ui/src/metrics.rs index 15a787aa5..5c22bfd97 100644 --- a/ui/src/metrics.rs +++ b/ui/src/metrics.rs @@ -1,11 +1,15 @@ use futures::future::BoxFuture; use lazy_static::lazy_static; +use orchestrator::coordinator; use prometheus::{ self, register_histogram, register_histogram_vec, register_int_counter, register_int_gauge, Histogram, HistogramVec, IntCounter, IntGauge, }; use regex::Regex; -use std::{future::Future, time::Instant}; +use std::{ + future::Future, + time::{Duration, Instant}, +}; use crate::sandbox::{self, Channel, CompileTarget, CrateType, Edition, Mode}; @@ -62,6 +66,16 @@ pub(crate) enum Outcome { ErrorUserCode, } +pub(crate) struct LabelsCore { + target: Option, + channel: Option, + mode: Option, + edition: Option>, + crate_type: Option, + tests: Option, + backtrace: Option, +} + #[derive(Debug, Copy, Clone)] pub(crate) struct Labels { endpoint: Endpoint, @@ -132,6 +146,29 @@ impl 
Labels { backtrace, ] } + + pub(crate) fn complete(endpoint: Endpoint, labels_core: LabelsCore, outcome: Outcome) -> Self { + let LabelsCore { + target, + channel, + mode, + edition, + crate_type, + tests, + backtrace, + } = labels_core; + Self { + endpoint, + outcome, + target, + channel, + mode, + edition, + crate_type, + tests, + backtrace, + } + } } pub(crate) trait GenerateLabels { @@ -406,11 +443,8 @@ where let outcome = SuccessDetails::for_sandbox_result(&response); let mut labels = request.generate_labels(outcome); f(&mut labels); - let values = &labels.as_values(); - - let histogram = REQUESTS.with_label_values(values); - histogram.observe(elapsed.as_secs_f64()); + record_metric_complete(labels, elapsed); response } @@ -443,10 +477,53 @@ where tests: None, backtrace: None, }; - let values = &labels.as_values(); - let histogram = REQUESTS.with_label_values(values); - histogram.observe(elapsed.as_secs_f64()); + record_metric_complete(labels, elapsed); response } + +pub(crate) trait HasLabelsCore { + fn labels_core(&self) -> LabelsCore; +} + +impl HasLabelsCore for coordinator::CompileRequest { + fn labels_core(&self) -> LabelsCore { + let Self { + target, + channel, + crate_type, + mode, + edition, + tests, + backtrace, + code: _, + } = *self; + + LabelsCore { + target: Some(target.into()), + channel: Some(channel.into()), + mode: Some(mode.into()), + edition: Some(Some(edition.into())), + crate_type: Some(crate_type.into()), + tests: Some(tests), + backtrace: Some(backtrace), + } + } +} + +pub(crate) fn record_metric( + endpoint: Endpoint, + labels_core: LabelsCore, + outcome: Outcome, + elapsed: Duration, +) { + let labels = Labels::complete(endpoint, labels_core, outcome); + record_metric_complete(labels, elapsed) +} + +fn record_metric_complete(labels: Labels, elapsed: Duration) { + let values = &labels.as_values(); + let histogram = REQUESTS.with_label_values(values); + histogram.observe(elapsed.as_secs_f64()); +} diff --git a/ui/src/sandbox.rs 
b/ui/src/sandbox.rs index d96a5636b..b6163145b 100644 --- a/ui/src/sandbox.rs +++ b/ui/src/sandbox.rs @@ -14,7 +14,7 @@ use tempfile::TempDir; use tokio::{fs, process::Command, time}; use tracing::debug; -const DOCKER_PROCESS_TIMEOUT_SOFT: Duration = Duration::from_secs(10); +pub(crate) const DOCKER_PROCESS_TIMEOUT_SOFT: Duration = Duration::from_secs(10); const DOCKER_PROCESS_TIMEOUT_HARD: Duration = Duration::from_secs(12); #[derive(Debug, Deserialize)] @@ -314,11 +314,11 @@ impl Sandbox { if let CompileTarget::Assembly(_, demangle, process) = req.target { if demangle == DemangleAssembly::Demangle { - code = crate::asm_cleanup::demangle_asm(&code); + code = asm_cleanup::demangle_asm(&code); } if process == ProcessAssembly::Filter { - code = crate::asm_cleanup::filter_asm(&code); + code = asm_cleanup::filter_asm(&code); } } else if CompileTarget::Hir == req.target { // TODO: Run rustfmt on the generated HIR. @@ -1055,6 +1055,102 @@ pub struct MacroExpansionResponse { pub stderr: String, } +mod sandbox_orchestrator_integration_impls { + use orchestrator::coordinator; + + impl From for super::CompileTarget { + fn from(value: coordinator::CompileTarget) -> Self { + match value { + coordinator::CompileTarget::Assembly(a, b, c) => { + super::CompileTarget::Assembly(a.into(), b.into(), c.into()) + } + coordinator::CompileTarget::Hir => super::CompileTarget::Hir, + coordinator::CompileTarget::LlvmIr => super::CompileTarget::LlvmIr, + coordinator::CompileTarget::Mir => super::CompileTarget::Mir, + coordinator::CompileTarget::Wasm => super::CompileTarget::Wasm, + } + } + } + + impl From for super::Mode { + fn from(value: coordinator::Mode) -> Self { + match value { + coordinator::Mode::Debug => super::Mode::Debug, + coordinator::Mode::Release => super::Mode::Release, + } + } + } + + impl From for super::Edition { + fn from(value: coordinator::Edition) -> Self { + match value { + coordinator::Edition::Rust2015 => super::Edition::Rust2015, + coordinator::Edition::Rust2018 => 
super::Edition::Rust2018, + coordinator::Edition::Rust2021 => super::Edition::Rust2021, + } + } + } + + impl From for super::Channel { + fn from(value: coordinator::Channel) -> Self { + match value { + coordinator::Channel::Stable => super::Channel::Stable, + coordinator::Channel::Beta => super::Channel::Beta, + coordinator::Channel::Nightly => super::Channel::Nightly, + } + } + } + + impl From for super::AssemblyFlavor { + fn from(value: coordinator::AssemblyFlavor) -> Self { + match value { + coordinator::AssemblyFlavor::Att => super::AssemblyFlavor::Att, + coordinator::AssemblyFlavor::Intel => super::AssemblyFlavor::Intel, + } + } + } + + impl From for super::CrateType { + fn from(value: coordinator::CrateType) -> Self { + match value { + coordinator::CrateType::Binary => super::CrateType::Binary, + coordinator::CrateType::Library(a) => super::CrateType::Library(a.into()), + } + } + } + + impl From for super::DemangleAssembly { + fn from(value: coordinator::DemangleAssembly) -> Self { + match value { + coordinator::DemangleAssembly::Demangle => super::DemangleAssembly::Demangle, + coordinator::DemangleAssembly::Mangle => super::DemangleAssembly::Mangle, + } + } + } + + impl From for super::ProcessAssembly { + fn from(value: coordinator::ProcessAssembly) -> Self { + match value { + coordinator::ProcessAssembly::Filter => super::ProcessAssembly::Filter, + coordinator::ProcessAssembly::Raw => super::ProcessAssembly::Raw, + } + } + } + + impl From for super::LibraryType { + fn from(value: coordinator::LibraryType) -> Self { + match value { + coordinator::LibraryType::Lib => super::LibraryType::Lib, + coordinator::LibraryType::Dylib => super::LibraryType::Dylib, + coordinator::LibraryType::Rlib => super::LibraryType::Rlib, + coordinator::LibraryType::Staticlib => super::LibraryType::Staticlib, + coordinator::LibraryType::Cdylib => super::LibraryType::Cdylib, + coordinator::LibraryType::ProcMacro => super::LibraryType::ProcMacro, + } + } + } +} + #[cfg(test)] mod test 
{ use super::*; diff --git a/ui/src/server_axum.rs b/ui/src/server_axum.rs index fdd473a26..e82d029c8 100644 --- a/ui/src/server_axum.rs +++ b/ui/src/server_axum.rs @@ -1,17 +1,19 @@ use crate::{ gist, metrics::{ - track_metric_async, track_metric_force_endpoint_async, track_metric_no_request_async, - Endpoint, GenerateLabels, SuccessDetails, UNAVAILABLE_WS, + record_metric, track_metric_async, track_metric_force_endpoint_async, + track_metric_no_request_async, Endpoint, GenerateLabels, HasLabelsCore, Outcome, + SuccessDetails, UNAVAILABLE_WS, }, - sandbox::{self, Channel, Sandbox}, + sandbox::{self, Channel, Sandbox, DOCKER_PROCESS_TIMEOUT_SOFT}, CachingSnafu, ClippyRequest, ClippyResponse, CompilationSnafu, CompileRequest, CompileResponse, - Config, Error, ErrorJson, EvaluateRequest, EvaluateResponse, EvaluationSnafu, ExecuteRequest, - ExecuteResponse, ExecutionSnafu, ExpansionSnafu, FormatRequest, FormatResponse, - FormattingSnafu, GhToken, GistCreationSnafu, GistLoadingSnafu, InterpretingSnafu, LintingSnafu, - MacroExpansionRequest, MacroExpansionResponse, MetaCratesResponse, MetaGistCreateRequest, - MetaGistResponse, MetaVersionResponse, MetricsToken, MiriRequest, MiriResponse, Result, - SandboxCreationSnafu, + CompileSnafu, Config, CreateCoordinatorSnafu, Error, ErrorJson, EvaluateRequest, + EvaluateResponse, EvaluationSnafu, ExecuteRequest, ExecuteResponse, ExecutionSnafu, + ExpansionSnafu, FormatRequest, FormatResponse, FormattingSnafu, GhToken, GistCreationSnafu, + GistLoadingSnafu, InterpretingSnafu, LintingSnafu, MacroExpansionRequest, + MacroExpansionResponse, MetaCratesResponse, MetaGistCreateRequest, MetaGistResponse, + MetaVersionResponse, MetricsToken, MiriRequest, MiriResponse, Result, SandboxCreationSnafu, + ShutdownCoordinatorSnafu, TimeoutSnafu, }; use async_trait::async_trait; use axum::{ @@ -27,6 +29,7 @@ use axum::{ Router, }; use futures::{future::BoxFuture, FutureExt}; +use orchestrator::coordinator::{self, DockerBackend}; use 
snafu::{prelude::*, IntoError};
 use std::{
     convert::{TryFrom, TryInto},
@@ -55,6 +58,9 @@ const MAX_AGE_ONE_YEAR: HeaderValue = HeaderValue::from_static("public, max-age=
 
 mod websocket;
 
+#[derive(Debug, Copy, Clone)]
+struct OrchestratorEnabled(bool);
+
 #[tokio::main]
 pub(crate) async fn serve(config: Config) {
     let root_files = static_file_service(config.root_path(), MAX_AGE_ONE_DAY);
@@ -87,7 +93,8 @@ pub(crate) async fn serve(config: Config) {
         .route("/nowebsocket", post(nowebsocket))
         .route("/whynowebsocket", get(whynowebsocket))
         .layer(Extension(Arc::new(SandboxCache::default())))
-        .layer(Extension(config.github_token()));
+        .layer(Extension(config.github_token()))
+        .layer(Extension(OrchestratorEnabled(config.use_orchestrator())));
 
     if let Some(token) = config.metrics_token() {
         app = app.layer(Extension(token))
@@ -152,14 +159,23 @@ async fn evaluate(Json(req): Json<EvaluateRequest>) -> Result<Json<EvaluateRespo
 
-async fn compile(Json(req): Json<CompileRequest>) -> Result<Json<CompileResponse>> {
-    with_sandbox(
-        req,
-        |sb, req| async move { sb.compile(req).await }.boxed(),
-        CompilationSnafu,
-    )
-    .await
-    .map(Json)
+async fn compile(
+    Extension(use_orchestrator): Extension<OrchestratorEnabled>,
+    Json(req): Json<CompileRequest>,
+) -> Result<Json<CompileResponse>> {
+    if use_orchestrator.0 {
+        with_coordinator(req, |c, req| c.compile(req).context(CompileSnafu).boxed())
+            .await
+            .map(Json)
+    } else {
+        with_sandbox(
+            req,
+            |sb, req| async move { sb.compile(req).await }.boxed(),
+            CompilationSnafu,
+        )
+        .await
+        .map(Json)
+    }
 }
 
 async fn execute(Json(req): Json<ExecuteRequest>) -> Result<Json<ExecuteResponse>> {
@@ -251,6 +267,82 @@ where
     .context(ctx)
 }
 
+pub(crate) trait HasEndpoint {
+    const ENDPOINT: Endpoint;
+}
+
+impl HasEndpoint for CompileRequest {
+    const ENDPOINT: Endpoint = Endpoint::Compile;
+}
+
+trait IsSuccess {
+    fn is_success(&self) -> bool;
+}
+
+impl IsSuccess for coordinator::CompileResponseWithOutput {
+    fn is_success(&self) -> bool {
+        self.success
+    }
+}
+
+async fn with_coordinator<WebReq, Req, Resp, WebResp, F>(req: WebReq, f: F) -> Result<WebResp>
+where
+    WebReq: TryInto<Req, Error = Error>,
+    WebReq: HasEndpoint,
+    Req: HasLabelsCore,
+    Resp: Into<WebResp>,
+    Resp: IsSuccess,
+    for<'f> F:
FnOnce(&'f coordinator::Coordinator<DockerBackend>, Req) -> BoxFuture<'f, Result<Resp>>,
+{
+    let coordinator = orchestrator::coordinator::Coordinator::new_docker()
+        .await
+        .context(CreateCoordinatorSnafu)?;
+
+    let job = async {
+        let req = req.try_into()?;
+
+        let labels_core = req.labels_core();
+
+        let start = Instant::now();
+
+        let job = f(&coordinator, req);
+        let resp = tokio::time::timeout(DOCKER_PROCESS_TIMEOUT_SOFT, job).await;
+
+        let elapsed = start.elapsed();
+
+        let outcome = match &resp {
+            Ok(Ok(v)) => {
+                if v.is_success() {
+                    Outcome::Success
+                } else {
+                    Outcome::ErrorUserCode
+                }
+            }
+            Ok(Err(_)) => Outcome::ErrorServer,
+            Err(_) => Outcome::ErrorTimeoutSoft,
+        };
+
+        // Note that any early return before this point won't be
+        // reported in the metrics!
+
+        record_metric(WebReq::ENDPOINT, labels_core, outcome, elapsed);
+
+        let resp = resp.context(TimeoutSnafu)?;
+
+        resp.map(Into::into)
+    };
+
+    let resp = job.await;
+
+    coordinator
+        .shutdown()
+        .await
+        .context(ShutdownCoordinatorSnafu)?;
+
+    resp
+}
+
 async fn meta_crates(
     Extension(cache): Extension<Arc<SandboxCache>>,
     if_none_match: Option<TypedHeader<IfNoneMatch>>,
@@ -676,3 +768,162 @@ where
         axum::Json(self.0).into_response()
     }
 }
+
+mod api_orchestrator_integration_impls {
+    use orchestrator::coordinator::*;
+    use std::convert::TryFrom;
+
+    use super::{Error, Result};
+
+    impl TryFrom<crate::CompileRequest> for CompileRequest {
+        type Error = Error;
+
+        fn try_from(other: crate::CompileRequest) -> Result<Self> {
+            let crate::CompileRequest {
+                target,
+                assembly_flavor,
+                demangle_assembly,
+                process_assembly,
+                channel,
+                mode,
+                edition,
+                crate_type,
+                tests,
+                backtrace,
+                code,
+            } = other;
+
+            Ok(Self {
+                target: parse_target(
+                    &target,
+                    assembly_flavor.as_deref(),
+                    demangle_assembly.as_deref(),
+                    process_assembly.as_deref(),
+                )?,
+                channel: parse_channel(&channel)?,
+                crate_type: parse_crate_type(&crate_type)?,
+                mode: parse_mode(&mode)?,
+                edition: parse_edition(&edition)?,
+                tests,
+                backtrace,
+                code,
+            })
+        }
+    }
+
+    impl From<CompileResponseWithOutput> for crate::CompileResponse {
+
fn from(other: CompileResponseWithOutput) -> Self {
+            let CompileResponseWithOutput {
+                success,
+                code,
+                stdout,
+                stderr,
+            } = other;
+
+            Self {
+                success,
+                code,
+                stdout,
+                stderr,
+            }
+        }
+    }
+
+    fn parse_target(
+        target: &str,
+        assembly_flavor: Option<&str>,
+        demangle_assembly: Option<&str>,
+        process_assembly: Option<&str>,
+    ) -> Result<CompileTarget> {
+        Ok(match target {
+            "asm" => {
+                let assembly_flavor = match assembly_flavor {
+                    Some(f) => parse_assembly_flavor(f)?,
+                    None => AssemblyFlavor::Att,
+                };
+
+                let demangle = match demangle_assembly {
+                    Some(f) => parse_demangle_assembly(f)?,
+                    None => DemangleAssembly::Demangle,
+                };
+
+                let process_assembly = match process_assembly {
+                    Some(f) => parse_process_assembly(f)?,
+                    None => ProcessAssembly::Filter,
+                };
+
+                CompileTarget::Assembly(assembly_flavor, demangle, process_assembly)
+            }
+            "llvm-ir" => CompileTarget::LlvmIr,
+            "mir" => CompileTarget::Mir,
+            "hir" => CompileTarget::Hir,
+            "wasm" => CompileTarget::Wasm,
+            value => crate::InvalidTargetSnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_assembly_flavor(s: &str) -> Result<AssemblyFlavor> {
+        Ok(match s {
+            "att" => AssemblyFlavor::Att,
+            "intel" => AssemblyFlavor::Intel,
+            value => crate::InvalidAssemblyFlavorSnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_demangle_assembly(s: &str) -> Result<DemangleAssembly> {
+        Ok(match s {
+            "demangle" => DemangleAssembly::Demangle,
+            "mangle" => DemangleAssembly::Mangle,
+            value => crate::InvalidDemangleAssemblySnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_process_assembly(s: &str) -> Result<ProcessAssembly> {
+        Ok(match s {
+            "filter" => ProcessAssembly::Filter,
+            "raw" => ProcessAssembly::Raw,
+            value => crate::InvalidProcessAssemblySnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_channel(s: &str) -> Result<Channel> {
+        Ok(match s {
+            "stable" => Channel::Stable,
+            "beta" => Channel::Beta,
+            "nightly" => Channel::Nightly,
+            value => crate::InvalidChannelSnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_crate_type(s: &str) -> Result<CrateType> {
+        use {CrateType::*, LibraryType::*};
+
+
Ok(match s {
+            "bin" => Binary,
+            "lib" => Library(Lib),
+            "dylib" => Library(Dylib),
+            "rlib" => Library(Rlib),
+            "staticlib" => Library(Staticlib),
+            "cdylib" => Library(Cdylib),
+            "proc-macro" => Library(ProcMacro),
+            value => crate::InvalidCrateTypeSnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_mode(s: &str) -> Result<Mode> {
+        Ok(match s {
+            "debug" => Mode::Debug,
+            "release" => Mode::Release,
+            value => crate::InvalidModeSnafu { value }.fail()?,
+        })
+    }
+
+    fn parse_edition(s: &str) -> Result<Edition> {
+        Ok(match s {
+            "2015" => Edition::Rust2015,
+            "2018" => Edition::Rust2018,
+            "2021" => Edition::Rust2021,
+            value => crate::InvalidEditionSnafu { value }.fail()?,
+        })
+    }
+}