diff --git a/Cargo.lock b/Cargo.lock index 7b36c01..8cd9a61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,7 +429,7 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-rustls 0.23.2", + "hyper-rustls", "lazy_static", "pin-project-lite", "rustls 0.20.9", @@ -605,6 +605,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -713,6 +719,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", + "serde", "windows-targets 0.52.5", ] @@ -767,7 +774,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", ] [[package]] @@ -908,6 +915,41 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 2.0.60", +] + +[[package]] +name = "darling_macro" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.60", +] + [[package]] name = "deranged" version = "0.3.11" @@ -915,6 +957,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1440,22 +1483,6 @@ dependencies = 
[ "tokio-rustls 0.23.4", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http", - "hyper", - "log", - "rustls 0.21.11", - "rustls-native-certs", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1504,6 +1531,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -1538,6 +1571,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1636,6 +1670,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jwtk" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f968792e4a4e8ffd2fcd6adab9a096c6bdb42eb584a0ee7068ec91c62a08ff65" +dependencies = [ + "base64 0.13.1", + "foreign-types", + "openssl", + "openssl-sys", + "reqwest", + "serde", + "serde_json", + "serde_with", + "smallvec", + "tokio", +] + [[package]] name = "kqueue" version = "1.0.8" @@ -2541,24 +2593,6 @@ dependencies = [ "uncased", ] -[[package]] -name = "rocket_oauth2" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428be05365d3f1e10eeb4659c7635d365449ec699778bf4c1b0a246cab5c235b" -dependencies = [ - "async-trait", - "base64 0.21.7", - "hyper", - "hyper-rustls 0.24.2", - "log", - "rand", - "rocket", - "serde", - "serde_json", - "url", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2576,9 +2610,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.33" +version = "0.38.34" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3cc72858054fcff6d7dea32df2aeaee6a7c24227366d7ea429aada2f26b16ad" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ "bitflags 2.5.0", "errno", @@ -2775,6 +2809,36 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" +dependencies = [ + "base64 0.21.7", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.6", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -2899,6 +2963,12 @@ dependencies = [ "loom", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -3552,12 +3622,12 @@ dependencies = [ "clap", "env_logger", "etcd-client", + "jwtk", "log", "rand", "reqwest", "rocket", "rocket_dyn_templates", - "rocket_oauth2", "serde", "serde_json", "serde_yaml", diff --git a/README.md b/README.md new file mode 100644 index 0000000..7f5d1bf --- /dev/null +++ b/README.md @@ -0,0 +1,77 @@ +# Vicky + +Vicky, which is the babysitter of Timmy, Cosmo and Wanda, is a CD tool for environments with many constraints and dependencies that usually cannot be represented. + + +## Components + +Vicky consists out of multiple components to make a spreaded deployment possible. 
+ ++ vicky + + Main Task Scheduler ++ vicky-worker + + Task Worker, can run multiple times. ++ dashboard + + Web-UI ++ vicky-cli + + CLI + +Each component can be developed and deployed individually. + +## Concepts + +We use an etcd cluster to sync state between multiple instances of Vicky. Vicky will do leader election, so at any time only one instance is active. We try to make Vicky as resilient to network and other failures as possible but it is not our main goal, yet. +All data in the etcd is stored under `vicky.wobcom.de/` in YAML format. + +## Development Setup + +We need to start at least a `vicky` instance, S3 storage and etcd to run anything. + +### Storage & Database & Certificates + +#### docker-compose + ++ Generate TLS client certificates for etcd authentication + + `nix run .\#generate-certs` + + Certificates are located at `certs` ++ Enter `deployment` ++ Start docker-compose collection + + `docker-compose up -d` + +#### devenv + +TODO @yu-re-ka: Add Information + +### Vicky + ++ Copy `vicky/Rocket.example.toml` to `vicky/Rocket.toml` + + `Rocket.example.toml` contains the correct configuration to run with the provided development environment. ++ Edit `vicky/Rocket.toml` + + Add your own machine token to configuration + + This is needed for `vicky-worker` later. + + Add OIDC authentication provider to configuration ++ Enter `vicky` ++ Run `cargo run --bin vicky` + + +### Vicky Worker + ++ Copy `vicky-worker/Rocket.example.toml` to `vicky-worker/Rocket.toml` ++ Edit `vicky-worker/Rocket.toml` + + Add `machine_token` from the last step into this configuration. ++ Enter `vicky-worker` ++ Run `cargo run --bin vicky-worker` + +### Dashboard + ++ Enter `dashboard` ++ Install Dependencies + + `npm ci` in `dashboard` Folder ++ Run `npm run start` + +### CLI + +TODO: Add Content for CLI configuration and development. 
+ + + diff --git a/dashboard/package-lock.json b/dashboard/package-lock.json index 060c8f3..8080369 100644 --- a/dashboard/package-lock.json +++ b/dashboard/package-lock.json @@ -11,9 +11,11 @@ "dependencies": { "@uidotdev/usehooks": "^2.4.1", "axios": "^1.5.0", + "oidc-client-ts": "^3.0.0-beta.0", "parcel": "^2.9.3", "react": "^18.2.0", "react-dom": "^18.2.0", + "react-oidc-context": "^3.0.0-beta.0", "react-router-dom": "^6.15.0", "rsuite": "^5.39.0", "xterm": "^5.3.0", @@ -2871,6 +2873,14 @@ "node": ">=6" } }, + "node_modules/jwt-decode": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-4.0.0.tgz", + "integrity": "sha512-+KJGIyHgkGuIq3IEBNftfhW/LfWhXUIY6OmyVWjliu5KH1y0fw7VQ8YndE2O4qZdMSd9SqbnC8GOcZEy0Om7sA==", + "engines": { + "node": ">=18" + } + }, "node_modules/lightningcss": { "version": "1.21.8", "resolved": "https://registry.npmjs.org/lightningcss/-/lightningcss-1.21.8.tgz", @@ -3264,6 +3274,17 @@ "node": ">=0.10.0" } }, + "node_modules/oidc-client-ts": { + "version": "3.0.0-beta.0", + "resolved": "https://registry.npmjs.org/oidc-client-ts/-/oidc-client-ts-3.0.0-beta.0.tgz", + "integrity": "sha512-LXd4/w6kzYe0Hc2d+orHR9ACmRVgUOtWxOttca5DoZgvWU1tWlMbIKfiiKRRLOsHejc2y28Q+JhvGofK8gfXyQ==", + "dependencies": { + "jwt-decode": "^4.0.0" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/ordered-binary": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/ordered-binary/-/ordered-binary-1.4.1.tgz", @@ -3459,6 +3480,18 @@ "resolved": "https://registry.npmjs.org/react-is/-/react-is-16.13.1.tgz", "integrity": "sha512-24e6ynE2H+OKt4kqsOvNd8kBpV65zoxbA4BVsEOB3ARVWQki/DHzaUoC5KuON/BiccDaCCTZBuOcfZs70kR8bQ==" }, + "node_modules/react-oidc-context": { + "version": "3.0.0-beta.0", + "resolved": "https://registry.npmjs.org/react-oidc-context/-/react-oidc-context-3.0.0-beta.0.tgz", + "integrity": "sha512-Y3WjJ+2qBpNqkX7DTnYc2WRLhXajX2tCv2S5rSB05J9IOjAOBYPXArIlHbjp6UWnSBW8rGbobmoRB9clrqIcWg==", + "engines": { + 
"node": ">=18" + }, + "peerDependencies": { + "oidc-client-ts": "^3.0.0-beta.0", + "react": ">=16.8.0" + } + }, "node_modules/react-refresh": { "version": "0.9.0", "resolved": "https://registry.npmjs.org/react-refresh/-/react-refresh-0.9.0.tgz", diff --git a/dashboard/package.json b/dashboard/package.json index ac04f68..f579251 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -17,9 +17,11 @@ "dependencies": { "@uidotdev/usehooks": "^2.4.1", "axios": "^1.5.0", + "oidc-client-ts": "^3.0.0-beta.0", "parcel": "^2.9.3", "react": "^18.2.0", "react-dom": "^18.2.0", + "react-oidc-context": "^3.0.0-beta.0", "react-router-dom": "^6.15.0", "rsuite": "^5.39.0", "xterm": "^5.3.0", diff --git a/dashboard/src/app.tsx b/dashboard/src/app.tsx index c404a27..0e7fea2 100644 --- a/dashboard/src/app.tsx +++ b/dashboard/src/app.tsx @@ -7,18 +7,28 @@ import { Tasks } from './components/tasks'; import { UserProvider } from './contexts/user'; import { Content } from './content'; import { CustomProvider } from 'rsuite'; +import { AuthProvider } from 'react-oidc-context'; const App = () => { + const oidcConfig = { + authority: "https://id.lab.wobcom.de/realms/wobcom/", + client_id: "vicky-dev", + redirect_uri: "http://localhost:1234", + onSigninCallback: (): void => { + window.history.replaceState({}, document.title, window.location.pathname); + }, + }; + return ( <> - + - + ) diff --git a/dashboard/src/components/login.tsx b/dashboard/src/components/login.tsx index 47e6ae8..04008f6 100644 --- a/dashboard/src/components/login.tsx +++ b/dashboard/src/components/login.tsx @@ -2,10 +2,13 @@ import { Button, Col, FlexboxGrid, Panel, Stack } from "rsuite" import GitHubIcon from '@rsuite/icons/legacy/Github'; import * as s from "./login.module.css"; +import { useAuth } from "react-oidc-context"; const Login = () => { + const auth = useAuth(); + return ( { Wenn der Account nicht freigeschaltet ist, ist ein Einloggen nicht möglich.

- diff --git a/dashboard/src/components/menu.tsx b/dashboard/src/components/menu.tsx index 6f2b9b8..3becf2d 100644 --- a/dashboard/src/components/menu.tsx +++ b/dashboard/src/components/menu.tsx @@ -8,12 +8,14 @@ import { Link, useNavigate } from "react-router-dom"; import { useContext, useEffect, useState } from "react"; import { useAPI } from "../services/api"; import { UserContext } from "../contexts/user"; +import { useAuth } from "react-oidc-context"; const Menu = () => { const activeKey = "tasks"; const user = useContext(UserContext) + const auth = useAuth(); return ( @@ -38,7 +40,7 @@ const Menu = () => { ) : null} { !user ? ( - + auth.signinRedirect()}> Login With GitHub ): null} diff --git a/dashboard/src/content.tsx b/dashboard/src/content.tsx index 9030eb0..253d36b 100644 --- a/dashboard/src/content.tsx +++ b/dashboard/src/content.tsx @@ -1,41 +1,62 @@ -import { useContext } from "react" +import { ReactNode, useContext } from "react" import { Navigate, Route, Routes } from "react-router-dom" import { Login } from "./components/login" import { Menu } from "./components/menu" import { Tasks } from "./components/tasks" -import { UserContext } from "./contexts/user" +import { UserContext, UserProvider } from "./contexts/user" import * as s from "./content.module.css" +import { useAuth } from "react-oidc-context" const Content = () => { - const user = useContext(UserContext) + const auth = useAuth(); - return ( - <> - -
+ switch (auth.activeNavigator) { + case "signinSilent": + return
Signing you in...
; + case "signoutRedirect": + return
Signing you out...
; + } - { - user ? ( + if (auth.isLoading) { + return
Loading...
; + } + + if (auth.error) { + return
Oops... {auth.error.message}
; + } + + if (auth.isAuthenticated) { + return ( + + +
}> - - ) : ( +
+ +
+ ); + } else { + return ( + <> + +
+ - ) +
+ + ) + } - } -
- - ) } export { diff --git a/dashboard/src/contexts/user.tsx b/dashboard/src/contexts/user.tsx index 6784a9c..32f0104 100644 --- a/dashboard/src/contexts/user.tsx +++ b/dashboard/src/contexts/user.tsx @@ -1,15 +1,15 @@ import { createContext, PropsWithChildren, useEffect, useState } from "react"; -import { useAPI, User } from "../services/api"; +import { useAPI, IUser } from "../services/api"; -const defaultVal: User | null = null -const UserContext = createContext(null) +const defaultVal: IUser | null = null +const UserContext = createContext(null) const UserProvider = (props: PropsWithChildren) => { const api = useAPI(); - const [user, setUser] = useState(null); + const [user, setUser] = useState(null); const [userFetched, setUserFetched] = useState(false); useEffect(() => { diff --git a/dashboard/src/services/api.tsx b/dashboard/src/services/api.tsx index 8d2ec97..8035c35 100644 --- a/dashboard/src/services/api.tsx +++ b/dashboard/src/services/api.tsx @@ -1,5 +1,6 @@ import axios, { Axios } from "axios" import { useMemo } from "react" +import { useAuth } from "react-oidc-context" type ITask = { id: string, @@ -23,20 +24,39 @@ const useAPI = () => { const BASE_URL = "/api" + const auth = useAuth(); + + const fetchJSON = async (url: string) => { + const authToken = auth.user?.access_token; + + if(!authToken) { + throw Error("Using useAPI without an authenticated user is not possible") + } + + return fetch( + url, + { + headers: { + "Authorization": `Bearer ${authToken}` + } + } + ).then(x => x.json()); + } + const getTasks = (): Promise => { - return fetch(`${BASE_URL}/tasks`).then(x => x.json()); + return fetchJSON(`${BASE_URL}/tasks`); } const getTask = (id: string): Promise => { - return fetch(`${BASE_URL}/tasks/${id}`).then(x => x.json()); + return fetchJSON(`${BASE_URL}/tasks/${id}`); } const getTaskLogs = (id: string) => { - return fetch(`${BASE_URL}/tasks/${id}/logs`).then(x => x.json()); + return fetchJSON(`${BASE_URL}/tasks/${id}/logs`); } const getUser 
= (): Promise => { - return fetch(`${BASE_URL}/user`).then(x => x.json()); + return fetchJSON(`${BASE_URL}/user`); } return { diff --git a/vicky-worker/Rocket.example.toml b/vicky-worker/Rocket.example.toml index 3c6097e..f3e59d9 100644 --- a/vicky-worker/Rocket.example.toml +++ b/vicky-worker/Rocket.example.toml @@ -1,5 +1,6 @@ [default] -vicky_url = "" +vicky_url = "http://localhost:8000" vicky_external_url = "https://vicky.lab.wobcom.de" -machine_token = "" \ No newline at end of file +machine_token = "" +features = [] \ No newline at end of file diff --git a/vicky-worker/src/main.rs b/vicky-worker/src/main.rs index 32718e6..bf3773f 100644 --- a/vicky-worker/src/main.rs +++ b/vicky-worker/src/main.rs @@ -9,14 +9,15 @@ use tokio::process::Command; use tokio_util::codec::{FramedRead, LinesCodec}; use uuid::Uuid; - +use rocket::figment::providers::{Env, Format, Toml}; use rocket::figment::{Figment, Profile}; -use rocket::figment::providers::{Toml, Env, Format}; + #[derive(Deserialize)] pub(crate) struct AppConfig { pub(crate) vicky_url: String, pub(crate) vicky_external_url: String, pub(crate) machine_token: String, + pub(crate) features: Vec, } fn main() -> anyhow::Result<()> { @@ -25,10 +26,18 @@ fn main() -> anyhow::Result<()> { // Took from rocket source code and added .split("__") to be able to add keys in nested structures. 
let rocket_config_figment = Figment::from(rocket::Config::default()) .merge(Toml::file(Env::var_or("ROCKET_CONFIG", "Rocket.toml")).nested()) - .merge(Env::prefixed("ROCKET_").ignore(&["PROFILE"]).split("__").global()) - .select(Profile::from_env_or("ROCKET_PROFILE", rocket::Config::DEFAULT_PROFILE)); - - let app_config = rocket_config_figment.extract::()?; + .merge( + Env::prefixed("ROCKET_") + .ignore(&["PROFILE"]) + .split("__") + .global(), + ) + .select(Profile::from_env_or( + "ROCKET_PROFILE", + rocket::Config::DEFAULT_PROFILE, + )); + + let app_config = rocket_config_figment.extract::()?; run(app_config) } @@ -39,7 +48,7 @@ async fn api( q: &Q, ) -> anyhow::Result { let client = Client::new(); - let req_data = serde_json::to_vec(&q)?; + let req_data = serde_json::to_vec(q)?; let request = Request::builder() .uri(format!("{}/{}", cfg.vicky_url, endpoint)) @@ -87,7 +96,10 @@ pub struct Task { pub flake_ref: FlakeRef, } -fn log_sink(cfg: Arc, task_id: Uuid) -> impl Sink, Error = anyhow::Error> + Send { +fn log_sink( + cfg: Arc, + task_id: Uuid, +) -> impl Sink, Error = anyhow::Error> + Send { futures_util::sink::unfold((), move |_, lines: Vec| { println!("{}", lines.len()); let cfg = cfg.clone(); @@ -104,11 +116,7 @@ fn log_sink(cfg: Arc, task_id: Uuid) -> impl Sink, Error } async fn try_run_task(cfg: Arc, task: &Task) -> anyhow::Result<()> { - let mut args = vec![ - "run".into(), - "-L".into(), - task.flake_ref.flake.clone(), - ]; + let mut args = vec!["run".into(), "-L".into(), task.flake_ref.flake.clone()]; args.extend(task.flake_ref.args.clone()); let mut child = Command::new("nix") @@ -163,8 +171,13 @@ async fn run_task(cfg: Arc, task: Task) { } async fn try_claim(cfg: Arc) -> anyhow::Result<()> { - if let Some(task) = - api::<_, Option>(&cfg, Method::POST, "api/v1/tasks/claim", &None::).await? + if let Some(task) = api::<_, Option>( + &cfg, + Method::POST, + "api/v1/tasks/claim", + &serde_json::json!({ "features": cfg.features }), + ) + .await? 
{ log::info!("task claimed: {} {} 🎉", task.id, task.display_name); log::debug!("{:#?}", task); diff --git a/vicky/Cargo.toml b/vicky/Cargo.toml index cf157e9..3695fe5 100644 --- a/vicky/Cargo.toml +++ b/vicky/Cargo.toml @@ -23,8 +23,8 @@ aws-config = "0.55.3" uuid = { version="1.4.1", features = ["fast-rng", "v4", "serde"] } rocket = { version="0.5.0", features = ["json", "secrets"] } rocket_dyn_templates = { version = "0.1.0", features = ["tera"] } -rocket_oauth2 = "0.5.0-rc.2" reqwest = { version="0.11.20", features = ["json"]} +jwtk = "0.3.0" [[bin]] name = "vicky" diff --git a/vicky/README.md b/vicky/README.md deleted file mode 100644 index 67f7da7..0000000 --- a/vicky/README.md +++ /dev/null @@ -1,18 +0,0 @@ -# Vicky - -Vicky, which is the babysitter of Timmy, Cosmo and Wanda, is a CD tool for environments with many constraints and dependencies that usually cannot be represented. - -## Background - -We use an etcd cluster to sync state between multiple instances of Vicky. Vicky will do leader election, so at each time only one instance is active. We try to make Vicky as resilient to network and other failues as possible but it is not our main goal, yet. -All data in the etcd is stored under `vicky.wobcom.de/` in YAML format. - -## Development Usage - -+ Start etcd in Docker - + `cd deployment` - + `docker-compose up -d` -+ Start vicky - + `cargo run --bin vicky` - -Make sure to set the correct rust log flag according to your needs. 
\ No newline at end of file diff --git a/vicky/Rocket.example.toml b/vicky/Rocket.example.toml index a06a1fa..28a1709 100644 --- a/vicky/Rocket.example.toml +++ b/vicky/Rocket.example.toml @@ -7,9 +7,9 @@ machines = [ [default.etcd_config] endpoints = [ "https://localhost:2379" ] [default.etcd_config.tls_options] -ca_file = "./certs/Vicky_CA.crt" -certificate_file = "./certs/Vicky.crt" -key_file = "./certs/Vicky.key" +ca_file = "../certs/Vicky_CA.crt" +certificate_file = "../certs/Vicky.crt" +key_file = "../certs/Vicky.key" [default.s3_config] endpoint = "http://localhost:9000" @@ -18,14 +18,6 @@ secret_access_key = "aichudiKohr6aithi4ahh3aeng2eL7xo" region = "us-east-1" log_bucket = "vicky-logs" -[default.oauth.github] -provider = "GitHub" -client_id = "xxx" -client_secret = "xxx" -redirect_uri = "http://localhost:1234/api/auth/callback/github" - - -[default.users.johannwagner] -full_name = "Johann Wagner" -role = "admin" +[default.oidc_config] +jwks_url = "" diff --git a/vicky/src/bin/vicky/auth.rs b/vicky/src/bin/vicky/auth.rs index 09da726..b2fd5e8 100644 --- a/vicky/src/bin/vicky/auth.rs +++ b/vicky/src/bin/vicky/auth.rs @@ -1,63 +1,56 @@ -use anyhow::{Context, Error}; -use reqwest::header::{ACCEPT, AUTHORIZATION, USER_AGENT}; -use rocket::http::{Cookie, CookieJar, SameSite}; +use jwtk::jwk::RemoteJwksVerifier; +use log::{debug, warn}; +use rocket::http::Status; use rocket::{request, State}; -use rocket::response::{Debug, Redirect}; -use rocket::{get}; -use rocket_oauth2::{OAuth2, TokenResponse}; use serde::Deserialize; -use rocket::http::Status; +use serde_json::{Map, Value}; use crate::Config; #[derive(Deserialize, Clone)] #[serde(rename_all = "lowercase")] pub enum Role { - Admin + Admin, } #[derive(Deserialize)] pub struct User { pub full_name: String, - pub role: Role + pub role: Role, } -pub struct Machine { - -} +pub struct Machine {} #[rocket::async_trait] impl<'r> request::FromRequest<'r> for User { type Error = (); async fn from_request(request: &'r 
request::Request<'_>) -> request::Outcome { - - let cookies = request - .guard::<&CookieJar<'_>>() + let jwks_verifier: &State<_> = request + .guard::<&State>() .await - .expect("request cookies"); + .expect("request KeyStore"); - let config = request - .guard::<&State>() - .await - .expect("request Config"); + if let Some(auth_header) = request.headers().get_one("Authorization") { + if !auth_header.starts_with("Bearer") { + return request::Outcome::Forward(Status::Forbidden); + } - if let Some(cookie) = cookies.get_private("vicky_username") { + let token = auth_header.trim_start_matches("Bearer "); - let username = cookie.value().to_string(); - - let cfg_user = config.users.get(&username); - match cfg_user { - Some(cfg_user) => { - return request::Outcome::Success(User { - full_name: cfg_user.full_name.clone(), - role: cfg_user.role.clone(), + return match jwks_verifier.verify::>(token).await { + Ok(jwt) => { + debug!("{:?}", jwt); + request::Outcome::Success(User { + full_name: "Test Wurst".to_string(), + role: Role::Admin, }) - }, - None => { - return request::Outcome::Error((Status::Forbidden, ())) } - } + Err(x) => { + warn!("Login failed: {:?}", x); + request::Outcome::Error((Status::Forbidden, ())) + } + }; } request::Outcome::Forward(Status::Forbidden) @@ -69,7 +62,6 @@ impl<'r> request::FromRequest<'r> for Machine { type Error = (); async fn from_request(request: &'r request::Request<'_>) -> request::Outcome { - let config = request .guard::<&State>() .await @@ -78,79 +70,12 @@ impl<'r> request::FromRequest<'r> for Machine { if let Some(auth_header) = request.headers().get_one("Authorization") { let cfg_user = config.machines.iter().find(|x| *x == auth_header); - match cfg_user { - Some(_) => { - return request::Outcome::Success(Machine {}) - }, - None => { - return request::Outcome::Error((Status::Forbidden, ())) - } - } + return match cfg_user { + Some(_) => request::Outcome::Success(Machine {}), + None => request::Outcome::Error((Status::Forbidden, 
())), + }; } request::Outcome::Forward(Status::Forbidden) } } - -/// User information to be retrieved from the GitHub API. -#[derive(serde::Deserialize)] -pub struct GitHubUserInfo { - #[serde(default)] - login: String, -} - -// NB: Here we are using the same struct as a type parameter to OAuth2 and -// TokenResponse as we use for the user's GitHub login details. For -// `TokenResponse` and `OAuth2` the actual type does not matter; only that they -// are matched up. -#[get("/login/github")] -pub fn github_login(oauth2: OAuth2, cookies: &CookieJar<'_>) -> Redirect { - oauth2.get_redirect(cookies, &["user:read"]).unwrap() -} - -#[get("/callback/github")] -pub async fn github_callback( - token: TokenResponse, - cookies: &CookieJar<'_>, - config: &State, -) -> Result> { - // Use the token to retrieve the user's GitHub account information. - let user_info: GitHubUserInfo = reqwest::Client::builder() - .build() - .context("failed to build reqwest client")? - .get("https://api.github.com/user") - .header(AUTHORIZATION, format!("token {}", token.access_token())) - .header(ACCEPT, "application/vnd.github.v3+json") - .header(USER_AGENT, "Vicky") - .send() - .await - .context("failed to complete request")? - .json() - .await - .context("failed to deserialize response")?; - - - // We only set a cookie, if the user was allowed to do this. - // We also check, if the username within the cookie still matches our list later on. - - let user = config.users.get(&user_info.login); - - if user.is_some() { - // Set a private cookie with the user's name, and redirect to the home page. 
- cookies.add_private( - Cookie::build(("vicky_username", user_info.login)) - .same_site(SameSite::Lax) - ); - Ok(Redirect::to("/")) - } else { - Ok(Redirect::to("/login/error")) - } - - -} - -#[get("/logout")] -pub fn logout(cookies: &CookieJar<'_>) -> Redirect { - cookies.remove(Cookie::from("vicky_username")); - Redirect::to("/") -} diff --git a/vicky/src/bin/vicky/errors.rs b/vicky/src/bin/vicky/errors.rs index cab040d..6e6373c 100644 --- a/vicky/src/bin/vicky/errors.rs +++ b/vicky/src/bin/vicky/errors.rs @@ -1,22 +1,23 @@ use log::error; -use rocket::{response::Responder, Request, http::Status}; +use rocket::{http::Status, response::Responder, Request}; +use thiserror::Error; use tokio::sync::broadcast::error::SendError; use vickylib::errors::VickyError; -use thiserror::Error; use crate::events::GlobalEvent; #[derive(Error, Debug)] pub enum AppError { - #[error("GlobalEvent Push Error {source:?}")] PushError2 { - #[from] source: SendError, + #[from] + source: SendError, }, #[error("Vicky Error {source:?}")] VickyError { - #[from] source: VickyError, + #[from] + source: VickyError, }, #[error("HTTP Error {0:?}")] @@ -24,12 +25,11 @@ pub enum AppError { #[error("uuid Error {source:?}")] Uuid { - #[from] source: uuid::Error, + #[from] + source: uuid::Error, }, } - - impl<'r, 'o: 'r> Responder<'r, 'o> for AppError { fn respond_to(self, req: &'r Request<'_>) -> rocket::response::Result<'o> { // log `self` to your favored error tracker, e.g. 
@@ -38,9 +38,7 @@ impl<'r, 'o: 'r> Responder<'r, 'o> for AppError { match self { Self::HttpError(x) => x.respond_to(req), - _ => { - Status::InternalServerError.respond_to(req) - } + _ => Status::InternalServerError.respond_to(req), } } } diff --git a/vicky/src/bin/vicky/events.rs b/vicky/src/bin/vicky/events.rs index d50a8dc..b2ef736 100644 --- a/vicky/src/bin/vicky/events.rs +++ b/vicky/src/bin/vicky/events.rs @@ -1,25 +1,24 @@ +use rocket::response::stream::{Event, EventStream}; use rocket::{get, State}; use serde::{Deserialize, Serialize}; -use rocket::response::stream::{EventStream, Event}; use std::time; -use tokio::sync::broadcast::{error::{TryRecvError}, self}; - +use tokio::sync::broadcast::{self, error::TryRecvError}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "type")] pub enum GlobalEvent { TaskAdd, - TaskUpdate { - uuid: uuid::Uuid - } + TaskUpdate { uuid: uuid::Uuid }, } #[get("/")] -pub fn get_global_events(global_events: &State>) -> EventStream![Event + '_] { +pub fn get_global_events( + global_events: &State>, +) -> EventStream![Event + '_] { EventStream! 
{ let mut global_events_rx = global_events.subscribe(); - + loop { let read_val = global_events_rx.try_recv(); @@ -40,4 +39,4 @@ pub fn get_global_events(global_events: &State>) } } } -} \ No newline at end of file +} diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 73a7a1e..4e99011 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -1,45 +1,45 @@ -use std::collections::HashMap; +use std::time::Duration; -use auth::User; use aws_sdk_s3::config::{Credentials, Region}; -use etcd_client::{Identity, Certificate, TlsOptions, ConnectOptions}; +use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions}; +use jwtk::jwk::RemoteJwksVerifier; use log::info; use rand::Rng; use rocket::fairing::AdHoc; +use rocket::figment::providers::{Env, Format, Toml}; use rocket::figment::{Figment, Profile}; -use rocket::figment::providers::{Toml, Env, Format}; use rocket::routes; -use rocket_oauth2::OAuth2; use serde::Deserialize; use tokio::sync::broadcast; -use vickylib::etcd::election::{NodeId, Election}; +use vickylib::etcd::election::{Election, NodeId}; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; -use crate::tasks::{tasks_claim, tasks_finish, tasks_get_machine, tasks_get_user, tasks_add, tasks_get_logs, tasks_put_logs, tasks_specific_get_machine, tasks_specific_get_user}; use crate::events::{get_global_events, GlobalEvent}; +use crate::tasks::{ + tasks_add, tasks_claim, tasks_finish, tasks_get_logs, tasks_get_machine, tasks_get_user, + tasks_put_logs, tasks_specific_get_machine, tasks_specific_get_user, +}; -use crate::auth::{github_login, github_callback, logout, GitHubUserInfo}; -use crate::user::{get_user}; +use crate::user::get_user; -mod tasks; mod auth; -mod user; -mod events; mod errors; +mod events; +mod tasks; +mod user; #[derive(Deserialize)] pub struct TlsConfigOptions { ca_file: String, certificate_file: String, key_file: String, - } #[derive(Deserialize)] pub struct EtcdConfig { 
endpoints: Vec, - tls_options: Option + tls_options: Option, } #[derive(Deserialize)] @@ -48,21 +48,25 @@ pub struct S3Config { access_key_id: String, secret_access_key: String, region: String, - + log_bucket: String, } +#[derive(Deserialize)] +pub struct OIDCConfig { + jwks_url: String, +} + #[derive(Deserialize)] pub struct Config { - users: HashMap, machines: Vec, etcd_config: EtcdConfig, s3_config: S3Config, + oidc_config: OIDCConfig, } - #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::builder().filter_module("vicky", log::LevelFilter::Debug).init(); @@ -70,8 +74,16 @@ async fn main() -> anyhow::Result<()> { // Took from rocket source code and added .split("__") to be able to add keys in nested structures. let rocket_config_figment = Figment::from(rocket::Config::default()) .merge(Toml::file(Env::var_or("ROCKET_CONFIG", "Rocket.toml")).nested()) - .merge(Env::prefixed("ROCKET_").ignore(&["PROFILE"]).split("__").global()) - .select(Profile::from_env_or("ROCKET_PROFILE", rocket::Config::DEFAULT_PROFILE)); + .merge( + Env::prefixed("ROCKET_") + .ignore(&["PROFILE"]) + .split("__") + .global(), + ) + .select(Profile::from_env_or( + "ROCKET_PROFILE", + rocket::Config::DEFAULT_PROFILE, + )); let build_rocket = rocket::custom(rocket_config_figment); @@ -88,26 +100,38 @@ async fn main() -> anyhow::Result<()> { Some( TlsOptions::new() .ca_certificate(server_root_ca_cert) - .identity(client_identity) + .identity(client_identity), ) - - }, + } None => None, }; - let connect_options = options.map(|options: TlsOptions| ConnectOptions::new().with_tls(options)); - let etcd_client = etcd_client::Client::connect(app_config.etcd_config.endpoints, connect_options).await?; + let connect_options = + options.map(|options: TlsOptions| ConnectOptions::new().with_tls(options)); + let etcd_client = + etcd_client::Client::connect(app_config.etcd_config.endpoints, connect_options).await?; + + let jwks_verifier = RemoteJwksVerifier::new( + 
app_config.oidc_config.jwks_url, + None, + Duration::from_secs(300), + ); - let aws_cfg = app_config.s3_config; + let aws_cfg = app_config.s3_config; let aws_conf = aws_config::from_env() .endpoint_url(aws_cfg.endpoint) - .credentials_provider(Credentials::new(aws_cfg.access_key_id, aws_cfg.secret_access_key, None, None, "static")) + .credentials_provider(Credentials::new( + aws_cfg.access_key_id, + aws_cfg.secret_access_key, + None, + None, + "static", + )) .region(Region::new(aws_cfg.region)) .load() .await; - - + let s3_client = aws_sdk_s3::Client::new(&aws_conf); let s3_ext_client_drain = S3Client::new(s3_client.clone(), aws_cfg.log_bucket.clone()); let s3_ext_client = S3Client::new(s3_client, aws_cfg.log_bucket.clone()); @@ -130,13 +154,25 @@ async fn main() -> anyhow::Result<()> { .manage(etcd_client) .manage(s3_ext_client) .manage(log_drain) + .manage(jwks_verifier) .manage(tx_global_events) - .attach(OAuth2::::fairing("github")) .attach(AdHoc::config::()) .mount("/api/v1/user", routes![get_user]) - .mount("/api/v1/auth", routes![github_login, github_callback, logout]) .mount("/api/v1/events", routes![get_global_events]) - .mount("/api/v1/tasks", routes![tasks_get_machine, tasks_get_user, tasks_specific_get_machine, tasks_specific_get_user, tasks_claim, tasks_finish, tasks_add, tasks_get_logs, tasks_put_logs]) + .mount( + "/api/v1/tasks", + routes![ + tasks_get_machine, + tasks_get_user, + tasks_specific_get_machine, + tasks_specific_get_user, + tasks_claim, + tasks_finish, + tasks_add, + tasks_get_logs, + tasks_put_logs + ], + ) .launch() .await?; diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 9e37708..de7e39e 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -1,23 +1,36 @@ -use etcd_client::{Client}; -use rocket::{get, post, State, serde::json::Json}; +use etcd_client::Client; +use rocket::http::Status; +use rocket::response::stream::{Event, EventStream}; +use rocket::{get, post, 
serde::json::Json, State}; use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use vickylib::{documents::{Task, TaskStatus, TaskResult, FlakeRef, Lock, DocumentClient}, vicky::{scheduler::Scheduler}, logs::LogDrain, s3::client::S3Client, errors::VickyError}; -use rocket::response::stream::{EventStream, Event}; use std::time; -use tokio::sync::broadcast::{error::{TryRecvError}, self}; -use rocket::{http::Status}; - - -use crate::{auth::{User, Machine}, errors::AppError, events::GlobalEvent}; - +use tokio::sync::broadcast::{self, error::TryRecvError}; +use uuid::Uuid; +use vickylib::{ + documents::{DocumentClient, FlakeRef, Lock, Task, TaskResult, TaskStatus}, + errors::VickyError, + logs::LogDrain, + s3::client::S3Client, + vicky::scheduler::Scheduler, +}; +use crate::{ + auth::{Machine, User}, + errors::AppError, + events::GlobalEvent, +}; #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct RoTaskNew { display_name: String, flake_ref: FlakeRef, locks: Vec, + features: Vec, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct RoTaskClaim { + features: Vec, } #[derive(Debug, PartialEq, Serialize, Deserialize)] @@ -33,41 +46,65 @@ pub struct RoTaskFinish { #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct LogLines { - lines: Vec + lines: Vec, } #[get("/")] -pub async fn tasks_get_user(etcd: &State, _user: User) -> Result>, VickyError> { +pub async fn tasks_get_user( + etcd: &State, + _user: User, +) -> Result>, VickyError> { let tasks: Vec = etcd.get_all_tasks().await?; Ok(Json(tasks)) } -#[get("/", rank=2)] -pub async fn tasks_get_machine(etcd: &State, _machine: Machine) -> Result>, VickyError> { +#[get("/", rank = 2)] +pub async fn tasks_get_machine( + etcd: &State, + _machine: Machine, +) -> Result>, VickyError> { let tasks: Vec = etcd.get_all_tasks().await?; Ok(Json(tasks)) } #[get("/")] -pub async fn tasks_specific_get_user(id: String, etcd: &State, _user: User) -> Result>, VickyError> { +pub async fn 
tasks_specific_get_user( + id: String, + etcd: &State, + _user: User, +) -> Result>, VickyError> { let task_uuid = Uuid::parse_str(&id).unwrap(); let tasks: Option = etcd.get_task(task_uuid).await?; Ok(Json(tasks)) } -#[get("/", rank=2)] -pub async fn tasks_specific_get_machine(id: String, etcd: &State, _machine: Machine) -> Result>, VickyError> { +#[get("/", rank = 2)] +pub async fn tasks_specific_get_machine( + id: String, + etcd: &State, + _machine: Machine, +) -> Result>, VickyError> { let task_uuid = Uuid::parse_str(&id).unwrap(); let tasks: Option = etcd.get_task(task_uuid).await?; Ok(Json(tasks)) } #[get("//logs")] -pub async fn tasks_get_logs<'a>(id: String, etcd: &State, s3: &'a State, _user: User, log_drain: &'a State<&'_ LogDrain>) -> EventStream![Event + 'a] { +pub async fn tasks_get_logs<'a>( + id: String, + etcd: &State, + s3: &'a State, + _user: User, + log_drain: &'a State<&'_ LogDrain>, +) -> EventStream![Event + 'a] { // TODO: Fix Error Handling let task_uuid = Uuid::parse_str(&id).unwrap(); - let task = etcd.get_task(task_uuid).await.unwrap().ok_or(AppError::HttpError(Status::NotFound)).unwrap(); - + let task = etcd + .get_task(task_uuid) + .await + .unwrap() + .ok_or(AppError::HttpError(Status::NotFound)) + .unwrap(); EventStream! { @@ -80,10 +117,10 @@ pub async fn tasks_get_logs<'a>(id: String, etcd: &State, s3: &'a State< for element in existing_log_messages { yield Event::data(element) } - + loop { let read_val = recv.try_recv(); - + match read_val { Ok((task_id, log_text)) => { if task_id == id { @@ -94,7 +131,7 @@ pub async fn tasks_get_logs<'a>(id: String, etcd: &State, s3: &'a State< break; }, Err(TryRecvError::Lagged(_)) => { - // Immediate Retry, doing our best efford ehre. + // Immediate Retry, doing our best effort here. 
}, Err(TryRecvError::Empty) => { tokio::time::sleep(time::Duration::from_millis(100)).await; @@ -112,48 +149,74 @@ pub async fn tasks_get_logs<'a>(id: String, etcd: &State, s3: &'a State< } }, } - + } } #[post("//logs", format = "json", data = "")] -pub async fn tasks_put_logs(id: String, etcd: &State, logs: Json, _machine: Machine, log_drain: &State<&LogDrain>) -> Result, AppError> { +pub async fn tasks_put_logs( + id: String, + etcd: &State, + logs: Json, + _machine: Machine, + log_drain: &State<&LogDrain>, +) -> Result, AppError> { let task_uuid = Uuid::parse_str(&id)?; - let task = etcd.get_task(task_uuid).await?.ok_or(AppError::HttpError(Status::NotFound))?; + let task = etcd + .get_task(task_uuid) + .await? + .ok_or(AppError::HttpError(Status::NotFound))?; match task.status { TaskStatus::RUNNING => { log_drain.push_logs(id, logs.lines.clone())?; Ok(Json(())) - }, - _ => { - Err(AppError::HttpError(Status::Locked))? } + _ => Err(AppError::HttpError(Status::Locked))?, } } -#[post("/claim")] -pub async fn tasks_claim(etcd: &State, global_events: &State>, _machine: Machine) -> Result>, AppError> { +#[post("/claim", format = "json", data = "")] +pub async fn tasks_claim( + etcd: &State, + features: Json, + global_events: &State>, + _machine: Machine, +) -> Result>, AppError> { let tasks = etcd.get_all_tasks().await?; - let scheduler = Scheduler::new(tasks).map_err(|x| VickyError::Scheduler { source: x })?; + let scheduler = Scheduler::new(tasks, &features.features) + .map_err(|x| VickyError::Scheduler { source: x })?; let next_task = scheduler.get_next_task(); match next_task { Some(next_task) => { - let mut task = etcd.get_task(next_task.id).await?.ok_or(AppError::HttpError(Status::NotFound))?; + let mut task = etcd + .get_task(next_task.id) + .await? 
+ .ok_or(AppError::HttpError(Status::NotFound))?; task.status = TaskStatus::RUNNING; etcd.put_task(&task).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(Some(task))) - }, + } None => Ok(Json(None)), } } #[post("//finish", format = "json", data = "")] -pub async fn tasks_finish(id: String, finish: Json, etcd: &State, global_events: &State>, _machine: Machine, log_drain: &State<&LogDrain>) -> Result, AppError> { +pub async fn tasks_finish( + id: String, + finish: Json, + etcd: &State, + global_events: &State>, + _machine: Machine, + log_drain: &State<&LogDrain>, +) -> Result, AppError> { let task_uuid = Uuid::parse_str(&id)?; - let mut task = etcd.get_task(task_uuid).await?.ok_or(AppError::HttpError(Status::NotFound))?; + let mut task = etcd + .get_task(task_uuid) + .await? + .ok_or(AppError::HttpError(Status::NotFound))?; log_drain.finish_logs(&id).await?; @@ -165,15 +228,24 @@ pub async fn tasks_finish(id: String, finish: Json, etcd: &State, etcd: &State, global_events: &State>, _machine: Machine) -> Result, AppError> { +pub async fn tasks_add( + task: Json, + etcd: &State, + global_events: &State>, + _machine: Machine, +) -> Result, AppError> { let task_uuid = Uuid::new_v4(); - let task_manifest = Task { + let task_manifest = Task { id: task_uuid, status: TaskStatus::NEW, locks: task.locks.clone(), display_name: task.display_name.clone(), - flake_ref: FlakeRef { flake: task.flake_ref.flake.clone(), args: task.flake_ref.args.clone() }, + flake_ref: FlakeRef { + flake: task.flake_ref.flake.clone(), + args: task.flake_ref.args.clone(), + }, + features: task.features.clone(), }; etcd.put_task(&task_manifest).await?; @@ -181,7 +253,7 @@ pub async fn tasks_add(task: Json, etcd: &State, global_event let ro_task = RoTask { id: task_uuid, - status: TaskStatus::NEW + status: TaskStatus::NEW, }; Ok(Json(ro_task)) diff --git a/vicky/src/bin/vicky/user.rs b/vicky/src/bin/vicky/user.rs index 4a08261..f583832 100644 --- 
a/vicky/src/bin/vicky/user.rs +++ b/vicky/src/bin/vicky/user.rs @@ -1,10 +1,8 @@ use rocket::{get, serde::json::Json}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use crate::{auth::User, errors::AppError}; - - #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Me { @@ -13,13 +11,11 @@ pub struct Me { } #[get("/")] -pub fn get_user(user: User) -> Result, AppError> { - +pub fn get_user(user: User) -> Result, AppError> { let me = Me { full_name: user.full_name, role: String::from("admin"), }; Ok(Json(me)) - -} \ No newline at end of file +} diff --git a/vicky/src/lib/documents/mod.rs b/vicky/src/lib/documents/mod.rs index 1f2401b..c8fb9a6 100644 --- a/vicky/src/lib/documents/mod.rs +++ b/vicky/src/lib/documents/mod.rs @@ -1,11 +1,10 @@ - use async_trait::async_trait; use etcd_client::GetOptions; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::{etcd::client::ClientExt, errors::VickyError}; +use crate::{errors::VickyError, etcd::client::ClientExt}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(tag = "result")] @@ -21,19 +20,14 @@ pub enum TaskResult { pub enum TaskStatus { NEW, RUNNING, - FINISHED(TaskResult) - + FINISHED(TaskResult), } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(tag = "type")] pub enum Lock { - WRITE { - object: String - }, - READ { - object: String - }, + WRITE { object: String }, + READ { object: String }, } type FlakeURI = String; @@ -44,7 +38,6 @@ pub struct FlakeRef { pub args: Vec, } - #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Task { pub id: Uuid, @@ -52,6 +45,7 @@ pub struct Task { pub status: TaskStatus, pub locks: Vec, pub flake_ref: FlakeRef, + pub features: Vec, } #[async_trait] @@ -59,15 +53,22 @@ pub trait DocumentClient { async fn get_all_tasks(&self) -> Result, VickyError>; async fn get_task(&self, task_id: Uuid) -> Result, VickyError>; async fn put_task(&self, task: &Task) 
-> Result<(), VickyError>; - } #[async_trait] impl DocumentClient for etcd_client::Client { async fn get_all_tasks(&self) -> Result, VickyError> { let mut kv = self.kv_client(); - let get_options: GetOptions = GetOptions::new().with_prefix().with_sort(etcd_client::SortTarget::Create, etcd_client::SortOrder::Descend); - let tasks: Vec = kv.get_yaml_list("vicky.wobcom.de/task/manifest".to_string(), Some(get_options)).await?; + let get_options: GetOptions = GetOptions::new().with_prefix().with_sort( + etcd_client::SortTarget::Create, + etcd_client::SortOrder::Descend, + ); + let tasks: Vec = kv + .get_yaml_list( + "vicky.wobcom.de/task/manifest".to_string(), + Some(get_options), + ) + .await?; Ok(tasks) } @@ -84,4 +85,4 @@ impl DocumentClient for etcd_client::Client { kv.put_yaml(key, &task, None).await?; Ok(()) } -} \ No newline at end of file +} diff --git a/vicky/src/lib/errors.rs b/vicky/src/lib/errors.rs index aaaed97..1bc5484 100644 --- a/vicky/src/lib/errors.rs +++ b/vicky/src/lib/errors.rs @@ -1,6 +1,9 @@ -use aws_sdk_s3::{operation::{put_object::PutObjectError, get_object::GetObjectError}, primitives::ByteStreamError}; +use aws_sdk_s3::{ + operation::{get_object::GetObjectError, put_object::PutObjectError}, + primitives::ByteStreamError, +}; use log::error; -use rocket::{response::Responder, Request, http::Status}; +use rocket::{http::Status, response::Responder, Request}; use thiserror::Error; use tokio::sync::broadcast::error::SendError; @@ -8,43 +11,46 @@ use tokio::sync::broadcast::error::SendError; pub enum VickyError { #[error("serde_json Error {source:?}")] SerdeJson { - #[from] source: serde_json::Error, + #[from] + source: serde_json::Error, }, #[error("serde_yaml Error {source:?}")] SerdeYaml { - #[from] source: serde_yaml::Error, + #[from] + source: serde_yaml::Error, }, #[error("etcd Error {source:?}")] EtcdClient { - #[from] source: etcd_client::Error, + #[from] + source: etcd_client::Error, }, - - #[error("Scheduling Error {source:?}")] 
Scheduler { - #[from] source: SchedulerError, + #[from] + source: SchedulerError, }, #[error("Log Push Error {source:?}")] PushError { - #[from] source: SendError<(String, String)>, + #[from] + source: SendError<(String, String)>, }, #[error("S3 Client Error {source:?}")] S3ClientError { - #[from] source: S3ClientError, - } + #[from] + source: S3ClientError, + }, } #[derive(Error, Debug)] pub enum SchedulerError { #[error("Invalid Scheduling")] - GeneralSchedulingError + GeneralSchedulingError, } - #[derive(Error, Debug)] pub enum S3ClientError { #[error("Object Already Exists")] @@ -52,24 +58,27 @@ pub enum S3ClientError { #[error(transparent)] SdkError { - #[from] source: aws_sdk_s3::Error, + #[from] + source: aws_sdk_s3::Error, }, #[error(transparent)] SdkPutObjectError { - #[from] source: aws_sdk_s3::error::SdkError, + #[from] + source: aws_sdk_s3::error::SdkError, }, #[error(transparent)] SdkGetObjectError { - #[from] source: aws_sdk_s3::error::SdkError, + #[from] + source: aws_sdk_s3::error::SdkError, }, #[error(transparent)] ByteStreamError { - #[from] source: ByteStreamError, - } - + #[from] + source: ByteStreamError, + }, } impl<'r, 'o: 'r> Responder<'r, 'o> for VickyError { diff --git a/vicky/src/lib/etcd/client.rs b/vicky/src/lib/etcd/client.rs index 061def7..476ef9e 100644 --- a/vicky/src/lib/etcd/client.rs +++ b/vicky/src/lib/etcd/client.rs @@ -1,13 +1,10 @@ use async_trait::async_trait; -use etcd_client::{ - GetOptions, PutOptions, KvClient -}; +use etcd_client::{GetOptions, KvClient, PutOptions}; use serde::de::DeserializeOwned; -use serde::{Serialize}; +use serde::Serialize; use crate::errors::VickyError; - #[async_trait] pub trait ClientExt { async fn get_yaml_list( @@ -75,4 +72,4 @@ impl ClientExt for KvClient { self.put(key, yaml_str, options).await?; return Ok(()); } -} \ No newline at end of file +} diff --git a/vicky/src/lib/etcd/election.rs b/vicky/src/lib/etcd/election.rs index fab3e34..dd98831 100644 --- a/vicky/src/lib/etcd/election.rs +++ 
b/vicky/src/lib/etcd/election.rs @@ -1,7 +1,7 @@ -use etcd_client::{ElectionClient, Client, LeaseClient}; -use log::{debug}; +use etcd_client::{Client, ElectionClient, LeaseClient}; +use log::debug; use std::sync::Arc; -use std::{time}; +use std::time; use tokio::sync::Mutex; use crate::errors::VickyError; @@ -10,7 +10,6 @@ const ELECTION_NAME: &str = "vicky.wobcom.de/leader-election"; pub type NodeId = String; - enum ElectionState { Idle, Waiting, @@ -97,7 +96,6 @@ impl Election { let mut lease_client = self.lease_client.clone(); - // tokio does some funky stuff here, it blocks the requests sometimes. tokio::spawn(async move { loop { diff --git a/vicky/src/lib/etcd/mod.rs b/vicky/src/lib/etcd/mod.rs index 5455128..ae6a77c 100644 --- a/vicky/src/lib/etcd/mod.rs +++ b/vicky/src/lib/etcd/mod.rs @@ -1,4 +1,2 @@ - - pub mod client; pub mod election; diff --git a/vicky/src/lib/lib.rs b/vicky/src/lib/lib.rs index abcd671..b8a8707 100644 --- a/vicky/src/lib/lib.rs +++ b/vicky/src/lib/lib.rs @@ -1,6 +1,6 @@ -pub mod etcd; pub mod documents; -pub mod vicky; +pub mod errors; +pub mod etcd; pub mod logs; pub mod s3; -pub mod errors; \ No newline at end of file +pub mod vicky; diff --git a/vicky/src/lib/logs/mod.rs b/vicky/src/lib/logs/mod.rs index 2e0d50c..39d0acd 100644 --- a/vicky/src/lib/logs/mod.rs +++ b/vicky/src/lib/logs/mod.rs @@ -1,13 +1,13 @@ -use std::{collections::{VecDeque, HashMap}}; -use std::{time}; +use std::collections::{HashMap, VecDeque}; +use std::time; + use rocket::futures::lock::Mutex; -use tokio::sync::broadcast::{Sender, self, error::{TryRecvError}}; +use tokio::sync::broadcast::{self, error::TryRecvError, Sender}; -use crate::{s3::client::S3Client, errors::{VickyError} }; +use crate::{errors::VickyError, s3::client::S3Client}; const LOG_BUFFER: usize = 10000; - pub struct LogDrain { pub send_handle: Sender<(String, String)>, @@ -18,17 +18,15 @@ pub struct LogDrain { } impl LogDrain { - pub fn new(s3_client: S3Client) -> &'static LogDrain { let (tx, 
rx1) = broadcast::channel(1000); let s3_client_m = Box::leak(Box::new(s3_client.clone())); - - let ld: LogDrain = LogDrain { - send_handle: tx, + let ld: LogDrain = LogDrain { + send_handle: tx, live_log_buffers: Mutex::new(HashMap::new()), push_log_buffers: Mutex::new(HashMap::new()), - + s3_client, }; @@ -36,15 +34,11 @@ impl LogDrain { let rx1r = Box::leak(Box::new(rx1)); tokio::spawn(async { - - loop { let read_val = rx1r.try_recv(); match read_val { Ok((task_id, log_text)) => { - - { let mut llb = ldr.live_log_buffers.lock().await; @@ -65,31 +59,32 @@ impl LogDrain { if !push_log_buffers.contains_key(&task_id) { push_log_buffers.insert(task_id.clone(), vec![]); } - + let push_log_buffer = push_log_buffers.get_mut(&task_id).unwrap(); push_log_buffer.push(log_text.clone()); - - + // TODO: Figure out a good buffer length for our use case. if push_log_buffer.len() > 16 { // Push buffer to S3 - - s3_client_m.upload_log_parts(&task_id, push_log_buffer.to_vec()).await.unwrap(); + + s3_client_m + .upload_log_parts(&task_id, push_log_buffer.to_vec()) + .await + .unwrap(); push_log_buffer.clear() } - } - }, + } Err(TryRecvError::Closed) => { // TODO: Do something about this. // Technically, this should not happen, because we control all of the send handles. - }, + } Err(TryRecvError::Lagged(_)) => { // Immediate Retry, doing our best efford ehre. - }, + } Err(TryRecvError::Empty) => { tokio::time::sleep(time::Duration::from_millis(10)).await; - }, + } } } }); @@ -97,7 +92,6 @@ impl LogDrain { ldr } - pub fn push_logs(&self, task_id: String, logs: Vec) -> Result<(), VickyError> { for log in logs { self.send_handle.send((task_id.clone(), log))?; @@ -107,20 +101,26 @@ impl LogDrain { } pub async fn get_logs(&self, task_id: String) -> Option> { - let new_vec: Vec = self.live_log_buffers.lock().await.get(&task_id)?.clone().into(); + let new_vec: Vec = self + .live_log_buffers + .lock() + .await + .get(&task_id)? 
+ .clone() + .into(); Some(new_vec) } pub async fn finish_logs(&self, task_id: &str) -> Result<(), VickyError> { let mut push_log_buffers = self.push_log_buffers.lock().await; if let Some(push_log_buffer) = push_log_buffers.get_mut(task_id) { - if !push_log_buffer.is_empty(){ - self.s3_client.upload_log_parts(task_id, push_log_buffer.to_vec()).await?; + if !push_log_buffer.is_empty() { + self.s3_client + .upload_log_parts(task_id, push_log_buffer.to_vec()) + .await?; } push_log_buffers.remove(task_id); } Ok(()) } - } - diff --git a/vicky/src/lib/s3/client.rs b/vicky/src/lib/s3/client.rs index ed681b9..13228aa 100644 --- a/vicky/src/lib/s3/client.rs +++ b/vicky/src/lib/s3/client.rs @@ -1,41 +1,54 @@ use aws_sdk_s3::primitives::ByteStream; -use log::{info}; +use log::info; use crate::errors::S3ClientError; -#[derive(Clone, )] +#[derive(Clone)] pub struct S3Client { inner: aws_sdk_s3::Client, bucket: String, } impl S3Client { - pub fn new(inner: aws_sdk_s3::Client, bucket: String) -> Self { S3Client { inner, bucket } } pub async fn get_logs(&self, task_id: &str) -> Result, S3ClientError> { - let key = format!("vicky-logs/{}.log", task_id); - let get_object_result = self.inner.get_object() + let get_object_result = self + .inner + .get_object() .bucket(self.bucket.clone()) - .key(key.clone()).send().await?; + .key(key.clone()) + .send() + .await?; let existing_vec = get_object_result.body.collect().await?; - let res = String::from_utf8(existing_vec.to_vec()).unwrap().split('\n').map(|x| x.to_string()).collect(); + let res = String::from_utf8(existing_vec.to_vec()) + .unwrap() + .split('\n') + .map(|x| x.to_string()) + .collect(); Ok(res) } - pub async fn upload_log_parts(&self, task_id: &str, log_lines: Vec) -> Result<(), S3ClientError> { - + pub async fn upload_log_parts( + &self, + task_id: &str, + log_lines: Vec, + ) -> Result<(), S3ClientError> { let key = format!("vicky-logs/{}.log", task_id); info!("Checking, if {} already exists", key); - let get_object_result = 
self.inner.get_object() + let get_object_result = self + .inner + .get_object() .bucket(self.bucket.clone()) - .key(key.clone()).send().await; + .key(key.clone()) + .send() + .await; let mut new_vec = vec![]; @@ -44,22 +57,26 @@ impl S3Client { info!("{} already exists, downloading...", key); let existing_vec = gor.body.collect().await?; new_vec.append(&mut existing_vec.to_vec()); - }, + } // This object does not exist, this is fine, currently there is no better way to do this. Err(_) => { info!("{} does not exist", key); } } - let new_line_log_lines: Vec = log_lines.iter().map(|x| format!("{}\n", x)).collect(); + let new_line_log_lines: Vec = + log_lines.iter().map(|x| format!("{}\n", x)).collect(); new_vec.append(&mut new_line_log_lines.join("").as_bytes().to_vec()); let bs = ByteStream::from(new_vec); info!("Uploading {}", key); - self.inner.put_object().bucket(&self.bucket).key(&key).body(bs).send().await?; + self.inner + .put_object() + .bucket(&self.bucket) + .key(&key) + .body(bs) + .send() + .await?; Ok(()) } - } - - diff --git a/vicky/src/lib/s3/mod.rs b/vicky/src/lib/s3/mod.rs index 3935517..b9babe5 100644 --- a/vicky/src/lib/s3/mod.rs +++ b/vicky/src/lib/s3/mod.rs @@ -1 +1 @@ -pub mod client; \ No newline at end of file +pub mod client; diff --git a/vicky/src/lib/vicky/mod.rs b/vicky/src/lib/vicky/mod.rs index f44ba73..81b3546 100644 --- a/vicky/src/lib/vicky/mod.rs +++ b/vicky/src/lib/vicky/mod.rs @@ -1 +1 @@ -pub mod scheduler; \ No newline at end of file +pub mod scheduler; diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index 6840627..f7738d5 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -2,7 +2,10 @@ use std::collections::HashMap; use log::debug; -use crate::{documents::{Lock, Task, TaskStatus}, errors::SchedulerError}; +use crate::{ + documents::{Lock, Task, TaskStatus}, + errors::SchedulerError, +}; type Constraints = HashMap; @@ -11,20 +14,13 @@ trait ConstraintMgmt { fn 
get_mut_lock_sum(&mut self, lock: &Lock) -> Option<&mut LockSum>; fn get_lock_sum(&self, lock: &Lock) -> Option<&LockSum>; fn insert_lock(&mut self, lock: &Lock) -> Result<(), SchedulerError>; - - } impl ConstraintMgmt for Constraints { - fn get_map_key(lock: &Lock) -> &String { match lock { - Lock::WRITE { object } => { - object - }, - Lock::READ { object } => { - object - }, + Lock::WRITE { object } => object, + Lock::READ { object } => object, } } @@ -43,29 +39,29 @@ impl ConstraintMgmt for Constraints { Some(c) => { debug!("Found existing LockSum {:?}", lock); c.add_lock(lock)?; - }, + } None => { debug!("Found no LockSum"); let object = Constraints::get_map_key(lock); self.insert(object.clone(), LockSum::from_lock(lock)); - }, + } } Ok(()) } - - } - struct LockSum { lock: Lock, - count: i32 + count: i32, } impl LockSum { pub fn from_lock(lock: &Lock) -> LockSum { - LockSum { lock: lock.clone(), count: 1 } + LockSum { + lock: lock.clone(), + count: 1, + } } pub fn can_add_lock(&self, lock: &Lock) -> bool { @@ -78,7 +74,6 @@ impl LockSum { } pub fn add_lock(&mut self, lock: &Lock) -> Result<(), SchedulerError> { - let can_add_lock = self.can_add_lock(lock); if !can_add_lock { return Err(SchedulerError::GeneralSchedulingError); @@ -88,27 +83,23 @@ impl LockSum { Lock::READ { object: _ } => { self.count += 1; Ok(()) - }, - _ => unreachable!() + } + _ => unreachable!(), } } } - pub struct Scheduler { constraints: Constraints, tasks: Vec, + machine_features: Vec, } - impl Scheduler { - - pub fn new(tasks: Vec) -> Result { - + pub fn new(tasks: Vec, machine_features: &[String]) -> Result { let mut constraints: Constraints = HashMap::new(); for task in &tasks { - if task.status != TaskStatus::RUNNING { continue; } @@ -121,150 +112,322 @@ impl Scheduler { let s = Scheduler { constraints, tasks, + machine_features: machine_features.to_vec(), }; Ok(s) } + fn is_unconstrained(&self, task: &Task) -> bool { + task.locks.iter().all(|lock| { + self.constraints + 
.get_lock_sum(lock) + .map_or(true, |ls| ls.can_add_lock(lock)) + }) + } - pub fn get_next_task(self) -> Option { - - for task in self.tasks { - if task.status != TaskStatus::NEW { - continue; - } - - let mut has_conflicts = false; - - for lock in &task.locks { - let lock_sum = self.constraints.get_lock_sum(lock); - match lock_sum { - Some(ls) => { - if !ls.can_add_lock(lock) { - has_conflicts = true; - } - }, - None => continue, - } - } - - if !has_conflicts { - return Some(task) - } - } + fn supports_all_features(&self, task: &Task) -> bool { + task.features + .iter() + .all(|feat| self.machine_features.contains(feat)) + } - None + fn should_pick_task(&self, task: &Task) -> bool { + task.status == TaskStatus::NEW + && self.supports_all_features(task) + && self.is_unconstrained(task) + } - } + pub fn get_next_task(mut self) -> Option { + self.tasks + .iter() + .position(|task| self.should_pick_task(task)) + .map(|idx| self.tasks.remove(idx)) + } } - #[cfg(test)] mod tests { use uuid::Uuid; - use crate::documents::{TaskStatus, Task, FlakeRef, Lock}; + use crate::documents::{FlakeRef, Lock, Task, TaskStatus}; use super::Scheduler; #[test] fn scheduler_creation_no_constraints() { - let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::RUNNING, locks: vec![], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::RUNNING, + locks: vec![], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + 
}, ]; - Scheduler::new(tasks).unwrap(); - + Scheduler::new(tasks, &[]).unwrap(); } #[test] fn scheduler_creation_multiple_read_constraints() { - let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::READ { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::RUNNING, locks: vec![Lock::READ { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::READ { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::RUNNING, + locks: vec![Lock::READ { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - Scheduler::new(tasks).unwrap(); - + Scheduler::new(tasks, &[]).unwrap(); } #[test] fn scheduler_creation_single_write_constraints() { - let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo2") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task 
{ + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + object: String::from("foo2"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - Scheduler::new(tasks).unwrap(); - + Scheduler::new(tasks, &[]).unwrap(); } #[test] fn scheduler_creation_multiple_write_constraints() { - let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - - let res = Scheduler::new(tasks); - assert!(res.is_err()); + let res = Scheduler::new(tasks, &[]); + assert!(res.is_err()); } #[test] fn scheduler_no_new_task() { let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::NEW, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: 
vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::NEW, + locks: vec![Lock::WRITE { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - - let res = Scheduler::new(tasks).unwrap(); + + let res = Scheduler::new(tasks, &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task(), None) - } #[test] fn scheduler_new_task() { let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::NEW, locks: vec![Lock::WRITE { object: String::from("foo2") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::NEW, + locks: vec![Lock::WRITE { + object: String::from("foo2"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - - let res = Scheduler::new(tasks).unwrap(); + + let res = Scheduler::new(tasks, &[]).unwrap(); // Test 1 is currently running and has the write lock - assert!(res.get_next_task().unwrap().display_name == "Test 2") - + 
assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } #[test] fn scheduler_new_task_ro() { let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::READ { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::NEW, locks: vec![Lock::READ { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::READ { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::NEW, + locks: vec![Lock::READ { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - - let res = Scheduler::new(tasks).unwrap(); + + let res = Scheduler::new(tasks, &[]).unwrap(); // Test 1 is currently running and has the write lock - assert!(res.get_next_task().unwrap().display_name == "Test 2") - + assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } #[test] fn scheduler_new_task_rw_ro() { let tasks = vec![ - Task { id: Uuid::new_v4(), display_name: String::from("Test 1"), status: TaskStatus::RUNNING, locks: vec![Lock::WRITE { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, - Task { id: Uuid::new_v4(), display_name: String::from("Test 2"), status: TaskStatus::NEW, locks: vec![Lock::READ { object: String::from("foo1") }], flake_ref: FlakeRef { flake: String::from(""), args: vec![] } }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 1"), + status: TaskStatus::RUNNING, + locks: vec![Lock::WRITE { + 
object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, + Task { + id: Uuid::new_v4(), + display_name: String::from("Test 2"), + status: TaskStatus::NEW, + locks: vec![Lock::READ { + object: String::from("foo1"), + }], + flake_ref: FlakeRef { + flake: String::from(""), + args: vec![], + }, + features: vec![], + }, ]; - - let res = Scheduler::new(tasks).unwrap(); + + let res = Scheduler::new(tasks, &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task(), None) - } -} \ No newline at end of file +}