From 2136e815bbbd9377154138c278cbb602b5f0b80a Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 14 Jun 2023 14:40:17 -0400 Subject: [PATCH 1/7] Tag tests that use external resources --- tests/spec/features/sharing_with_others_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/spec/features/sharing_with_others_spec.rb b/tests/spec/features/sharing_with_others_spec.rb index 34abb8b5e..80fe0d7f5 100644 --- a/tests/spec/features/sharing_with_others_spec.rb +++ b/tests/spec/features/sharing_with_others_spec.rb @@ -3,7 +3,7 @@ require 'support/matchers/be_at_url' require 'support/playground_actions' -RSpec.feature "Sharing the code with others", type: :feature, js: true do +RSpec.feature "Sharing the code with others", :external, type: :feature, js: true do include PlaygroundActions before { visit '/' } From dae947f2b074e1a818edfd447947d323c9f270ca Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 10 May 2023 11:23:01 -0400 Subject: [PATCH 2/7] Extract a modify-cargo-toml library for reuse --- .github/workflows/ci.yml | 2 +- ci/workflows.yml | 1 + compiler/base/modify-cargo-toml/src/lib.rs | 131 ++++++++++++++++++++ compiler/base/modify-cargo-toml/src/main.rs | 131 +------------------- 4 files changed, 136 insertions(+), 129 deletions(-) create mode 100644 compiler/base/modify-cargo-toml/src/lib.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8046dd084..2875bdc8b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -185,7 +185,7 @@ jobs: run: cargo fmt --manifest-path top-crates/Cargo.toml --check - name: Build backend run: |- - mkdir -p ui/target; docker run --rm -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' + mkdir -p ui/target; docker run --rm -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' apk add musl-dev openssl-dev openssl-libs-static # Adding -C relocation-model=static due to diff --git a/ci/workflows.yml b/ci/workflows.yml index 3e94d569a..b59f09513 100644 --- a/ci/workflows.yml +++ b/ci/workflows.yml @@ -288,6 +288,7 @@ workflows: docker run --rm + -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry diff --git a/compiler/base/modify-cargo-toml/src/lib.rs b/compiler/base/modify-cargo-toml/src/lib.rs new file mode 100644 index 000000000..8b886a0a5 --- /dev/null +++ b/compiler/base/modify-cargo-toml/src/lib.rs @@ -0,0 +1,131 @@ +extern crate serde; +#[macro_use] +extern crate serde_derive; +extern crate toml; + +use std::collections::BTreeMap; +use toml::Value; + +type Other = BTreeMap; + +fn modify(cargo_toml: Value, f: F) -> Value +where + F: FnOnce(T) -> T, + T: serde::Serialize + for<'de> serde::Deserialize<'de>, +{ + let cargo_toml = cargo_toml.try_into().unwrap(); + + let cargo_toml = f(cargo_toml); + + Value::try_from(cargo_toml).unwrap() +} + +fn ensure_string_in_vec(values: &mut Vec, val: &str) { + if !values.iter().any(|f| f == val) { + values.push(val.into()); + } +} + +pub fn set_edition(cargo_toml: Value, edition: &str) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + package: Package, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Package { + #[serde(default)] + edition: String, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + cargo_toml.package.edition = edition.into(); + cargo_toml + }) +} + +pub fn remove_dependencies(cargo_toml: Value) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + dependencies: BTreeMap, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + cargo_toml.dependencies.clear(); + cargo_toml + }) +} + +pub fn set_crate_type(cargo_toml: Value, crate_type: &str) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + #[serde(default)] + lib: Lib, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Default, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Lib { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + crate_type: Vec, + #[serde(default)] + proc_macro: bool, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + if crate_type == "proc-macro" { + cargo_toml.lib.proc_macro = true; + } else { + ensure_string_in_vec(&mut cargo_toml.lib.crate_type, crate_type); + } + cargo_toml + }) +} + +pub fn set_release_lto(cargo_toml: Value, lto: bool) -> Value { + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct CargoToml { + #[serde(default)] + profile: Profiles, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Default, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Profiles { + #[serde(default)] + release: Profile, + #[serde(flatten)] + other: Other, + } + + #[derive(Debug, Default, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + struct Profile { + #[serde(default)] + lto: bool, + #[serde(flatten)] + other: Other, + } + + modify(cargo_toml, |mut cargo_toml: CargoToml| { + cargo_toml.profile.release.lto = lto; + cargo_toml + }) +} diff --git a/compiler/base/modify-cargo-toml/src/main.rs b/compiler/base/modify-cargo-toml/src/main.rs index e5732d583..94177c884 100644 --- a/compiler/base/modify-cargo-toml/src/main.rs +++ b/compiler/base/modify-cargo-toml/src/main.rs @@ -1,9 +1,8 @@ -extern crate serde; -#[macro_use] -extern crate serde_derive; +extern crate modify_cargo_toml; extern crate toml; -use std::{collections::BTreeMap, env, ffi::OsString, fs, path::PathBuf}; +use modify_cargo_toml::*; +use std::{env, ffi::OsString, fs, path::PathBuf}; use toml::Value; fn main() { @@ -41,127 +40,3 @@ fn main() { fs::write(&output_filename, output) .unwrap_or_else(|e| panic!("Cannot write to {}: {}", output_filename.display(), e)); } - -type Other = BTreeMap; - -fn modify(cargo_toml: Value, f: F) -> Value -where - F: FnOnce(T) -> T, - T: serde::Serialize + for<'de> serde::Deserialize<'de>, -{ - let cargo_toml = cargo_toml.try_into().unwrap(); - - let cargo_toml = f(cargo_toml); - - Value::try_from(cargo_toml).unwrap() -} - -fn ensure_string_in_vec(values: &mut Vec, val: &str) { - if !values.iter().any(|f| f == val) { - values.push(val.into()); - } -} - -fn set_edition(cargo_toml: Value, edition: &str) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - package: Package, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Package { - #[serde(default)] - edition: String, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - cargo_toml.package.edition = edition.into(); - cargo_toml - }) -} - -fn remove_dependencies(cargo_toml: Value) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - dependencies: BTreeMap, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - cargo_toml.dependencies.clear(); - cargo_toml - }) -} - -fn set_crate_type(cargo_toml: Value, crate_type: &str) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - #[serde(default)] - lib: Lib, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Default, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Lib { - #[serde(default, skip_serializing_if = "Vec::is_empty")] - crate_type: Vec, - #[serde(default)] - proc_macro: bool, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - if crate_type == "proc-macro" { - cargo_toml.lib.proc_macro = true; - } else { - ensure_string_in_vec(&mut cargo_toml.lib.crate_type, crate_type); - } - cargo_toml - }) -} - -fn set_release_lto(cargo_toml: Value, lto: bool) -> Value { - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct CargoToml { - #[serde(default)] - profile: Profiles, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Default, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Profiles { - #[serde(default)] - release: Profile, - #[serde(flatten)] - other: Other, - } - - #[derive(Debug, Default, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - struct Profile { - #[serde(default)] - lto: bool, - #[serde(flatten)] - other: Other, - } - - modify(cargo_toml, |mut cargo_toml: CargoToml| { - cargo_toml.profile.release.lto = lto; - cargo_toml - }) -} From c665784738979854c7958ea433501e3df12cb841 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 10 May 2023 13:39:19 -0400 Subject: [PATCH 3/7] Extract an asm-cleanup library for reuse --- .github/workflows/ci.yml | 2 +- ci/workflows.yml | 1 + compiler/base/.dockerignore | 1 + compiler/base/asm-cleanup/.gitignore | 2 ++ compiler/base/asm-cleanup/Cargo.toml | 12 ++++++++++++ .../base/asm-cleanup/src/lib.rs | 0 ui/Cargo.lock | 13 +++++++++++-- ui/Cargo.toml | 3 +-- ui/src/main.rs | 1 - ui/src/sandbox.rs | 4 ++-- 10 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 compiler/base/asm-cleanup/.gitignore create mode 100644 compiler/base/asm-cleanup/Cargo.toml rename ui/src/asm_cleanup.rs => compiler/base/asm-cleanup/src/lib.rs (100%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2875bdc8b..556e0daad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -185,7 +185,7 @@ jobs: run: cargo fmt --manifest-path top-crates/Cargo.toml --check - name: Build backend run: |- - mkdir -p ui/target; docker run --rm -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' + mkdir -p ui/target; docker run --rm -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' apk add musl-dev openssl-dev openssl-libs-static # Adding -C relocation-model=static due to diff --git a/ci/workflows.yml b/ci/workflows.yml index b59f09513..b4efc3383 100644 --- a/ci/workflows.yml +++ b/ci/workflows.yml @@ -288,6 +288,7 @@ workflows: docker run --rm + -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git diff --git a/compiler/base/.dockerignore b/compiler/base/.dockerignore index ddfc04b3b..6eedd68fa 100644 --- a/compiler/base/.dockerignore +++ b/compiler/base/.dockerignore @@ -1 +1,2 @@ +asm-cleanup/target modify-cargo-toml/target diff --git a/compiler/base/asm-cleanup/.gitignore b/compiler/base/asm-cleanup/.gitignore new file mode 100644 index 000000000..1e7caa9ea --- /dev/null +++ b/compiler/base/asm-cleanup/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target/ diff --git a/compiler/base/asm-cleanup/Cargo.toml b/compiler/base/asm-cleanup/Cargo.toml new file mode 100644 index 000000000..98645dd49 --- /dev/null +++ b/compiler/base/asm-cleanup/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "asm-cleanup" +version = "0.1.0" +edition = "2018" + +[workspace] + +[dependencies] +lazy_static = "1.0.0" +petgraph = "0.6.0" +regex = "1.0.0" +rustc-demangle = "0.1.5" diff --git a/ui/src/asm_cleanup.rs b/compiler/base/asm-cleanup/src/lib.rs similarity index 100% rename from ui/src/asm_cleanup.rs rename to compiler/base/asm-cleanup/src/lib.rs diff --git a/ui/Cargo.lock b/ui/Cargo.lock index f391f6ea4..2230497cb 100644 --- a/ui/Cargo.lock +++ b/ui/Cargo.lock @@ -47,6 +47,16 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "asm-cleanup" +version = "0.1.0" +dependencies = [ + "lazy_static", + "petgraph", + "regex", + "rustc-demangle", +] + [[package]] name = "async-trait" version = "0.1.68" @@ -1695,6 +1705,7 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" name = "ui" version = "0.1.0" dependencies = [ + "asm-cleanup", "async-trait", "axum", "dotenv", @@ -1702,10 +1713,8 @@ dependencies = [ "lazy_static", "octocrab", "openssl-probe", - "petgraph", "prometheus", "regex", - "rustc-demangle", "serde", "serde_derive", "serde_json", diff --git a/ui/Cargo.toml b/ui/Cargo.toml index f01f8b758..220c0a2b4 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -9,6 +9,7 @@ default = ['fork-bomb-prevention'] fork-bomb-prevention = [] [dependencies] +asm-cleanup = { path = "../compiler/base/asm-cleanup" } async-trait = "0.1.52" axum = { version = "0.6", features = ["headers", "ws"] } dotenv = "0.15.0" @@ -16,10 +17,8 @@ futures = "0.3.21" lazy_static = "1.0.0" octocrab = "0.25" openssl-probe = "0.1.2" -petgraph = "0.6.0" prometheus = "0.13.0" regex = "1.0.0" -rustc-demangle = "0.1.5" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = "1.0" diff --git a/ui/src/main.rs b/ui/src/main.rs index d80699cb7..5699b4400 100644 --- a/ui/src/main.rs +++ b/ui/src/main.rs @@ -14,7 +14,6 @@ use tracing::{error, info, warn}; const DEFAULT_ADDRESS: &str = "127.0.0.1"; const DEFAULT_PORT: u16 = 5000; -mod asm_cleanup; mod env; mod gist; mod metrics; diff --git a/ui/src/sandbox.rs b/ui/src/sandbox.rs index d96a5636b..d13b9662f 100644 --- a/ui/src/sandbox.rs +++ b/ui/src/sandbox.rs @@ -314,11 +314,11 @@ impl Sandbox { if let CompileTarget::Assembly(_, demangle, process) = req.target { if demangle == DemangleAssembly::Demangle { - code = crate::asm_cleanup::demangle_asm(&code); + code = asm_cleanup::demangle_asm(&code); } if process == ProcessAssembly::Filter { - code = crate::asm_cleanup::filter_asm(&code); + code = asm_cleanup::filter_asm(&code); } } else if CompileTarget::Hir == req.target { // TODO: Run rustfmt on the generated HIR. From 5b8e3aeebc7eb4efc76ed7f41c368d86cf9841ca Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Thu, 11 May 2023 12:50:07 -0400 Subject: [PATCH 4/7] Put the Wasm output in the expected location without the extension --- compiler/base/cargo-wasm | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/compiler/base/cargo-wasm b/compiler/base/cargo-wasm index b9107833d..f6d4b3b88 100755 --- a/compiler/base/cargo-wasm +++ b/compiler/base/cargo-wasm @@ -41,4 +41,9 @@ for wasm in $(find target/ -name '*wasm' -not -path '*/deps/*'); do # wasm2wat spits out an error that we don't care about, so hide it # https://github.com/WebAssembly/wabt/issues/842 # https://stackoverflow.com/a/15936384/155423 + + # The streaming playground expects the file to be without the + # extension while the original playground expects it to be with + # the extension. Support both for now. + cp "${output}.wat" "${output}" done From 7d9117ac650f4d506aac6110ec7f70ca6fa432bd Mon Sep 17 00:00:00 2001 From: Adwin White Date: Fri, 31 Mar 2023 21:33:49 +0800 Subject: [PATCH 5/7] Support CompileRequest in the orchestrator The orchestrator allows starting up a Docker container and then communicating with it while it's running. We can then request files to be read or written in the container as well as running processes. Just enough has been done to connect the pieces needed to support compiling to assembly / LLVM / MIR, etc. Future work will add other operations including the ability to run the code. Co-authored-by: Jake Goulding --- compiler/base/orchestrator/.gitignore | 1 + compiler/base/orchestrator/Cargo.lock | 648 +++++++++ compiler/base/orchestrator/Cargo.toml | 23 + compiler/base/orchestrator/src/bin/worker.rs | 12 + compiler/base/orchestrator/src/coordinator.rs | 1224 +++++++++++++++++ compiler/base/orchestrator/src/lib.rs | 25 + compiler/base/orchestrator/src/message.rs | 132 ++ compiler/base/orchestrator/src/worker.rs | 683 +++++++++ 8 files changed, 2748 insertions(+) create mode 100644 compiler/base/orchestrator/.gitignore create mode 100644 compiler/base/orchestrator/Cargo.lock create mode 100644 compiler/base/orchestrator/Cargo.toml create mode 100644 compiler/base/orchestrator/src/bin/worker.rs create mode 100644 compiler/base/orchestrator/src/coordinator.rs create mode 100644 compiler/base/orchestrator/src/lib.rs create mode 100644 compiler/base/orchestrator/src/message.rs create mode 100644 compiler/base/orchestrator/src/worker.rs diff --git a/compiler/base/orchestrator/.gitignore b/compiler/base/orchestrator/.gitignore new file mode 100644 index 000000000..eb5a316cb --- /dev/null +++ b/compiler/base/orchestrator/.gitignore @@ -0,0 +1 @@ +target diff --git a/compiler/base/orchestrator/Cargo.lock b/compiler/base/orchestrator/Cargo.lock new file mode 100644 index 000000000..3db923433 --- /dev/null +++ b/compiler/base/orchestrator/Cargo.lock @@ -0,0 +1,648 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +dependencies = [ + "memchr", +] + +[[package]] +name = "asm-cleanup" +version = "0.1.0" +dependencies = [ + "lazy_static", + "petgraph", + "regex", + "rustc-demangle", +] + +[[package]] +name = "assertables" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c24e9d990669fbd16806bff449e4ac644fd9b1fca014760087732fe4102f131" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.146" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "modify-cargo-toml" +version = "0.1.0" +dependencies = [ + "serde", + "serde_derive", + "toml", +] + +[[package]] +name = "orchestrator" +version = "0.1.0" +dependencies = [ + "asm-cleanup", + "assertables", + "bincode", + "futures", + "modify-cargo-toml", + "serde", + "snafu", + "tempdir", + "tokio", + "tokio-stream", + "tokio-util", + "toml", +] + +[[package]] +name = "petgraph" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + +[[package]] +name = "regex" +version = "1.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" + +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "serde" +version = "1.0.164" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.164" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + +[[package]] +name = "serde_spanned" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +dependencies = [ + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" +dependencies = [ + "autocfg", +] + +[[package]] +name = "snafu" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045" +dependencies = [ + "doc-comment", + "futures-core", + "pin-project", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand", + "remove_dir_all", +] + +[[package]] +name = "tokio" +version = "1.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +dependencies = [ + "autocfg", + "bytes", + "libc", + "mio", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "toml" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.19.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + +[[package]] +name = "unicode-ident" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "winnow" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699" +dependencies = [ + "memchr", +] diff --git a/compiler/base/orchestrator/Cargo.toml b/compiler/base/orchestrator/Cargo.toml new file mode 100644 index 000000000..dd6083935 --- /dev/null +++ b/compiler/base/orchestrator/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "orchestrator" +version = "0.1.0" +edition = "2021" + +[workspace] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +asm-cleanup = { path = "../asm-cleanup" } +bincode = { version = "1.3", default-features = false } +futures = { version = "0.3.28", default-features = false, features = ["executor"] } +modify-cargo-toml = { path = "../modify-cargo-toml", default-features = false } +serde = { version = "1.0", default-features = false, features = ["derive"] } +snafu = { version = "0.7.4", default-features = false, features = ["futures", "std"] } +tokio = { version = "1.28", default-features = false, features = ["fs", "io-std", "io-util", "macros", "process", "rt", "time", "sync"] } +tokio-stream = { version = "0.1.14", default-features = false } +tokio-util = { version = "0.7.8", default-features = false, features = ["io", "io-util"] } +toml = { version = "0.7.3", default-features = false, features = ["parse", "display"] } + +[dev-dependencies] +assertables = "7.0.1" +tempdir = "0.3.7" diff --git a/compiler/base/orchestrator/src/bin/worker.rs b/compiler/base/orchestrator/src/bin/worker.rs new file mode 100644 index 000000000..504e26060 --- /dev/null +++ b/compiler/base/orchestrator/src/bin/worker.rs @@ -0,0 +1,12 @@ +use orchestrator::worker::{listen, Error}; +use std::env; + +#[tokio::main(flavor = "current_thread")] +#[snafu::report] +pub async fn main() -> Result<(), Error> { + let project_dir = env::args_os() + .nth(1) + .expect("Please specify project directory as the first argument"); + + listen(project_dir).await +} diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs new file mode 100644 index 000000000..24fa636eb --- /dev/null +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -0,0 +1,1224 @@ +use snafu::prelude::*; +use std::{ + collections::HashMap, + mem, + process::Stdio, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::{ + join, + process::{Child, ChildStdin, ChildStdout, Command}, + select, + sync::{mpsc, oneshot}, + task::{JoinHandle, JoinSet}, + time::{self, MissedTickBehavior}, +}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::{io::SyncIoBridge, sync::CancellationToken}; + +use crate::{ + bincode_input_closed, + message::{ + CoordinatorMessage, ExecuteCommandRequest, JobId, Multiplexed, OneToOneResponse, + ReadFileRequest, ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, + }, + DropErrorDetailsExt, +}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum AssemblyFlavor { + Att, + Intel, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum DemangleAssembly { + Demangle, + Mangle, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ProcessAssembly { + Filter, + Raw, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CompileTarget { + Assembly(AssemblyFlavor, DemangleAssembly, ProcessAssembly), + Hir, + LlvmIr, + Mir, + Wasm, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Channel { + Stable, + Beta, + Nightly, +} + +impl Channel { + pub(crate) const ALL: [Self; 3] = [Self::Stable, Self::Beta, Self::Nightly]; + + #[cfg(test)] + pub(crate) fn to_str(self) -> &'static str { + match self { + Channel::Stable => "stable", + Channel::Beta => "beta", + Channel::Nightly => "nightly", + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Mode { + Debug, + Release, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Edition { + Rust2015, + Rust2018, + Rust2021, +} + +impl Edition { + #[cfg(test)] + pub(crate) const ALL: [Self; 3] = [Self::Rust2015, Self::Rust2018, Self::Rust2021]; + + pub(crate) fn to_str(self) -> &'static str { + match self { + Edition::Rust2015 => "2015", + Edition::Rust2018 => "2018", + Edition::Rust2021 => "2021", + } + } + + pub(crate) fn to_cargo_toml_key(self) -> &'static str { + self.to_str() + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CrateType { + Binary, + Library(LibraryType), +} + +impl CrateType { + pub(crate) fn to_cargo_toml_key(self) -> &'static str { + use {CrateType::*, LibraryType::*}; + + match self { + Binary => "bin", + Library(Lib) => "lib", + Library(Dylib) => "dylib", + Library(Rlib) => "rlib", + Library(Staticlib) => "staticlib", + Library(Cdylib) => "cdylib", + Library(ProcMacro) => "proc-macro", + } + } + + pub(crate) fn to_library_cargo_toml_key(self) -> Option<&'static str> { + if self == Self::Binary { + None + } else { + Some(self.to_cargo_toml_key()) + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum LibraryType { + Lib, + Dylib, + Rlib, + Staticlib, + Cdylib, + ProcMacro, +} + +#[derive(Debug, Clone)] +pub struct CompileRequest { + pub target: CompileTarget, + pub channel: Channel, + pub crate_type: CrateType, + pub mode: Mode, + pub edition: Edition, + pub tests: bool, + pub backtrace: bool, + pub code: String, +} + +impl CompileRequest { + pub(crate) fn write_main_request(&self) -> WriteFileRequest { + let path = match self.crate_type { + CrateType::Binary => "src/main.rs", + CrateType::Library(_) => "src/lib.rs", + }; + + WriteFileRequest { + path: path.to_owned(), + content: self.code.clone().into(), + } + } + + pub(crate) fn execute_cargo_request(&self, output_path: &str) -> ExecuteCommandRequest { + use CompileTarget::*; + + let mut args = if let Wasm = self.target { + vec!["wasm"] + } else { + vec!["rustc"] + }; + if let Mode::Release = self.mode { + args.push("--release"); + } + + match self.target { + Assembly(flavor, _, _) => { + args.extend(&["--", "--emit", "asm=compilation"]); + + // Enable extra assembly comments for nightly builds + if let Channel::Nightly = self.channel { + args.push("-Z"); + args.push("asm-comments"); + } + + args.push("-C"); + match flavor { + AssemblyFlavor::Att => args.push("llvm-args=-x86-asm-syntax=att"), + AssemblyFlavor::Intel => args.push("llvm-args=-x86-asm-syntax=intel"), + } + } + LlvmIr => args.extend(&["--", "--emit", "llvm-ir=compilation"]), + Mir => args.extend(&["--", "--emit", "mir=compilation"]), + Hir => args.extend(&["--", "-Zunpretty=hir", "-o", output_path]), + Wasm => args.extend(&["-o", output_path]), + } + let mut envs = HashMap::new(); + if self.backtrace { + envs.insert("RUST_BACKTRACE".to_owned(), "1".to_owned()); + } + + ExecuteCommandRequest { + cmd: "cargo".to_owned(), + args: args.into_iter().map(|s| s.to_owned()).collect(), + envs, + cwd: None, + } + } + + pub(crate) fn modify_cargo_toml(&self, mut cargo_toml: toml::Value) -> toml::Value { + cargo_toml = modify_cargo_toml::set_edition(cargo_toml, self.edition.to_cargo_toml_key()); + + if let Some(crate_type) = self.crate_type.to_library_cargo_toml_key() { + cargo_toml = modify_cargo_toml::set_crate_type(cargo_toml, crate_type); + } + + if CompileTarget::Wasm == self.target { + cargo_toml = modify_cargo_toml::remove_dependencies(cargo_toml); + cargo_toml = modify_cargo_toml::set_release_lto(cargo_toml, true); + } + + cargo_toml + } + + pub(crate) fn postprocess_result(&self, mut code: String) -> String { + if let CompileTarget::Assembly(_, demangle, process) = self.target { + if demangle == DemangleAssembly::Demangle { + code = asm_cleanup::demangle_asm(&code); + } + + if process == ProcessAssembly::Filter { + code = asm_cleanup::filter_asm(&code); + } + } + + code + } +} + +#[derive(Debug, Clone)] +pub struct CompileResponseWithOutput { + pub success: bool, + pub code: String, + pub stdout: String, + pub stderr: String, +} + +#[derive(Debug, Clone)] +pub struct CompileResponse { + pub success: bool, + pub code: String, +} + +#[derive(Debug)] +enum DemultiplexCommand { + Listen(JobId, mpsc::Sender), + ListenOnce(JobId, oneshot::Sender), +} + +#[derive(Debug)] +pub struct Coordinator { + backend: B, + // Consider making these lazily-created and/or idly time out + stable: Container, + beta: Container, + nightly: Container, + token: CancellationToken, +} + +impl Coordinator +where + B: Backend, +{ + pub async fn new(backend: B) -> Result { + let token = CancellationToken::new(); + + let [stable, beta, nightly] = + Channel::ALL.map(|channel| Container::new(channel, token.clone(), &backend)); + + let (stable, beta, nightly) = join!(stable, beta, nightly); + + let stable = stable?; + let beta = beta?; + let nightly = nightly?; + + Ok(Self { + backend, + stable, + beta, + nightly, + token, + }) + } + + pub async fn compile( + &self, + request: CompileRequest, + ) -> Result { + self.select_channel(request.channel).compile(request).await + } + + pub async fn begin_compile( + &self, + request: CompileRequest, + ) -> Result { + self.select_channel(request.channel) + .begin_compile(request) + .await + } + + pub async fn shutdown(self) -> Result { + let Self { + backend, + stable, + beta, + nightly, + token, + } = self; + token.cancel(); + + let (stable, beta, nightly) = join!(stable.shutdown(), beta.shutdown(), nightly.shutdown()); + + stable?; + beta?; + nightly?; + + Ok(backend) + } + + fn select_channel(&self, channel: Channel) -> &Container { + match channel { + Channel::Stable => &self.stable, + Channel::Beta => &self.beta, + Channel::Nightly => &self.nightly, + } + } +} + +#[derive(Debug)] +struct Container { + task: JoinHandle>, + modify_cargo_toml: ModifyCargoToml, + commander: Commander, +} + +impl Container { + async fn new( + channel: Channel, + token: CancellationToken, + backend: &impl Backend, + ) -> Result { + let (mut child, stdin, stdout) = backend.run_worker_in_background(channel)?; + let IoQueue { + mut tasks, + to_worker_tx, + from_worker_rx, + } = spawn_io_queue(stdin, stdout, token); + + let (command_tx, command_rx) = mpsc::channel(8); + let demultiplex_task = tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx)); + + let task = tokio::spawn(async move { + let (c, d, t) = join!(child.wait(), demultiplex_task, tasks.join_next()); + c.context(JoinWorkerSnafu)?; + d.context(DemultiplexerTaskPanickedSnafu)? + .context(DemultiplexerTaskFailedSnafu)?; + if let Some(t) = t { + t.context(IoQueuePanickedSnafu)??; + } + + Ok(()) + }); + + let commander = Commander { + to_worker_tx, + to_demultiplexer_tx: command_tx, + id: Default::default(), + }; + + let modify_cargo_toml = ModifyCargoToml::new(commander.clone()) + .await + .context(CouldNotLoadCargoTomlSnafu)?; + + Ok(Container { + task, + modify_cargo_toml, + commander, + }) + } + + async fn compile( + &self, + request: CompileRequest, + ) -> Result { + use compile_error::*; + + let ActiveCompilation { + task, + stdout_rx, + stderr_rx, + } = self.begin_compile(request).await?; + + let stdout = ReceiverStream::new(stdout_rx).collect(); + let stderr = ReceiverStream::new(stderr_rx).collect(); + + let (result, stdout, stderr) = join!(task, stdout, stderr); + + let CompileResponse { success, code } = result.context(CompilationTaskPanickedSnafu)??; + Ok(CompileResponseWithOutput { + success, + code, + stdout, + stderr, + }) + } + + async fn begin_compile( + &self, + request: CompileRequest, + ) -> Result { + use compile_error::*; + + let output_path: &str = "compilation"; + + let write_main = request.write_main_request(); + let execute_cargo = request.execute_cargo_request(output_path); + let read_output = ReadFileRequest { + path: output_path.to_owned(), + }; + + let write_main = self.commander.one(write_main); + let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); + + let (write_main, modify_cargo_toml) = join!(write_main, modify_cargo_toml); + + write_main.context(CouldNotWriteCodeSnafu)?; + modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; + + let (stdout_tx, stdout_rx) = mpsc::channel(8); + let (stderr_tx, stderr_rx) = mpsc::channel(8); + + let mut from_worker_rx = self + .commander + .many(execute_cargo) + .await + .context(CouldNotStartCompilerSnafu)?; + + let task = tokio::spawn({ + let commander = self.commander.clone(); + async move { + let mut success = false; + + while let Some(container_msg) = from_worker_rx.recv().await { + match container_msg { + WorkerMessage::ExecuteCommand(resp) => { + success = resp.success; + break; + } + WorkerMessage::StdoutPacket(packet) => { + stdout_tx.send(packet).await.ok(/* Receiver gone, that's OK */); + } + WorkerMessage::StderrPacket(packet) => { + stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */); + } + _ => return UnexpectedMessageSnafu.fail(), + } + } + + let code = if success { + let file: ReadFileResponse = commander + .one(read_output) + .await + .context(CouldNotReadCodeSnafu)?; + String::from_utf8(file.0).context(CodeNotUtf8Snafu)? + } else { + String::new() + }; + + // TODO: This is synchronous... + let code = request.postprocess_result(code); + + Ok(CompileResponse { success, code }) + } + }); + + Ok(ActiveCompilation { + task, + stdout_rx, + stderr_rx, + }) + } + + async fn shutdown(self) -> Result<()> { + let Self { + task, + modify_cargo_toml, + commander, + } = self; + drop(commander); + drop(modify_cargo_toml); + task.await.context(ContainerTaskPanickedSnafu)? + } +} + +#[derive(Debug)] +pub struct ActiveCompilation { + pub task: JoinHandle>, + pub stdout_rx: mpsc::Receiver, + pub stderr_rx: mpsc::Receiver, +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CompileError { + #[snafu(display("The compilation task panicked"))] + CompilationTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("Could not modify Cargo.toml"))] + CouldNotModifyCargoToml { source: ModifyCargoTomlError }, + + #[snafu(display("Could not write source code"))] + CouldNotWriteCode { source: CommanderError }, + + #[snafu(display("Could not start compiler"))] + CouldNotStartCompiler { source: CommanderError }, + + #[snafu(display("Received an unexpected message"))] + UnexpectedMessage, + + #[snafu(display("Could not read the compilation output"))] + CouldNotReadCode { source: CommanderError }, + + #[snafu(display("The compilation output was not UTF-8"))] + CodeNotUtf8 { source: std::string::FromUtf8Error }, +} + +#[derive(Debug, Clone)] +struct Commander { + to_worker_tx: mpsc::Sender>, + to_demultiplexer_tx: mpsc::Sender, + id: Arc, +} + +#[derive(Debug)] +struct ModifyCargoToml { + commander: Commander, + cargo_toml: toml::Value, +} + +impl ModifyCargoToml { + const PATH: &str = "Cargo.toml"; + + async fn new(commander: Commander) -> Result { + let cargo_toml = Self::read(&commander).await?; + Ok(Self { + commander, + cargo_toml, + }) + } + + async fn modify_for(&self, request: &CompileRequest) -> Result<(), ModifyCargoTomlError> { + let cargo_toml = self.cargo_toml.clone(); + let cargo_toml = request.modify_cargo_toml(cargo_toml); + Self::write(&self.commander, cargo_toml).await + } + + async fn read(commander: &Commander) -> Result { + use modify_cargo_toml_error::*; + + let path = Self::PATH.to_owned(); + let cargo_toml = commander + .one(ReadFileRequest { path }) + .await + .context(CouldNotReadSnafu)?; + + let cargo_toml = String::from_utf8(cargo_toml.0)?; + let cargo_toml = toml::from_str(&cargo_toml)?; + + Ok(cargo_toml) + } + + async fn write( + commander: &Commander, + cargo_toml: toml::Value, + ) -> Result<(), ModifyCargoTomlError> { + use modify_cargo_toml_error::*; + + let cargo_toml = toml::to_string(&cargo_toml)?; + let content = cargo_toml.into_bytes(); + + let path = Self::PATH.to_owned(); + commander + .one(WriteFileRequest { path, content }) + .await + .context(CouldNotWriteSnafu)?; + + Ok(()) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum ModifyCargoTomlError { + #[snafu(display("Could not read the file"))] + CouldNotRead { source: CommanderError }, + + #[snafu(display("Could not parse the file as UTF-8"))] + #[snafu(context(false))] + InvalidUtf8 { source: std::string::FromUtf8Error }, + + #[snafu(display("Could not deserialize the file as TOML"))] + #[snafu(context(false))] + CouldNotDeserialize { source: toml::de::Error }, + + #[snafu(display("Could not serialize the file as TOML"))] + #[snafu(context(false))] + CouldNotSerialize { source: toml::ser::Error }, + + #[snafu(display("Could not write the file"))] + CouldNotWrite { source: CommanderError }, +} + +impl Commander { + const GC_PERIOD: Duration = Duration::from_secs(30); + + async fn demultiplex( + mut command_rx: mpsc::Receiver, + mut from_worker_rx: mpsc::Receiver>, + ) -> Result<(), CommanderError> { + use commander_error::*; + + let mut waiting = HashMap::new(); + let mut waiting_once = HashMap::new(); + + let mut gc_interval = time::interval(Self::GC_PERIOD); + gc_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + select! { + command = command_rx.recv() => { + let Some(command) = command else { break }; + + match command { + DemultiplexCommand::Listen(job_id, waiter) => { + let old = waiting.insert(job_id, waiter); + ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id }); + } + + DemultiplexCommand::ListenOnce(job_id, waiter) => { + let old = waiting_once.insert(job_id, waiter); + ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id }); + } + } + }, + + msg = from_worker_rx.recv() => { + let Some(Multiplexed(job_id, msg)) = msg else { break }; + + if let Some(waiter) = waiting_once.remove(&job_id) { + waiter.send(msg).ok(/* Don't care about it */); + continue; + } + + if let Some(waiter) = waiting.get(&job_id) { + waiter.send(msg).await.ok(/* Don't care about it */); + continue; + } + + // Should we log messages that didn't have a receiver? + } + + // Find any channels where the receivers have been + // dropped and clear out the sending halves. + _ = gc_interval.tick() => { + waiting = mem::take(&mut waiting) + .into_iter() + .filter(|(_job_id, tx)| !tx.is_closed()) + .collect(); + + waiting_once = mem::take(&mut waiting_once) + .into_iter() + .filter(|(_job_id, tx)| !tx.is_closed()) + .collect(); + } + } + } + + Ok(()) + } + + fn next_id(&self) -> JobId { + self.id.fetch_add(1, Ordering::SeqCst) + } + + async fn send_to_demultiplexer( + &self, + command: DemultiplexCommand, + ) -> Result<(), CommanderError> { + use commander_error::*; + + self.to_demultiplexer_tx + .send(command) + .await + .drop_error_details() + .context(UnableToSendToDemultiplexerSnafu) + } + + async fn send_to_worker( + &self, + message: Multiplexed, + ) -> Result<(), CommanderError> { + use commander_error::*; + + self.to_worker_tx + .send(message) + .await + .drop_error_details() + .context(UnableToSendToWorkerSnafu) + } + + async fn one(&self, message: M) -> Result + where + M: Into, + M: OneToOneResponse, + Result: TryFrom, + { + use commander_error::*; + + let id = self.next_id(); + let (from_demultiplexer_tx, from_demultiplexer_rx) = oneshot::channel(); + + self.send_to_demultiplexer(DemultiplexCommand::ListenOnce(id, from_demultiplexer_tx)) + .await?; + self.send_to_worker(Multiplexed(id, message.into())).await?; + let msg = from_demultiplexer_rx + .await + .context(UnableToReceiveFromDemultiplexerSnafu)?; + + match msg.try_into() { + Ok(Ok(v)) => Ok(v), + Ok(Err(e)) => WorkerOperationFailedSnafu { text: e.0 }.fail(), + Err(_) => UnexpectedResponseTypeSnafu.fail(), + } + } + + async fn many(&self, message: M) -> Result, CommanderError> + where + M: Into, + { + let id = self.next_id(); + let (from_worker_tx, from_worker_rx) = mpsc::channel(8); + + self.send_to_demultiplexer(DemultiplexCommand::Listen(id, from_worker_tx)) + .await?; + self.send_to_worker(Multiplexed(id, message.into())).await?; + + Ok(from_worker_rx) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CommanderError { + #[snafu(display("Two listeners subscribed to job {job_id}"))] + DuplicateDemultiplexerClient { job_id: JobId }, + + #[snafu(display("Could not send a message to the demultiplexer"))] + UnableToSendToDemultiplexer { source: mpsc::error::SendError<()> }, + + #[snafu(display("Did not receive a response from the demultiplexer"))] + UnableToReceiveFromDemultiplexer { source: oneshot::error::RecvError }, + + #[snafu(display("Could not send a message to the worker"))] + UnableToSendToWorker { source: mpsc::error::SendError<()> }, + + #[snafu(display("Did not receive the expected response type from the worker"))] + UnexpectedResponseType, + + #[snafu(display("The worker operation failed: {text}"))] + WorkerOperationFailed { text: String }, +} + +pub trait Backend { + fn run_worker_in_background( + &self, + channel: Channel, + ) -> Result<(Child, ChildStdin, ChildStdout)> { + let mut child = self + .prepare_worker_command(channel) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .context(SpawnWorkerSnafu)?; + let stdin = child.stdin.take().context(WorkerStdinCaptureSnafu)?; + let stdout = child.stdout.take().context(WorkerStdoutCaptureSnafu)?; + Ok((child, stdin, stdout)) + } + + fn prepare_worker_command(&self, channel: Channel) -> Command; +} + +impl Backend for &B +where + B: Backend, +{ + fn prepare_worker_command(&self, channel: Channel) -> Command { + B::prepare_worker_command(self, channel) + } +} + +pub type Result = ::std::result::Result; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Reached system process limit"))] + SpawnWorker { source: std::io::Error }, + + #[snafu(display("Unable to join child process"))] + JoinWorker { source: std::io::Error }, + + #[snafu(display("The demultiplexer task panicked"))] + DemultiplexerTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The demultiplexer task failed"))] + DemultiplexerTaskFailed { source: CommanderError }, + + #[snafu(display("The IO queue task panicked"))] + IoQueuePanicked { source: tokio::task::JoinError }, + + #[snafu(display("The container task panicked"))] + ContainerTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("Worker process's stdin not captured"))] + WorkerStdinCapture, + + #[snafu(display("Worker process's stdout not captured"))] + WorkerStdoutCapture, + + #[snafu(display("Failed to flush child stdin"))] + WorkerStdinFlush { source: std::io::Error }, + + #[snafu(display("Failed to deserialize worker message"))] + WorkerMessageDeserialization { source: bincode::Error }, + + #[snafu(display("Failed to serialize coordinator message"))] + CoordinatorMessageSerialization { source: bincode::Error }, + + #[snafu(display("Failed to send worker message through channel"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, + + #[snafu(display("Unable to load original Cargo.toml"))] + CouldNotLoadCargoToml { source: ModifyCargoTomlError }, +} + +struct IoQueue { + tasks: JoinSet>, + to_worker_tx: mpsc::Sender>, + from_worker_rx: mpsc::Receiver>, +} + +// Child stdin/out <--> messages. +fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationToken) -> IoQueue { + use std::io::{prelude::*, BufReader, BufWriter}; + + let mut tasks = JoinSet::new(); + + let (tx, from_worker_rx) = mpsc::channel(8); + tasks.spawn_blocking(move || { + let stdout = SyncIoBridge::new(stdout); + let mut stdout = BufReader::new(stdout); + + loop { + let worker_msg = bincode::deserialize_from(&mut stdout); + + if bincode_input_closed(&worker_msg) { + break; + }; + + let worker_msg = worker_msg.context(WorkerMessageDeserializationSnafu)?; + + tx.blocking_send(worker_msg) + .drop_error_details() + .context(UnableToSendWorkerMessageSnafu)?; + } + + Ok(()) + }); + + let (to_worker_tx, mut rx) = mpsc::channel(8); + tasks.spawn_blocking(move || { + let stdin = SyncIoBridge::new(stdin); + let mut stdin = BufWriter::new(stdin); + + loop { + let coordinator_msg = futures::executor::block_on(async { + select! { + () = token.cancelled() => None, + msg = rx.recv() => msg, + } + }); + + let Some(coordinator_msg) = coordinator_msg else { break }; + + bincode::serialize_into(&mut stdin, &coordinator_msg) + .context(CoordinatorMessageSerializationSnafu)?; + + stdin.flush().context(WorkerStdinFlushSnafu)?; + } + + Ok(()) + }); + + IoQueue { + tasks, + to_worker_tx, + from_worker_rx, + } +} + +#[cfg(test)] +mod tests { + use assertables::*; + use futures::{Future, FutureExt}; + use std::{sync::Once, time::Duration}; + use tempdir::TempDir; + use tokio::join; + use tokio_stream::{wrappers::ReceiverStream, StreamExt}; + + use super::*; + + #[derive(Debug)] + struct TestBackend { + project_dir: TempDir, + } + + impl TestBackend { + fn new() -> Self { + static COMPILE_WORKER_ONCE: Once = Once::new(); + + COMPILE_WORKER_ONCE.call_once(|| { + let output = std::process::Command::new("cargo") + .arg("build") + .output() + .expect("Build failed"); + assert!(output.status.success(), "Build failed"); + }); + + let project_dir = + TempDir::new("playground").expect("Failed to create temporary project directory"); + + let output = std::process::Command::new("cargo") + .arg("init") + .args(["--name", "playground"]) + .arg(project_dir.path()) + .output() + .expect("Build failed"); + assert!(output.status.success(), "Cargo initialization failed"); + + let main = project_dir.path().join("src").join("main.rs"); + std::fs::remove_file(main).expect("Could not delete main.rs"); + + Self { project_dir } + } + } + + impl Backend for TestBackend { + fn prepare_worker_command(&self, channel: Channel) -> Command { + let toolchain_file = format!(r#"[toolchain]\nchannel = "{}""#, channel.to_str()); + let path = self.project_dir.path().join("rust-toolchain.toml"); + std::fs::write(path, toolchain_file).expect("Couldn't write toolchain file"); + + let mut command = Command::new("./target/debug/worker"); + command.arg(self.project_dir.path()); + command + } + } + + async fn new_coordinator() -> Result> { + Coordinator::new(TestBackend::new()).await + } + + fn new_compile_request() -> CompileRequest { + new_compile_mir_request() + } + + fn new_compile_assembly_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::Assembly( + AssemblyFlavor::Intel, + DemangleAssembly::Demangle, + ProcessAssembly::Filter, + ), + channel: Channel::Beta, + crate_type: CrateType::Library(LibraryType::Lib), + mode: Mode::Release, + edition: Edition::Rust2018, + tests: false, + backtrace: false, + code: r#"pub fn add(a: u8, b: u8) -> u8 { a + b }"#.to_owned(), + } + } + + fn new_compile_hir_request() -> CompileRequest { + new_compile_hir_request_for(Edition::Rust2021) + } + + fn new_compile_hir_request_for(edition: Edition) -> CompileRequest { + CompileRequest { + target: CompileTarget::Hir, + channel: Channel::Nightly, + crate_type: CrateType::Library(LibraryType::Lib), + mode: Mode::Release, + edition, + tests: false, + backtrace: false, + code: r#"pub fn sub(a: u8, b: u8) -> u8 { a - b }"#.to_owned(), + } + } + + fn new_compile_llvm_ir_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::LlvmIr, + channel: Channel::Stable, + crate_type: CrateType::Library(LibraryType::Lib), + mode: Mode::Debug, + edition: Edition::Rust2015, + tests: false, + backtrace: false, + code: r#"pub fn mul(a: u8, b: u8) -> u8 { a * b }"#.to_owned(), + } + } + + fn new_compile_mir_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::Mir, + channel: Channel::Stable, + crate_type: CrateType::Binary, + mode: Mode::Release, + edition: Edition::Rust2021, + tests: false, + backtrace: false, + code: r#"fn main() { println!("Hello World!"); }"#.to_owned(), + } + } + + fn new_compile_wasm_request() -> CompileRequest { + CompileRequest { + target: CompileTarget::Wasm, + channel: Channel::Nightly, // TODO: Can we run this on all channels now? + crate_type: CrateType::Library(LibraryType::Cdylib), + mode: Mode::Release, + edition: Edition::Rust2021, + tests: false, + backtrace: false, + code: r#"#[export_name = "inc"] pub fn inc(a: u8) -> u8 { a + 1 }"#.to_owned(), + } + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_response() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.stderr, "Compiling"); + assert_contains!(response.stderr, "Finished"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_streaming() -> Result<()> { + let coordinator = new_coordinator().await?; + + let ActiveCompilation { + task, + stdout_rx, + stderr_rx, + } = coordinator + .begin_compile(new_compile_request()) + .await + .unwrap(); + + let stdout = ReceiverStream::new(stdout_rx); + let stdout = stdout.collect::(); + + let stderr = ReceiverStream::new(stderr_rx); + let stderr = stderr.collect::(); + + let (complete, _stdout, stderr) = + async { join!(task, stdout, stderr) }.with_timeout().await; + + let response = complete.unwrap().unwrap(); + + assert!(response.success, "stderr: {}", stderr); + assert_contains!(stderr, "Compiling"); + assert_contains!(stderr, "Finished"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_edition() -> Result<()> { + for edition in Edition::ALL { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_hir_request_for(edition)) + .with_timeout() + .await + .unwrap(); + + let prelude = format!("std::prelude::rust_{}", edition.to_str()); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, &prelude); + + coordinator.shutdown().await?; + } + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_assembly() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_assembly_request()) + .with_timeout() + .await + .unwrap(); + + //#[cfg(target_arch = "x86_64")] + //let asm = ""; + + #[cfg(target_arch = "aarch64")] + let asm = "w0, w1, w0"; + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, asm); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_hir() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_hir_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, "extern crate std"); + + coordinator.shutdown().await?; + + Ok(()) + } + + #[tokio::test] + #[snafu::report] + async fn test_compile_llvm_ir() -> Result<()> { + let coordinator = new_coordinator().await?; + + let response = coordinator + .compile(new_compile_llvm_ir_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!(response.code, "@llvm.umul.with.overflow.i8(i8, i8)"); + + coordinator.shutdown().await?; + + Ok(()) + } + + trait TimeoutExt: Future + Sized { + #[allow(clippy::type_complexity)] + fn with_timeout( + self, + ) -> futures::future::Map< + tokio::time::Timeout, + fn(Result) -> Self::Output, + > { + tokio::time::timeout(Duration::from_millis(5000), self) + .map(|v| v.expect("The operation timed out")) + } + } + + impl TimeoutExt for F {} +} diff --git a/compiler/base/orchestrator/src/lib.rs b/compiler/base/orchestrator/src/lib.rs new file mode 100644 index 000000000..775862939 --- /dev/null +++ b/compiler/base/orchestrator/src/lib.rs @@ -0,0 +1,25 @@ +#![deny(rust_2018_idioms)] + +pub mod coordinator; +mod message; +pub mod worker; + +trait DropErrorDetailsExt { + fn drop_error_details(self) -> Result>; +} + +impl DropErrorDetailsExt for Result> { + fn drop_error_details(self) -> Result> { + self.map_err(|_| tokio::sync::mpsc::error::SendError(())) + } +} + +fn bincode_input_closed(coordinator_msg: &bincode::Result) -> bool { + if let Err(e) = coordinator_msg { + if let bincode::ErrorKind::Io(e) = &**e { + return e.kind() == std::io::ErrorKind::UnexpectedEof; + } + } + + false +} diff --git a/compiler/base/orchestrator/src/message.rs b/compiler/base/orchestrator/src/message.rs new file mode 100644 index 000000000..1421fafb5 --- /dev/null +++ b/compiler/base/orchestrator/src/message.rs @@ -0,0 +1,132 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +pub type JobId = u64; +pub type Path = String; + +macro_rules! impl_narrow_to_broad { + ($enum_type:ident, $($variant_name:ident => $variant_type:ident),* $(,)?) => { + $( + impl From<$variant_type> for $enum_type { + fn from(other: $variant_type) -> Self { + $enum_type::$variant_name(other) + } + } + )* + }; +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Multiplexed(pub JobId, pub T); + +#[derive(Debug, Serialize, Deserialize)] +pub enum CoordinatorMessage { + WriteFile(WriteFileRequest), + ReadFile(ReadFileRequest), + ExecuteCommand(ExecuteCommandRequest), + StdinPacket(String), +} + +impl_narrow_to_broad!( + CoordinatorMessage, + WriteFile => WriteFileRequest, + ReadFile => ReadFileRequest, + ExecuteCommand => ExecuteCommandRequest, +); + +#[derive(Debug, Serialize, Deserialize)] +pub enum WorkerMessage { + WriteFile(WriteFileResponse), + ReadFile(ReadFileResponse), + ExecuteCommand(ExecuteCommandResponse), + StdoutPacket(String), + StderrPacket(String), + Error(SerializedError), +} + +macro_rules! impl_broad_to_narrow_with_error { + ($enum_type:ident, $($variant_name:ident => $variant_type:ty),* $(,)?) => { + $( + impl TryFrom<$enum_type> for Result<$variant_type, SerializedError> { + type Error = $enum_type; + + fn try_from(other: $enum_type) -> Result { + match other { + $enum_type::$variant_name(x) => Ok(Ok(x)), + $enum_type::Error(e) => Ok(Err(e)), + o => Err(o) + } + } + } + )* + }; +} + +impl_narrow_to_broad!( + WorkerMessage, + WriteFile => WriteFileResponse, + ReadFile => ReadFileResponse, + ExecuteCommand => ExecuteCommandResponse, +); + +impl_broad_to_narrow_with_error!( + WorkerMessage, + WriteFile => WriteFileResponse, + ReadFile => ReadFileResponse, + ExecuteCommand => ExecuteCommandResponse, +); + +#[derive(Debug, Serialize, Deserialize)] +pub struct WriteFileRequest { + pub path: Path, + pub content: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WriteFileResponse(pub ()); + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadFileRequest { + pub path: Path, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadFileResponse(pub Vec); + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteCommandRequest { + pub cmd: String, + pub args: Vec, + pub envs: HashMap, + pub cwd: Option, // None means in project direcotry. +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteCommandResponse { + pub success: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SerializedError(pub String); + +impl SerializedError { + pub fn new(e: impl snafu::Error) -> Self { + Self(snafu::Report::from_error(e).to_string()) + } +} + +pub trait OneToOneResponse { + type Response; +} + +impl OneToOneResponse for WriteFileRequest { + type Response = WriteFileResponse; +} + +impl OneToOneResponse for ReadFileRequest { + type Response = ReadFileResponse; +} + +impl OneToOneResponse for ExecuteCommandRequest { + type Response = ExecuteCommandResponse; +} diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs new file mode 100644 index 000000000..428d0e1b0 --- /dev/null +++ b/compiler/base/orchestrator/src/worker.rs @@ -0,0 +1,683 @@ +//! # Task information +//! +//! ## Hierarchy +//! +//! ```text +//! listen +//! ├── stdin +//! ├── stdout +//! ├── handle_coordinator_message +//! │ └── background (N) +//! └── manage_processes +//! └── process (N) +//! ├── process stdin +//! ├── process stdout +//! └── process stderr +//! ``` +//! +//! ## Notable resources +//! +//! - stdin +//! - [`std::io::Stdin`][] +//! - stdout +//! - [`std::io::Stdout`][] +//! - process +//! - [`tokio::process::Child`][] +//! - process stdin +//! - [`tokio::process::ChildStdin`][] +//! - process stdout +//! - [`tokio::process::ChildStdout`][] +//! - process stderr +//! - [`tokio::process::ChildStderr`][] +//! + +use snafu::prelude::*; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + process::Stdio, +}; +use tokio::{ + fs, + io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, + process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}, + select, + sync::mpsc, + task::JoinSet, +}; + +use crate::{ + bincode_input_closed, + message::{ + CoordinatorMessage, ExecuteCommandRequest, ExecuteCommandResponse, JobId, Multiplexed, + ReadFileRequest, ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, + WriteFileResponse, + }, + DropErrorDetailsExt, +}; + +type CommandRequest = (Multiplexed, MultiplexingSender); + +pub async fn listen(project_dir: impl Into) -> Result<(), Error> { + let project_dir = project_dir.into(); + + let (coordinator_msg_tx, coordinator_msg_rx) = mpsc::channel(8); + let (worker_msg_tx, worker_msg_rx) = mpsc::channel(8); + let mut io_tasks = spawn_io_queue(coordinator_msg_tx, worker_msg_rx); + + let (cmd_tx, cmd_rx) = mpsc::channel(8); + let (stdin_tx, stdin_rx) = mpsc::channel(8); + let process_task = tokio::spawn(manage_processes(stdin_rx, cmd_rx, project_dir.clone())); + + let handler_task = tokio::spawn(handle_coordinator_message( + coordinator_msg_rx, + worker_msg_tx, + project_dir, + cmd_tx, + stdin_tx, + )); + + select! { + Some(io_task) = io_tasks.join_next() => { + io_task.context(IoTaskPanickedSnafu)?.context(IoTaskFailedSnafu)?; + } + + process_task = process_task => { + process_task.context(ProcessTaskPanickedSnafu)?.context(ProcessTaskFailedSnafu)?; + } + + handler_task = handler_task => { + handler_task.context(HandlerTaskPanickedSnafu)?.context(HandlerTaskFailedSnafu)?; + } + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("The IO queue task panicked"))] + IoTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The IO queue task failed"))] + IoTaskFailed { source: IoQueueError }, + + #[snafu(display("The process task panicked"))] + ProcessTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The process task failed"))] + ProcessTaskFailed { source: ProcessError }, + + #[snafu(display("The coordinator message handler task panicked"))] + HandlerTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The coordinator message handler task failed"))] + HandlerTaskFailed { + source: HandleCoordinatorMessageError, + }, +} + +async fn handle_coordinator_message( + mut coordinator_msg_rx: mpsc::Receiver>, + worker_msg_tx: mpsc::Sender>, + project_dir: PathBuf, + cmd_tx: mpsc::Sender, + stdin_tx: mpsc::Sender>, +) -> Result<(), HandleCoordinatorMessageError> { + use handle_coordinator_message_error::*; + + let mut tasks = JoinSet::new(); + + loop { + select! { + coordinator_msg = coordinator_msg_rx.recv() => { + let Some(Multiplexed(job_id, coordinator_msg)) = coordinator_msg else { break }; + + let worker_msg_tx = || MultiplexingSender { + job_id, + tx: worker_msg_tx.clone(), + }; + + match coordinator_msg { + CoordinatorMessage::WriteFile(req) => { + let project_dir = project_dir.clone(); + let worker_msg_tx = worker_msg_tx(); + + tasks.spawn(async move { + worker_msg_tx + .send(handle_write_file(req, project_dir).await) + .await + .context(UnableToSendWriteFileResponseSnafu) + }); + } + + CoordinatorMessage::ReadFile(req) => { + let project_dir = project_dir.clone(); + let worker_msg_tx = worker_msg_tx(); + + tasks.spawn(async move { + worker_msg_tx + .send(handle_read_file(req, project_dir).await) + .await + .context(UnableToSendReadFileResponseSnafu) + }); + } + + CoordinatorMessage::ExecuteCommand(req) => { + cmd_tx + .send((Multiplexed(job_id, req), worker_msg_tx())) + .await + .drop_error_details() + .context(UnableToSendCommandExecutionRequestSnafu)?; + } + + CoordinatorMessage::StdinPacket(data) => { + stdin_tx + .send(Multiplexed(job_id, data)) + .await + .drop_error_details() + .context(UnableToSendStdinPacketSnafu)?; + } + } + } + + Some(task) = tasks.join_next() => { + task.context(TaskPanickedSnafu)??; + } + } + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum HandleCoordinatorMessageError { + #[snafu(display("Could not send the write command response to the coordinator"))] + UnableToSendWriteFileResponse { source: MultiplexingSenderError }, + + #[snafu(display("Could not send the read command response to the coordinator"))] + UnableToSendReadFileResponse { source: MultiplexingSenderError }, + + #[snafu(display("Failed to send command execution request to the command task"))] + UnableToSendCommandExecutionRequest { source: mpsc::error::SendError<()> }, + + #[snafu(display("Failed to send stdin packet to the command task"))] + UnableToSendStdinPacket { source: mpsc::error::SendError<()> }, + + #[snafu(display("A coordinator command handler background task panicked"))] + TaskPanicked { source: tokio::task::JoinError }, +} + +#[derive(Debug, Clone)] +struct MultiplexingSender { + job_id: JobId, + tx: mpsc::Sender>, +} + +impl MultiplexingSender { + async fn send( + &self, + message: Result, impl std::error::Error>, + ) -> Result<(), MultiplexingSenderError> { + match message { + Ok(v) => self.send_ok(v).await, + Err(e) => self.send_err(e).await, + } + } + + async fn send_ok( + &self, + message: impl Into, + ) -> Result<(), MultiplexingSenderError> { + self.send_raw(message.into()).await + } + + async fn send_err(&self, e: impl std::error::Error) -> Result<(), MultiplexingSenderError> { + self.send_raw(WorkerMessage::Error(SerializedError::new(e))) + .await + } + + async fn send_raw(&self, message: WorkerMessage) -> Result<(), MultiplexingSenderError> { + use multiplexing_sender_error::*; + + self.tx + .send(Multiplexed(self.job_id, message)) + .await + .drop_error_details() + .context(UnableToSendWorkerMessageSnafu) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum MultiplexingSenderError { + #[snafu(display("Failed to send worker message to the serialization task"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, +} + +async fn handle_write_file( + req: WriteFileRequest, + project_dir: PathBuf, +) -> Result { + use write_file_error::*; + + let path = parse_working_dir(Some(req.path), project_dir); + + // Create intermediate directories. + if let Some(parent_dir) = path.parent() { + fs::create_dir_all(parent_dir) + .await + .context(UnableToCreateDirSnafu { parent_dir })?; + } + + fs::write(&path, req.content) + .await + .context(UnableToWriteFileSnafu { path })?; + + Ok(WriteFileResponse(())) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum WriteFileError { + #[snafu(display("Failed to create parent directory {}", parent_dir.display()))] + UnableToCreateDir { + source: std::io::Error, + parent_dir: PathBuf, + }, + + #[snafu(display("Failed to write file {}", path.display()))] + UnableToWriteFile { + source: std::io::Error, + path: PathBuf, + }, + + #[snafu(display("Failed to send worker message to serialization task"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, +} + +async fn handle_read_file( + req: ReadFileRequest, + project_dir: PathBuf, +) -> Result { + use read_file_error::*; + + let path = parse_working_dir(Some(req.path), project_dir); + + let content = fs::read(&path) + .await + .context(UnableToReadFileSnafu { path })?; + + Ok(ReadFileResponse(content)) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum ReadFileError { + #[snafu(display("Failed to read file {}", path.display()))] + UnableToReadFile { + source: std::io::Error, + path: PathBuf, + }, + + #[snafu(display("Failed to send worker message to serialization task"))] + UnableToSendWorkerMessage { source: mpsc::error::SendError<()> }, +} + +// Current working directory defaults to project dir unless specified otherwise. +fn parse_working_dir(cwd: Option, project_path: impl Into) -> PathBuf { + let mut final_path = project_path.into(); + if let Some(path) = cwd { + // Absolute path will replace final_path. + final_path.push(path) + } + final_path +} + +async fn manage_processes( + mut stdin_rx: mpsc::Receiver>, + mut cmd_rx: mpsc::Receiver, + project_path: PathBuf, +) -> Result<(), ProcessError> { + use process_error::*; + + let mut processes = JoinSet::new(); + let mut stdin_senders = HashMap::new(); + let (stdin_shutdown_tx, mut stdin_shutdown_rx) = mpsc::channel(8); + + loop { + select! { + cmd_req = cmd_rx.recv() => { + let Some((Multiplexed(job_id, req), worker_msg_tx)) = cmd_req else { break }; + + let RunningChild { child, stdin_rx, stdin, stdout, stderr } = match process_begin(req, &project_path, &mut stdin_senders, job_id) { + Ok(v) => v, + Err(e) => { + // Should we add a message for process started + // in addition to the current message which + // indicates that the process has ended? + worker_msg_tx.send_err(e).await.context(UnableToSendExecuteCommandStartedResponseSnafu)?; + continue; + } + }; + + let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr); + + processes.spawn({ + let stdin_shutdown_tx = stdin_shutdown_tx.clone(); + async move { + worker_msg_tx + .send(process_end(child, task_set, stdin_shutdown_tx, job_id).await) + .await + .context(UnableToSendExecuteCommandResponseSnafu) + } + }); + } + + stdin_packet = stdin_rx.recv() => { + // Dispatch stdin packet to different child by attached command id. + let Some(Multiplexed(job_id, packet)) = stdin_packet else { break }; + + if let Some(stdin_tx) = stdin_senders.get(&job_id) { + stdin_tx.send(packet).await.drop_error_details().context(UnableToSendStdinDataSnafu)?; + } + } + + job_id = stdin_shutdown_rx.recv() => { + let job_id = job_id.context(StdinShutdownReceiverEndedSnafu)?; + stdin_senders.remove(&job_id); + // Should we care if we remove a sender that's already removed? + } + + Some(process) = processes.join_next() => { + process.context(ProcessTaskPanickedSnafu)??; + } + } + } + + Ok(()) +} + +struct RunningChild { + child: Child, + stdin_rx: mpsc::Receiver, + stdin: ChildStdin, + stdout: ChildStdout, + stderr: ChildStderr, +} + +fn process_begin( + req: ExecuteCommandRequest, + project_path: &Path, + stdin_senders: &mut HashMap>, + job_id: JobId, +) -> Result { + use process_error::*; + + let ExecuteCommandRequest { + cmd, + args, + envs, + cwd, + } = req; + let mut child = Command::new(&cmd) + .args(args) + .envs(envs) + .current_dir(parse_working_dir(cwd, project_path)) + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context(UnableToSpawnProcessSnafu { cmd })?; + + let stdin = child.stdin.take().context(UnableToCaptureStdinSnafu)?; + let stdout = child.stdout.take().context(UnableToCaptureStdoutSnafu)?; + let stderr = child.stderr.take().context(UnableToCaptureStderrSnafu)?; + + // Preparing for receiving stdin packet. + let (stdin_tx, stdin_rx) = mpsc::channel(8); + stdin_senders.insert(job_id, stdin_tx); + + Ok(RunningChild { + child, + stdin_rx, + stdin, + stdout, + stderr, + }) +} + +async fn process_end( + mut child: Child, + mut task_set: JoinSet>, + stdin_shutdown_tx: mpsc::Sender, + job_id: JobId, +) -> Result { + use process_error::*; + + let status = child.wait().await.context(WaitChildSnafu)?; + + stdin_shutdown_tx + .send(job_id) + .await + .drop_error_details() + .context(UnableToSendStdinShutdownSnafu)?; + + while let Some(task) = task_set.join_next().await { + task.context(StdioTaskPanickedSnafu)? + .context(StdioTaskFailedSnafu)?; + } + + let success = status.success(); + Ok(ExecuteCommandResponse { success }) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum ProcessError { + #[snafu(display("Failed to spawn child process {cmd}"))] + UnableToSpawnProcess { source: std::io::Error, cmd: String }, + + #[snafu(display("Failed to capture child process stdin"))] + UnableToCaptureStdin, + + #[snafu(display("Failed to capture child process stdout"))] + UnableToCaptureStdout, + + #[snafu(display("Failed to capture child process stderr"))] + UnableToCaptureStderr, + + #[snafu(display("Failed to send stdin data"))] + UnableToSendStdinData { source: mpsc::error::SendError<()> }, + + #[snafu(display("Failed to wait for child process exiting"))] + WaitChild { source: std::io::Error }, + + #[snafu(display("Failed to send the stdin shutdown request"))] + UnableToSendStdinShutdown { source: mpsc::error::SendError<()> }, + + #[snafu(display("The command's stdio task panicked"))] + StdioTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The command's stdio task failed"))] + StdioTaskFailed { source: StdioError }, + + #[snafu(display("Failed to send the command started response to the coordinator"))] + UnableToSendExecuteCommandStartedResponse { source: MultiplexingSenderError }, + + #[snafu(display("Failed to send the command completed response to the coordinator"))] + UnableToSendExecuteCommandResponse { source: MultiplexingSenderError }, + + #[snafu(display("The stdin shutdown receiver ended prematurely"))] + StdinShutdownReceiverEnded, + + #[snafu(display("The process task panicked"))] + ProcessTaskPanicked { source: tokio::task::JoinError }, +} + +fn stream_stdio( + coordinator_tx: MultiplexingSender, + mut stdin_rx: mpsc::Receiver, + mut stdin: ChildStdin, + stdout: ChildStdout, + stderr: ChildStderr, +) -> JoinSet> { + use stdio_error::*; + + let mut set = JoinSet::new(); + + set.spawn(async move { + loop { + let Some(data) = stdin_rx.recv().await else { break }; + stdin + .write_all(data.as_bytes()) + .await + .context(UnableToWriteStdinSnafu)?; + stdin.flush().await.context(UnableToFlushStdinSnafu)?; + } + + Ok(()) + }); + + set.spawn({ + copy_child_output(stdout, coordinator_tx.clone(), WorkerMessage::StdoutPacket) + .context(CopyStdoutSnafu) + }); + + set.spawn({ + copy_child_output(stderr, coordinator_tx, WorkerMessage::StderrPacket) + .context(CopyStderrSnafu) + }); + + set +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum StdioError { + #[snafu(display("Failed to write stdin data"))] + UnableToWriteStdin { source: std::io::Error }, + + #[snafu(display("Failed to flush stdin data"))] + UnableToFlushStdin { source: std::io::Error }, + + #[snafu(display("Failed to copy child stdout"))] + CopyStdout { source: CopyChildOutputError }, + + #[snafu(display("Failed to copy child stderr"))] + CopyStderr { source: CopyChildOutputError }, +} + +async fn copy_child_output( + output: impl AsyncRead + Unpin, + coordinator_tx: MultiplexingSender, + mut xform: impl FnMut(String) -> WorkerMessage, +) -> Result<(), CopyChildOutputError> { + use copy_child_output_error::*; + + let mut buf = BufReader::new(output); + + loop { + // Must be valid UTF-8. + let mut buffer = String::new(); + + let n = buf + .read_line(&mut buffer) + .await + .context(UnableToReadSnafu)?; + + if n == 0 { + break; + } + + coordinator_tx + .send_ok(xform(buffer)) + .await + .context(UnableToSendSnafu)?; + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CopyChildOutputError { + #[snafu(display("Failed to read child output"))] + UnableToRead { source: std::io::Error }, + + #[snafu(display("Failed to send output packet"))] + UnableToSend { source: MultiplexingSenderError }, +} + +// stdin/out <--> messages. +fn spawn_io_queue( + coordinator_msg_tx: mpsc::Sender>, + mut worker_msg_rx: mpsc::Receiver>, +) -> JoinSet> { + use io_queue_error::*; + use std::io::{prelude::*, BufReader, BufWriter}; + + let mut tasks = JoinSet::new(); + + tasks.spawn_blocking(move || { + let stdin = std::io::stdin(); + let mut stdin = BufReader::new(stdin); + + loop { + let coordinator_msg = bincode::deserialize_from(&mut stdin); + + if bincode_input_closed(&coordinator_msg) { + break; + }; + + let coordinator_msg = + coordinator_msg.context(UnableToDeserializeCoordinatorMessageSnafu)?; + + coordinator_msg_tx + .blocking_send(coordinator_msg) + .drop_error_details() + .context(UnableToSendCoordinatorMessageSnafu)?; + } + + Ok(()) + }); + + tasks.spawn_blocking(move || { + let stdout = std::io::stdout(); + let mut stdout = BufWriter::new(stdout); + + loop { + let worker_msg = worker_msg_rx + .blocking_recv() + .context(UnableToReceiveWorkerMessageSnafu)?; + + bincode::serialize_into(&mut stdout, &worker_msg) + .context(UnableToSerializeWorkerMessageSnafu)?; + + stdout.flush().context(UnableToFlushStdoutSnafu)?; + } + }); + + tasks +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum IoQueueError { + #[snafu(display("Failed to deserialize coordinator message"))] + UnableToDeserializeCoordinatorMessage { source: bincode::Error }, + + #[snafu(display("Failed to serialize worker message"))] + UnableToSerializeWorkerMessage { source: bincode::Error }, + + #[snafu(display("Failed to send coordinator message from deserialization task"))] + UnableToSendCoordinatorMessage { source: mpsc::error::SendError<()> }, + + #[snafu(display("Failed to receive worker message"))] + UnableToReceiveWorkerMessage, + + #[snafu(display("Failed to flush stdout"))] + UnableToFlushStdout { source: std::io::Error }, +} From 4463451a0519db5698ce70897b77831bd834633f Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 14 Jun 2023 12:17:58 -0400 Subject: [PATCH 6/7] Build the orchestrator in Docker --- compiler/base/.dockerignore | 1 + compiler/base/Dockerfile | 30 +++++++ compiler/base/entrypoint.sh | 14 +-- compiler/base/orchestrator/src/coordinator.rs | 87 +++++++++++++++++++ 4 files changed, 127 insertions(+), 5 deletions(-) diff --git a/compiler/base/.dockerignore b/compiler/base/.dockerignore index 6eedd68fa..a8072fb0b 100644 --- a/compiler/base/.dockerignore +++ b/compiler/base/.dockerignore @@ -1,2 +1,3 @@ asm-cleanup/target modify-cargo-toml/target +orchestrator/target diff --git a/compiler/base/Dockerfile b/compiler/base/Dockerfile index b75bbc1ed..a572c3da9 100644 --- a/compiler/base/Dockerfile +++ b/compiler/base/Dockerfile @@ -61,11 +61,41 @@ FROM bare-sources as munge ADD --chown=playground modify-cargo-toml /playground/modify-cargo-toml RUN cargo build --release --manifest-path=/playground/modify-cargo-toml/Cargo.toml +# Set up cargo-chef for faster builds + +FROM bare-sources as chef-available + +RUN cargo install cargo-chef + +WORKDIR /orchestrator + +# Prepare the orchestrator's dependencies + +FROM chef-available as prepare-orchestrator + +COPY --chown=playground asm-cleanup /asm-cleanup +COPY --chown=playground modify-cargo-toml /modify-cargo-toml +COPY --chown=playground orchestrator /orchestrator +RUN cargo chef prepare + +# Build the orchestrator + +FROM chef-available as build-orchestrator + +COPY --chown=playground asm-cleanup /asm-cleanup +COPY --chown=playground modify-cargo-toml /modify-cargo-toml +COPY --chown=playground --from=prepare-orchestrator /orchestrator/recipe.json /orchestrator/recipe.json +RUN cargo chef cook --release + +COPY --chown=playground orchestrator /orchestrator +RUN cargo install --path . + # Compiler and sources FROM bare-sources as sources COPY --from=munge /playground/modify-cargo-toml/target/release/modify-cargo-toml /playground/.cargo/bin +COPY --from=build-orchestrator /playground/.cargo/bin/worker /playground/.cargo/bin/worker # Compiler and pre-compiled crates diff --git a/compiler/base/entrypoint.sh b/compiler/base/entrypoint.sh index bd3da4e03..47b790eea 100755 --- a/compiler/base/entrypoint.sh +++ b/compiler/base/entrypoint.sh @@ -2,10 +2,14 @@ set -eu -timeout=${PLAYGROUND_TIMEOUT:-10} +if [[ -z "${PLAYGROUND_ORCHESTRATOR:-}" ]]; then + timeout=${PLAYGROUND_TIMEOUT:-10} -modify-cargo-toml + modify-cargo-toml -# Don't use `exec` here. The shell is what prints out the useful -# "Killed" message -timeout --signal=KILL ${timeout} "$@" + # Don't use `exec` here. The shell is what prints out the useful + # "Killed" message + timeout --signal=KILL ${timeout} "$@" +else + exec "$@" +fi diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 24fa636eb..f0ef2bd28 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -346,6 +346,12 @@ where } } +impl Coordinator { + pub async fn new_docker() -> Result { + Self::new(DockerBackend(())).await + } +} + #[derive(Debug)] struct Container { task: JoinHandle>, @@ -819,6 +825,63 @@ where } } +macro_rules! docker_command { + ($($arg:expr),* $(,)?) => ({ + let mut cmd = Command::new("docker"); + $( cmd.arg($arg); )* + cmd + }); +} + +#[cfg(target_arch = "x86_64")] +const DOCKER_ARCH: &str = "linux/amd64"; + +#[cfg(target_arch = "aarch64")] +const DOCKER_ARCH: &str = "linux/arm64"; + +fn basic_secure_docker_command() -> Command { + docker_command!( + "run", + "--platform", + DOCKER_ARCH, + "--cap-drop=ALL", + "--net", + "none", + "--memory", + "512m", + "--memory-swap", + "640m", + "--pids-limit", + "512", + ) +} + +pub struct DockerBackend(()); + +impl Backend for DockerBackend { + fn prepare_worker_command(&self, channel: Channel) -> Command { + let mut command = basic_secure_docker_command(); + command + .arg("-i") + .args(["-a", "stdin", "-a", "stdout", "-a", "stderr"]) + .arg("--rm") + .arg(channel.to_container_name()) + .arg("worker") + .arg("/playground"); + command + } +} + +impl Channel { + fn to_container_name(self) -> &'static str { + match self { + Channel::Stable => "rust-stable", + Channel::Beta => "rust-beta", + Channel::Nightly => "rust-nightly", + } + } +} + pub type Result = ::std::result::Result; #[derive(Debug, Snafu)] @@ -988,6 +1051,7 @@ mod tests { async fn new_coordinator() -> Result> { Coordinator::new(TestBackend::new()).await + //Coordinator::new_docker().await } fn new_compile_request() -> CompileRequest { @@ -1207,6 +1271,29 @@ mod tests { Ok(()) } + #[tokio::test] + #[snafu::report] + async fn test_compile_wasm() -> Result<()> { + // cargo-wasm only exists inside the container + let coordinator = Coordinator::new_docker().await?; + + let response = coordinator + .compile(new_compile_wasm_request()) + .with_timeout() + .await + .unwrap(); + + assert!(response.success, "stderr: {}", response.stderr); + assert_contains!( + response.code, + r#"(func $inc (export "inc") (type $t0) (param $p0 i32) (result i32)"# + ); + + coordinator.shutdown().await?; + + Ok(()) + } + trait TimeoutExt: Future + Sized { #[allow(clippy::type_complexity)] fn with_timeout( From 8b8a3dd9d3ffbbd0da4181f1c80caf25de2dcf82 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 10 May 2023 12:55:26 -0400 Subject: [PATCH 7/7] Optionally use the orchestrator for the "compile" commands in the UI --- .github/workflows/ci.yml | 2 +- ci/workflows.yml | 1 + ui/Cargo.lock | 100 ++++++++++++++ ui/Cargo.toml | 1 + ui/src/main.rs | 26 ++++ ui/src/metrics.rs | 93 +++++++++++-- ui/src/sandbox.rs | 98 ++++++++++++- ui/src/server_axum.rs | 287 ++++++++++++++++++++++++++++++++++++--- 8 files changed, 580 insertions(+), 28 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 556e0daad..1b13d1c73 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -185,7 +185,7 @@ jobs: run: cargo fmt --manifest-path top-crates/Cargo.toml --check - name: Build backend run: |- - mkdir -p ui/target; docker run --rm -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' + mkdir -p ui/target; docker run --rm -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup -v $PWD/compiler/base/orchestrator:/compiler/base/orchestrator -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git -v ~/.cargo/registry:/root/.cargo/registry --workdir /ui rust:alpine sh -c ' apk add musl-dev openssl-dev openssl-libs-static # Adding -C relocation-model=static due to diff --git a/ci/workflows.yml b/ci/workflows.yml index b4efc3383..0a06222ab 100644 --- a/ci/workflows.yml +++ b/ci/workflows.yml @@ -289,6 +289,7 @@ workflows: run --rm -v $PWD/compiler/base/asm-cleanup:/compiler/base/asm-cleanup + -v $PWD/compiler/base/orchestrator:/compiler/base/orchestrator -v $PWD/compiler/base/modify-cargo-toml:/compiler/base/modify-cargo-toml -v $PWD/ui:/ui -v ~/.cargo/git:/root/.cargo/git diff --git a/ui/Cargo.lock b/ui/Cargo.lock index 2230497cb..b2164ff24 100644 --- a/ui/Cargo.lock +++ b/ui/Cargo.lock @@ -154,6 +154,15 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -758,6 +767,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "modify-cargo-toml" +version = "0.1.0" +dependencies = [ + "serde", + "serde_derive", + "toml", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -866,6 +884,22 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "orchestrator" +version = "0.1.0" +dependencies = [ + "asm-cleanup", + "bincode", + "futures", + "modify-cargo-toml", + "serde", + "snafu", + "tokio", + "tokio-stream", + "tokio-util", + "toml", +] + [[package]] name = "overload" version = "0.1.1" @@ -1240,6 +1274,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1316,6 +1359,8 @@ checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045" dependencies = [ "backtrace", "doc-comment", + "futures-core", + "pin-project", "snafu-derive", ] @@ -1532,6 +1577,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.18.0" @@ -1557,6 +1613,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.19.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -1713,6 +1803,7 @@ dependencies = [ "lazy_static", "octocrab", "openssl-probe", + "orchestrator", "prometheus", "regex", "serde", @@ -2026,6 +2117,15 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "winnow" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699" +dependencies = [ + "memchr", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/ui/Cargo.toml b/ui/Cargo.toml index 220c0a2b4..4e89afa69 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -17,6 +17,7 @@ futures = "0.3.21" lazy_static = "1.0.0" octocrab = "0.25" openssl-probe = "0.1.2" +orchestrator = { path = "../compiler/base/orchestrator" } prometheus = "0.13.0" regex = "1.0.0" serde = { version = "1.0", features = ["rc"] } diff --git a/ui/src/main.rs b/ui/src/main.rs index 5699b4400..5bec44bf8 100644 --- a/ui/src/main.rs +++ b/ui/src/main.rs @@ -37,6 +37,7 @@ struct Config { cors_enabled: bool, gh_token: Option, metrics_token: Option, + orchestrator_enabled: bool, port: u16, root: PathBuf, } @@ -90,11 +91,14 @@ impl Config { let cors_enabled = env::var_os("PLAYGROUND_CORS_ENABLED").is_some(); + let orchestrator_enabled = env::var_os("PLAYGROUND_ORCHESTRATOR_ENABLED").is_some(); + Self { address, cors_enabled, gh_token, metrics_token, + orchestrator_enabled, port, root, } @@ -112,6 +116,10 @@ impl Config { self.cors_enabled } + fn use_orchestrator(&self) -> bool { + self.orchestrator_enabled + } + fn metrics_token(&self) -> Option { self.metrics_token.as_deref().map(MetricsToken::new) } @@ -203,6 +211,24 @@ pub enum Error { CachePoisoned, #[snafu(display("The WebSocket worker panicked: {}", text))] WebSocketTaskPanic { text: String }, + + #[snafu(display("Unable to create the coordinator"))] + CreateCoordinator { + source: orchestrator::coordinator::Error, + }, + + #[snafu(display("Unable to shutdown the coordinator"))] + ShutdownCoordinator { + source: orchestrator::coordinator::Error, + }, + + #[snafu(display("Unable to convert the compile request"))] + Compile { + source: orchestrator::coordinator::CompileError, + }, + + #[snafu(display("The operation timed out"))] + Timeout { source: tokio::time::error::Elapsed }, } type Result = ::std::result::Result; diff --git a/ui/src/metrics.rs b/ui/src/metrics.rs index 15a787aa5..5c22bfd97 100644 --- a/ui/src/metrics.rs +++ b/ui/src/metrics.rs @@ -1,11 +1,15 @@ use futures::future::BoxFuture; use lazy_static::lazy_static; +use orchestrator::coordinator; use prometheus::{ self, register_histogram, register_histogram_vec, register_int_counter, register_int_gauge, Histogram, HistogramVec, IntCounter, IntGauge, }; use regex::Regex; -use std::{future::Future, time::Instant}; +use std::{ + future::Future, + time::{Duration, Instant}, +}; use crate::sandbox::{self, Channel, CompileTarget, CrateType, Edition, Mode}; @@ -62,6 +66,16 @@ pub(crate) enum Outcome { ErrorUserCode, } +pub(crate) struct LabelsCore { + target: Option, + channel: Option, + mode: Option, + edition: Option>, + crate_type: Option, + tests: Option, + backtrace: Option, +} + #[derive(Debug, Copy, Clone)] pub(crate) struct Labels { endpoint: Endpoint, @@ -132,6 +146,29 @@ impl Labels { backtrace, ] } + + pub(crate) fn complete(endpoint: Endpoint, labels_core: LabelsCore, outcome: Outcome) -> Self { + let LabelsCore { + target, + channel, + mode, + edition, + crate_type, + tests, + backtrace, + } = labels_core; + Self { + endpoint, + outcome, + target, + channel, + mode, + edition, + crate_type, + tests, + backtrace, + } + } } pub(crate) trait GenerateLabels { @@ -406,11 +443,8 @@ where let outcome = SuccessDetails::for_sandbox_result(&response); let mut labels = request.generate_labels(outcome); f(&mut labels); - let values = &labels.as_values(); - - let histogram = REQUESTS.with_label_values(values); - histogram.observe(elapsed.as_secs_f64()); + record_metric_complete(labels, elapsed); response } @@ -443,10 +477,53 @@ where tests: None, backtrace: None, }; - let values = &labels.as_values(); - let histogram = REQUESTS.with_label_values(values); - histogram.observe(elapsed.as_secs_f64()); + record_metric_complete(labels, elapsed); response } + +pub(crate) trait HasLabelsCore { + fn labels_core(&self) -> LabelsCore; +} + +impl HasLabelsCore for coordinator::CompileRequest { + fn labels_core(&self) -> LabelsCore { + let Self { + target, + channel, + crate_type, + mode, + edition, + tests, + backtrace, + code: _, + } = *self; + + LabelsCore { + target: Some(target.into()), + channel: Some(channel.into()), + mode: Some(mode.into()), + edition: Some(Some(edition.into())), + crate_type: Some(crate_type.into()), + tests: Some(tests), + backtrace: Some(backtrace), + } + } +} + +pub(crate) fn record_metric( + endpoint: Endpoint, + labels_core: LabelsCore, + outcome: Outcome, + elapsed: Duration, +) { + let labels = Labels::complete(endpoint, labels_core, outcome); + record_metric_complete(labels, elapsed) +} + +fn record_metric_complete(labels: Labels, elapsed: Duration) { + let values = &labels.as_values(); + let histogram = REQUESTS.with_label_values(values); + histogram.observe(elapsed.as_secs_f64()); +} diff --git a/ui/src/sandbox.rs b/ui/src/sandbox.rs index d13b9662f..b6163145b 100644 --- a/ui/src/sandbox.rs +++ b/ui/src/sandbox.rs @@ -14,7 +14,7 @@ use tempfile::TempDir; use tokio::{fs, process::Command, time}; use tracing::debug; -const DOCKER_PROCESS_TIMEOUT_SOFT: Duration = Duration::from_secs(10); +pub(crate) const DOCKER_PROCESS_TIMEOUT_SOFT: Duration = Duration::from_secs(10); const DOCKER_PROCESS_TIMEOUT_HARD: Duration = Duration::from_secs(12); #[derive(Debug, Deserialize)] @@ -1055,6 +1055,102 @@ pub struct MacroExpansionResponse { pub stderr: String, } +mod sandbox_orchestrator_integration_impls { + use orchestrator::coordinator; + + impl From for super::CompileTarget { + fn from(value: coordinator::CompileTarget) -> Self { + match value { + coordinator::CompileTarget::Assembly(a, b, c) => { + super::CompileTarget::Assembly(a.into(), b.into(), c.into()) + } + coordinator::CompileTarget::Hir => super::CompileTarget::Hir, + coordinator::CompileTarget::LlvmIr => super::CompileTarget::LlvmIr, + coordinator::CompileTarget::Mir => super::CompileTarget::Mir, + coordinator::CompileTarget::Wasm => super::CompileTarget::Wasm, + } + } + } + + impl From for super::Mode { + fn from(value: coordinator::Mode) -> Self { + match value { + coordinator::Mode::Debug => super::Mode::Debug, + coordinator::Mode::Release => super::Mode::Release, + } + } + } + + impl From for super::Edition { + fn from(value: coordinator::Edition) -> Self { + match value { + coordinator::Edition::Rust2015 => super::Edition::Rust2015, + coordinator::Edition::Rust2018 => super::Edition::Rust2018, + coordinator::Edition::Rust2021 => super::Edition::Rust2021, + } + } + } + + impl From for super::Channel { + fn from(value: coordinator::Channel) -> Self { + match value { + coordinator::Channel::Stable => super::Channel::Stable, + coordinator::Channel::Beta => super::Channel::Beta, + coordinator::Channel::Nightly => super::Channel::Nightly, + } + } + } + + impl From for super::AssemblyFlavor { + fn from(value: coordinator::AssemblyFlavor) -> Self { + match value { + coordinator::AssemblyFlavor::Att => super::AssemblyFlavor::Att, + coordinator::AssemblyFlavor::Intel => super::AssemblyFlavor::Intel, + } + } + } + + impl From for super::CrateType { + fn from(value: coordinator::CrateType) -> Self { + match value { + coordinator::CrateType::Binary => super::CrateType::Binary, + coordinator::CrateType::Library(a) => super::CrateType::Library(a.into()), + } + } + } + + impl From for super::DemangleAssembly { + fn from(value: coordinator::DemangleAssembly) -> Self { + match value { + coordinator::DemangleAssembly::Demangle => super::DemangleAssembly::Demangle, + coordinator::DemangleAssembly::Mangle => super::DemangleAssembly::Mangle, + } + } + } + + impl From for super::ProcessAssembly { + fn from(value: coordinator::ProcessAssembly) -> Self { + match value { + coordinator::ProcessAssembly::Filter => super::ProcessAssembly::Filter, + coordinator::ProcessAssembly::Raw => super::ProcessAssembly::Raw, + } + } + } + + impl From for super::LibraryType { + fn from(value: coordinator::LibraryType) -> Self { + match value { + coordinator::LibraryType::Lib => super::LibraryType::Lib, + coordinator::LibraryType::Dylib => super::LibraryType::Dylib, + coordinator::LibraryType::Rlib => super::LibraryType::Rlib, + coordinator::LibraryType::Staticlib => super::LibraryType::Staticlib, + coordinator::LibraryType::Cdylib => super::LibraryType::Cdylib, + coordinator::LibraryType::ProcMacro => super::LibraryType::ProcMacro, + } + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/ui/src/server_axum.rs b/ui/src/server_axum.rs index fdd473a26..e82d029c8 100644 --- a/ui/src/server_axum.rs +++ b/ui/src/server_axum.rs @@ -1,17 +1,19 @@ use crate::{ gist, metrics::{ - track_metric_async, track_metric_force_endpoint_async, track_metric_no_request_async, - Endpoint, GenerateLabels, SuccessDetails, UNAVAILABLE_WS, + record_metric, track_metric_async, track_metric_force_endpoint_async, + track_metric_no_request_async, Endpoint, GenerateLabels, HasLabelsCore, Outcome, + SuccessDetails, UNAVAILABLE_WS, }, - sandbox::{self, Channel, Sandbox}, + sandbox::{self, Channel, Sandbox, DOCKER_PROCESS_TIMEOUT_SOFT}, CachingSnafu, ClippyRequest, ClippyResponse, CompilationSnafu, CompileRequest, CompileResponse, - Config, Error, ErrorJson, EvaluateRequest, EvaluateResponse, EvaluationSnafu, ExecuteRequest, - ExecuteResponse, ExecutionSnafu, ExpansionSnafu, FormatRequest, FormatResponse, - FormattingSnafu, GhToken, GistCreationSnafu, GistLoadingSnafu, InterpretingSnafu, LintingSnafu, - MacroExpansionRequest, MacroExpansionResponse, MetaCratesResponse, MetaGistCreateRequest, - MetaGistResponse, MetaVersionResponse, MetricsToken, MiriRequest, MiriResponse, Result, - SandboxCreationSnafu, + CompileSnafu, Config, CreateCoordinatorSnafu, Error, ErrorJson, EvaluateRequest, + EvaluateResponse, EvaluationSnafu, ExecuteRequest, ExecuteResponse, ExecutionSnafu, + ExpansionSnafu, FormatRequest, FormatResponse, FormattingSnafu, GhToken, GistCreationSnafu, + GistLoadingSnafu, InterpretingSnafu, LintingSnafu, MacroExpansionRequest, + MacroExpansionResponse, MetaCratesResponse, MetaGistCreateRequest, MetaGistResponse, + MetaVersionResponse, MetricsToken, MiriRequest, MiriResponse, Result, SandboxCreationSnafu, + ShutdownCoordinatorSnafu, TimeoutSnafu, }; use async_trait::async_trait; use axum::{ @@ -27,6 +29,7 @@ use axum::{ Router, }; use futures::{future::BoxFuture, FutureExt}; +use orchestrator::coordinator::{self, DockerBackend}; use snafu::{prelude::*, IntoError}; use std::{ convert::{TryFrom, TryInto}, @@ -55,6 +58,9 @@ const MAX_AGE_ONE_YEAR: HeaderValue = HeaderValue::from_static("public, max-age= mod websocket; +#[derive(Debug, Copy, Clone)] +struct OrchestratorEnabled(bool); + #[tokio::main] pub(crate) async fn serve(config: Config) { let root_files = static_file_service(config.root_path(), MAX_AGE_ONE_DAY); @@ -87,7 +93,8 @@ pub(crate) async fn serve(config: Config) { .route("/nowebsocket", post(nowebsocket)) .route("/whynowebsocket", get(whynowebsocket)) .layer(Extension(Arc::new(SandboxCache::default()))) - .layer(Extension(config.github_token())); + .layer(Extension(config.github_token())) + .layer(Extension(OrchestratorEnabled(config.use_orchestrator()))); if let Some(token) = config.metrics_token() { app = app.layer(Extension(token)) @@ -152,14 +159,23 @@ async fn evaluate(Json(req): Json) -> Result) -> Result> { - with_sandbox( - req, - |sb, req| async move { sb.compile(req).await }.boxed(), - CompilationSnafu, - ) - .await - .map(Json) +async fn compile( + Extension(use_orchestrator): Extension, + Json(req): Json, +) -> Result> { + if use_orchestrator.0 { + with_coordinator(req, |c, req| c.compile(req).context(CompileSnafu).boxed()) + .await + .map(Json) + } else { + with_sandbox( + req, + |sb, req| async move { sb.compile(req).await }.boxed(), + CompilationSnafu, + ) + .await + .map(Json) + } } async fn execute(Json(req): Json) -> Result> { @@ -251,6 +267,82 @@ where .context(ctx) } +pub(crate) trait HasEndpoint { + const ENDPOINT: Endpoint; +} + +impl HasEndpoint for CompileRequest { + const ENDPOINT: Endpoint = Endpoint::Compile; +} + +trait IsSuccess { + fn is_success(&self) -> bool; +} + +impl IsSuccess for coordinator::CompileResponseWithOutput { + fn is_success(&self) -> bool { + self.success + } +} + +async fn with_coordinator(req: WebReq, f: F) -> Result +where + WebReq: TryInto, + WebReq: HasEndpoint, + Req: HasLabelsCore, + Resp: Into, + Resp: IsSuccess, + for<'f> F: + FnOnce(&'f coordinator::Coordinator, Req) -> BoxFuture<'f, Result>, +{ + let coordinator = orchestrator::coordinator::Coordinator::new_docker() + .await + .context(CreateCoordinatorSnafu)?; + + let job = async { + let req = req.try_into()?; + + let labels_core = req.labels_core(); + + let start = Instant::now(); + + let job = f(&coordinator, req); + let resp = tokio::time::timeout(DOCKER_PROCESS_TIMEOUT_SOFT, job).await; + + let elapsed = start.elapsed(); + + let outcome = match &resp { + Ok(Ok(v)) => { + if v.is_success() { + Outcome::Success + } else { + Outcome::ErrorUserCode + } + } + Ok(Err(_)) => Outcome::ErrorServer, + Err(_) => Outcome::ErrorTimeoutSoft, + }; + + // Note that any early return before this point won't be + // reported in the metrics! + + record_metric(WebReq::ENDPOINT, labels_core, outcome, elapsed); + + let resp = resp.context(TimeoutSnafu)?; + + resp.map(Into::into) + }; + + let resp = job.await; + + coordinator + .shutdown() + .await + .context(ShutdownCoordinatorSnafu)?; + + resp +} + async fn meta_crates( Extension(cache): Extension>, if_none_match: Option>, @@ -676,3 +768,162 @@ where axum::Json(self.0).into_response() } } + +mod api_orchestrator_integration_impls { + use orchestrator::coordinator::*; + use std::convert::TryFrom; + + use super::{Error, Result}; + + impl TryFrom for CompileRequest { + type Error = Error; + + fn try_from(other: crate::CompileRequest) -> Result { + let crate::CompileRequest { + target, + assembly_flavor, + demangle_assembly, + process_assembly, + channel, + mode, + edition, + crate_type, + tests, + backtrace, + code, + } = other; + + Ok(Self { + target: parse_target( + &target, + assembly_flavor.as_deref(), + demangle_assembly.as_deref(), + process_assembly.as_deref(), + )?, + channel: parse_channel(&channel)?, + crate_type: parse_crate_type(&crate_type)?, + mode: parse_mode(&mode)?, + edition: parse_edition(&edition)?, + tests, + backtrace, + code, + }) + } + } + + impl From for crate::CompileResponse { + fn from(other: CompileResponseWithOutput) -> Self { + let CompileResponseWithOutput { + success, + code, + stdout, + stderr, + } = other; + + Self { + success, + code, + stdout, + stderr, + } + } + } + + fn parse_target( + target: &str, + assembly_flavor: Option<&str>, + demangle_assembly: Option<&str>, + process_assembly: Option<&str>, + ) -> Result { + Ok(match target { + "asm" => { + let assembly_flavor = match assembly_flavor { + Some(f) => parse_assembly_flavor(f)?, + None => AssemblyFlavor::Att, + }; + + let demangle = match demangle_assembly { + Some(f) => parse_demangle_assembly(f)?, + None => DemangleAssembly::Demangle, + }; + + let process_assembly = match process_assembly { + Some(f) => parse_process_assembly(f)?, + None => ProcessAssembly::Filter, + }; + + CompileTarget::Assembly(assembly_flavor, demangle, process_assembly) + } + "llvm-ir" => CompileTarget::LlvmIr, + "mir" => CompileTarget::Mir, + "hir" => CompileTarget::Hir, + "wasm" => CompileTarget::Wasm, + value => crate::InvalidTargetSnafu { value }.fail()?, + }) + } + + fn parse_assembly_flavor(s: &str) -> Result { + Ok(match s { + "att" => AssemblyFlavor::Att, + "intel" => AssemblyFlavor::Intel, + value => crate::InvalidAssemblyFlavorSnafu { value }.fail()?, + }) + } + + fn parse_demangle_assembly(s: &str) -> Result { + Ok(match s { + "demangle" => DemangleAssembly::Demangle, + "mangle" => DemangleAssembly::Mangle, + value => crate::InvalidDemangleAssemblySnafu { value }.fail()?, + }) + } + + fn parse_process_assembly(s: &str) -> Result { + Ok(match s { + "filter" => ProcessAssembly::Filter, + "raw" => ProcessAssembly::Raw, + value => crate::InvalidProcessAssemblySnafu { value }.fail()?, + }) + } + + fn parse_channel(s: &str) -> Result { + Ok(match s { + "stable" => Channel::Stable, + "beta" => Channel::Beta, + "nightly" => Channel::Nightly, + value => crate::InvalidChannelSnafu { value }.fail()?, + }) + } + + fn parse_crate_type(s: &str) -> Result { + use {CrateType::*, LibraryType::*}; + + Ok(match s { + "bin" => Binary, + "lib" => Library(Lib), + "dylib" => Library(Dylib), + "rlib" => Library(Rlib), + "staticlib" => Library(Staticlib), + "cdylib" => Library(Cdylib), + "proc-macro" => Library(ProcMacro), + value => crate::InvalidCrateTypeSnafu { value }.fail()?, + }) + } + + fn parse_mode(s: &str) -> Result { + Ok(match s { + "debug" => Mode::Debug, + "release" => Mode::Release, + value => crate::InvalidModeSnafu { value }.fail()?, + }) + } + + fn parse_edition(s: &str) -> Result { + Ok(match s { + "2015" => Edition::Rust2015, + "2018" => Edition::Rust2018, + "2021" => Edition::Rust2021, + value => crate::InvalidEditionSnafu { value }.fail()?, + }) + } +}