From 8937578f05a11ff7c1141b1af0edf4859f319c50 Mon Sep 17 00:00:00 2001 From: KX Date: Fri, 26 Apr 2024 08:55:19 +0200 Subject: [PATCH 01/42] changed database deployment from etcd to postgresql --- deployment/docker-compose.yaml | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/deployment/docker-compose.yaml b/deployment/docker-compose.yaml index 6f67a08..8bf6631 100644 --- a/deployment/docker-compose.yaml +++ b/deployment/docker-compose.yaml @@ -1,28 +1,19 @@ services: - etcd: - image: quay.io/coreos/etcd:v3.5.9 - entrypoint: /usr/local/bin/etcd + postgres: + image: postgres ports: - - "2379:2379" - - "2380:2380" + - 5432:5432 + restart: always - command: - - '--data-dir=/etcd-data' - - '--name=node1' - - '--initial-advertise-peer-urls=http://127.0.0.1:2380' - - '--listen-peer-urls=http://0.0.0.0:2380' - - '--advertise-client-urls=https://127.0.0.1:2379' - - '--listen-client-urls=https://0.0.0.0:2379' - - '--initial-cluster=node1=http://127.0.0.1:2380' - - '--trusted-ca-file=/certs/ca.crt' - - '--client-cert-auth' - - '--cert-file=/certs/cert.crt' - - '--key-file=/certs/cert.key' volumes: - - "./data/etcd_data:/etcd-data" - - "../certs/Vicky_CA.crt:/certs/ca.crt" - - "../certs/etcd.crt:/certs/cert.crt" - - "../certs/etcd.key:/certs/cert.key" + - "./config/postgres-passwd:/run/secrets/postgres-passwd" + - "./data/postgres_data:/var/lib/postgresql/data" + environment: + # POSTGRES_PASSWORD_FILE: /run/secrets/postgres-passwd + POSTGRES_USER: vicky + POSTGRES_PASSWORD: vicky + POSTGRES_DB: vicky + minio: image: minio/minio ports: From d00f94f2716f90e04ed7313d15be0e9e77a7eacf Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 09:51:21 +0200 Subject: [PATCH 02/42] refactor: changed config example --- vicky/Rocket.example.toml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/vicky/Rocket.example.toml b/vicky/Rocket.example.toml index 28a1709..fc5b564 100644 --- a/vicky/Rocket.example.toml +++ b/vicky/Rocket.example.toml @@ -4,12 +4,12 @@ machines = [ "abc1234" ] -[default.etcd_config] -endpoints = [ "https://localhost:2379" ] -[default.etcd_config.tls_options] -ca_file = "../certs/Vicky_CA.crt" -certificate_file = "../certs/Vicky.crt" -key_file = "../certs/Vicky.key" +[default.db] +endpoints = [ "https://localhost:5432" ] +user = "vicky" +database = "vicky" +password = "vicky" + [default.s3_config] endpoint = "http://localhost:9000" From 01a92ae35b8571bd93687588999847e80e13f3ea Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 10:26:16 +0200 Subject: [PATCH 03/42] vicky: removed etcd-client added diesel --- Cargo.lock | 396 +++++++++------------------------------------- vicky/Cargo.toml | 3 +- vicky/diesel.toml | 9 ++ 3 files changed, 81 insertions(+), 327 deletions(-) create mode 100644 vicky/diesel.toml diff --git a/Cargo.lock b/Cargo.lock index 8cd9a61..3c6544e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,7 +149,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -160,7 +160,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -432,7 +432,7 @@ dependencies = [ "hyper-rustls", "lazy_static", "pin-project-lite", - "rustls 0.20.9", + "rustls", "tokio", "tower", "tracing", @@ -545,51 +545,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" -dependencies = [ - "async-trait", - "axum-core", - "bitflags 1.3.2", - "bytes", - "futures-util", - "http", - "http-body", - "hyper", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper", - "tower", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http", - "http-body", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - [[package]] name = "backtrace" version = "0.3.71" @@ -682,6 +637,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d6d68c57235a3a081186990eca2867354726650f42f7516ca50c28d6281fd15" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.6.0" @@ -783,10 +744,10 @@ version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ - "heck 0.5.0", + "heck", "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -936,7 +897,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.60", + "syn", ] [[package]] @@ -947,7 +908,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -996,7 +957,41 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.60", + "syn", +] + +[[package]] +name = "diesel" +version = "2.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff236accb9a5069572099f0b350a92e9560e8e63a9b8d546162f4a5e03026bb2" +dependencies = [ + "bitflags 2.5.0", + "byteorder", + "diesel_derives", + "itoa", + "pq-sys", +] + +[[package]] +name = "diesel_derives" +version = "2.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14701062d6bed917b5c7103bdffaee1e4609279e240488ad24e7bd979ca6866c" +dependencies = [ + "diesel_table_macro_syntax", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "diesel_table_macro_syntax" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" +dependencies = [ + "syn", ] [[package]] @@ -1054,22 +1049,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "etcd-client" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89" -dependencies = [ - "http", - "prost", - "tokio", - "tokio-stream", - "tonic", - "tonic-build", - "tower", - "tower-service", -] - [[package]] name = "fastrand" version = "1.9.0" @@ -1111,12 +1090,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "fixedbitset" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" - [[package]] name = "fnv" version = "1.0.7" @@ -1200,7 +1173,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -1344,12 +1317,6 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" - [[package]] name = "heck" version = "0.5.0" @@ -1386,15 +1353,6 @@ dependencies = [ "digest", ] -[[package]] -name = "home" -version = "0.5.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "http" version = "0.2.12" @@ -1477,22 +1435,10 @@ dependencies = [ "http", "hyper", "log", - "rustls 0.20.9", + "rustls", "rustls-native-certs", "tokio", - "tokio-rustls 0.23.4", -] - -[[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper", - "pin-project-lite", - "tokio", - "tokio-io-timeout", + "tokio-rustls", ] [[package]] @@ -1646,15 +1592,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "itertools" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.11" @@ -1772,12 +1709,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "md-5" version = "0.10.6" @@ -1841,12 +1772,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "multimap" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" - [[package]] name = "native-tls" version = "0.2.11" @@ -1981,7 +1906,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2066,7 +1991,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2106,7 +2031,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2120,16 +2045,6 @@ dependencies = [ "sha2", ] -[[package]] -name = "petgraph" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" -dependencies = [ - "fixedbitset", - "indexmap 2.2.6", -] - [[package]] name = "phf" version = "0.11.2" @@ -2185,7 +2100,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2231,13 +2146,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] -name = "prettyplease" -version = "0.1.25" +name = "pq-sys" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" dependencies = [ - "proc-macro2", - "syn 1.0.109", + "vcpkg", ] [[package]] @@ -2257,65 +2171,11 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", "version_check", "yansi", ] -[[package]] -name = "prost" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" -dependencies = [ - "bytes", - "prost-derive", -] - -[[package]] -name = "prost-build" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" -dependencies = [ - "bytes", - "heck 0.4.1", - "itertools", - "lazy_static", - "log", - "multimap", - "petgraph", - "prettyplease", - "prost", - "prost-types", - "regex", - "syn 1.0.109", - "tempfile", - "which", -] - -[[package]] -name = "prost-derive" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" -dependencies = [ - "anyhow", - "itertools", - "proc-macro2", - "quote", - "syn 1.0.109", -] - -[[package]] -name = "prost-types" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" -dependencies = [ - "prost", -] - [[package]] name = "quote" version = "1.0.36" @@ -2381,7 +2241,7 @@ checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2548,7 +2408,7 @@ dependencies = [ "proc-macro2", "quote", "rocket_http", - "syn 2.0.60", + "syn", "unicode-xid", "version_check", ] @@ -2633,18 +2493,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "rustls" -version = "0.21.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" -dependencies = [ - "log", - "ring 0.17.8", - "rustls-webpki", - "sct", -] - [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -2666,16 +2514,6 @@ dependencies = [ "base64 0.21.7", ] -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring 0.17.8", - "untrusted 0.9.0", -] - [[package]] name = "rustversion" version = "1.0.15" @@ -2774,7 +2612,7 @@ checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2836,7 +2674,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -2981,17 +2819,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.60" @@ -3090,7 +2917,7 @@ checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -3168,16 +2995,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-macros" version = "2.2.0" @@ -3186,7 +3003,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -3205,21 +3022,11 @@ version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.9", + "rustls", "tokio", "webpki", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.11", - "tokio", -] - [[package]] name = "tokio-stream" version = "0.1.15" @@ -3279,50 +3086,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tonic" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.21.7", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "rustls-pemfile", - "tokio", - "tokio-rustls 0.24.1", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tonic-build" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" -dependencies = [ - "prettyplease", - "proc-macro2", - "prost-build", - "quote", - "syn 1.0.109", -] - [[package]] name = "tower" version = "0.4.13" @@ -3331,13 +3094,9 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", - "slab", "tokio", - "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3375,7 +3134,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", ] [[package]] @@ -3620,8 +3379,8 @@ dependencies = [ "aws-config", "aws-sdk-s3", "clap", + "diesel", "env_logger", - "etcd-client", "jwtk", "log", "rand", @@ -3630,7 +3389,6 @@ dependencies = [ "rocket_dyn_templates", "serde", "serde_json", - "serde_yaml", "thiserror", "tokio", "uuid", @@ -3706,7 +3464,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.60", + "syn", "wasm-bindgen-shared", ] @@ -3740,7 +3498,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3771,18 +3529,6 @@ dependencies = [ "untrusted 0.9.0", ] -[[package]] -name = "which" -version = "4.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" -dependencies = [ - "either", - "home", - "once_cell", - "rustix", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/vicky/Cargo.toml b/vicky/Cargo.toml index 3695fe5..1a1bec6 100644 --- a/vicky/Cargo.toml +++ b/vicky/Cargo.toml @@ -9,13 +9,11 @@ edition = "2021" anyhow = "1.0.72" clap = { version = "4.3.17", features = ["derive"] } env_logger = "0.10.0" -etcd-client = { version = "0.11.1", features = ["tls"] } log = "0.4.19" rand = "0.8.5" thiserror = "1.0.43" tokio = { version = "1", features = ["full", "sync"] } serde = { version = "1.0", features = ["derive"] } -serde_yaml = "0.9" serde_json = "1.0" async-trait = "0.1.71" aws-sdk-s3 = "0.28.0" @@ -25,6 +23,7 @@ rocket = { version="0.5.0", features = ["json", "secrets"] } rocket_dyn_templates = { version = "0.1.0", features = ["tera"] } reqwest = { version="0.11.20", features = ["json"]} jwtk = "0.3.0" +diesel = { version = "2.1.6", features = ["postgres"]} [[bin]] name = "vicky" diff --git a/vicky/diesel.toml b/vicky/diesel.toml new file mode 100644 index 0000000..c028f4a --- /dev/null +++ b/vicky/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/schema.rs" +custom_type_derives = ["diesel::query_builder::QueryId"] + +[migrations_directory] +dir = "migrations" From aad13dfe8382d1bc3e7ba1aa92d286756fd6cf28 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 10:27:19 +0200 Subject: [PATCH 04/42] vicky: added migrations and schema --- vicky/migrations/.keep | 0 .../down.sql | 6 ++++ .../up.sql | 36 +++++++++++++++++++ .../2024-04-26-080445_create_tasks/down.sql | 2 ++ .../2024-04-26-080445_create_tasks/up.sql | 20 +++++++++++ 5 files changed, 64 insertions(+) create mode 100644 vicky/migrations/.keep create mode 100644 vicky/migrations/00000000000000_diesel_initial_setup/down.sql create mode 100644 vicky/migrations/00000000000000_diesel_initial_setup/up.sql create mode 100644 vicky/migrations/2024-04-26-080445_create_tasks/down.sql create mode 100644 vicky/migrations/2024-04-26-080445_create_tasks/up.sql diff --git a/vicky/migrations/.keep b/vicky/migrations/.keep new file mode 100644 index 0000000..e69de29 diff --git a/vicky/migrations/00000000000000_diesel_initial_setup/down.sql b/vicky/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000..a9f5260 --- /dev/null +++ b/vicky/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,6 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + +DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); +DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/vicky/migrations/00000000000000_diesel_initial_setup/up.sql b/vicky/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000..d68895b --- /dev/null +++ b/vicky/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,36 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + + + + +-- Sets up a trigger for the given table to automatically set a column called +-- `updated_at` whenever the row is modified (unless `updated_at` was included +-- in the modified columns) +-- +-- # Example +-- +-- ```sql +-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); +-- +-- SELECT diesel_manage_updated_at('users'); +-- ``` +CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ +BEGIN + EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s + FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ +BEGIN + IF ( + NEW IS DISTINCT FROM OLD AND + NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at + ) THEN + NEW.updated_at := current_timestamp; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/down.sql b/vicky/migrations/2024-04-26-080445_create_tasks/down.sql new file mode 100644 index 0000000..7d3aad5 --- /dev/null +++ b/vicky/migrations/2024-04-26-080445_create_tasks/down.sql @@ -0,0 +1,2 @@ +DROP TABLE locks; +DROP TABLE tasks; \ No newline at end of file diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql new file mode 100644 index 0000000..03dc3ea --- /dev/null +++ b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql @@ -0,0 +1,20 @@ +CREATE TABLE tasks +( + id uuid PRIMARY KEY, + display_name VARCHAR, + status VARCHAR, + flake_ref_uri VARCHAR, + args VARCHAR +); + +CREATE TABLE locks +( + id INT, + task_id uuid, + name VARCHAR NOT NULL, + type VARCHAR NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_task + FOREIGN KEY (task_id) + REFERENCES tasks (id) +); From 252b81a25a07c0f4f5d62ed3611c1d74e63e26e7 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 10:35:20 +0200 Subject: [PATCH 05/42] vickylib: renamed documents module to database --- vicky/src/bin/vicky/tasks.rs | 4 ++-- vicky/src/lib/{documents => database}/mod.rs | 1 - vicky/src/lib/lib.rs | 2 +- vicky/src/lib/vicky/scheduler.rs | 4 ++-- 4 files changed, 5 insertions(+), 6 deletions(-) rename vicky/src/lib/{documents => database}/mod.rs (99%) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index c5c6dae..7d08999 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -7,7 +7,7 @@ use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; use vickylib::{ - documents::{DocumentClient, FlakeRef, Lock, Task, TaskResult, TaskStatus}, + database::{TaskDatabase, FlakeRef, Lock, Task, TaskResult, TaskStatus}, errors::VickyError, logs::LogDrain, s3::client::S3Client, @@ -274,7 +274,7 @@ pub async fn tasks_add( mod tests { use crate::tasks::check_lock_conflict; use uuid::Uuid; - use vickylib::documents::{FlakeRef, Lock, Task, TaskBuilder, TaskStatus}; + use vickylib::database::{FlakeRef, Lock, Task, TaskBuilder, TaskStatus}; #[test] fn add_new_conflicting_task() { diff --git a/vicky/src/lib/documents/mod.rs b/vicky/src/lib/database/mod.rs similarity index 99% rename from vicky/src/lib/documents/mod.rs rename to vicky/src/lib/database/mod.rs index fd369e7..88649cc 100644 --- a/vicky/src/lib/documents/mod.rs +++ b/vicky/src/lib/database/mod.rs @@ -1,5 +1,4 @@ use async_trait::async_trait; -use etcd_client::GetOptions; use serde::{Deserialize, Serialize}; use uuid::Uuid; diff --git a/vicky/src/lib/lib.rs b/vicky/src/lib/lib.rs index b8a8707..ec6a3f0 100644 --- a/vicky/src/lib/lib.rs +++ b/vicky/src/lib/lib.rs @@ -1,4 +1,4 @@ -pub mod documents; +pub mod database; pub mod errors; pub mod etcd; pub mod logs; diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index 99d4450..b51b2a9 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use log::debug; use crate::{ - documents::{Lock, Task, TaskStatus}, + database::{Lock, Task, TaskStatus}, errors::SchedulerError, }; @@ -148,7 +148,7 @@ impl Scheduler { #[cfg(test)] mod tests { - use crate::documents::{Task, TaskStatus}; + use crate::database::{Task, TaskStatus}; use super::Scheduler; From ba04be81c487e7b6b37434f832112ee9968d1b56 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 10:35:43 +0200 Subject: [PATCH 06/42] vickylib: have rust schema be generated in src/lib/database --- vicky/diesel.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vicky/diesel.toml b/vicky/diesel.toml index c028f4a..ddae32c 100644 --- a/vicky/diesel.toml +++ b/vicky/diesel.toml @@ -2,7 +2,7 @@ # see https://diesel.rs/guides/configuring-diesel-cli [print_schema] -file = "src/schema.rs" +file = "src/lib/database/schema.rs" custom_type_derives = ["diesel::query_builder::QueryId"] [migrations_directory] From 78205fca0273642a4e5bab1b90212581626e290b Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 10:36:31 +0200 Subject: [PATCH 07/42] vickylib: added database schema --- vicky/src/lib/database/mod.rs | 2 ++ vicky/src/lib/database/schema.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 vicky/src/lib/database/schema.rs diff --git a/vicky/src/lib/database/mod.rs b/vicky/src/lib/database/mod.rs index 88649cc..84ba671 100644 --- a/vicky/src/lib/database/mod.rs +++ b/vicky/src/lib/database/mod.rs @@ -1,3 +1,5 @@ +mod schema; + use async_trait::async_trait; use serde::{Deserialize, Serialize}; diff --git a/vicky/src/lib/database/schema.rs b/vicky/src/lib/database/schema.rs new file mode 100644 index 0000000..d084a5d --- /dev/null +++ b/vicky/src/lib/database/schema.rs @@ -0,0 +1,28 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + locks (id) { + id -> Int4, + task_id -> Nullable, + name -> Varchar, + #[sql_name = "type"] + type_ -> Varchar, + } +} + +diesel::table! { + tasks (id) { + id -> Uuid, + display_name -> Nullable, + status -> Nullable, + flake_ref_uri -> Nullable, + args -> Nullable, + } +} + +diesel::joinable!(locks -> tasks (task_id)); + +diesel::allow_tables_to_appear_in_same_query!( + locks, + tasks, +); From d0e298bf599bdba050a7ee122237e4a2e86fac42 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 10:38:14 +0200 Subject: [PATCH 08/42] vickylib: renamed to TaskDatabase --- vicky/src/lib/database/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vicky/src/lib/database/mod.rs b/vicky/src/lib/database/mod.rs index 84ba671..8bd803f 100644 --- a/vicky/src/lib/database/mod.rs +++ b/vicky/src/lib/database/mod.rs @@ -190,14 +190,14 @@ impl TaskBuilder { } #[async_trait] -pub trait DocumentClient { +pub trait TaskDatabase { async fn get_all_tasks(&self) -> Result, VickyError>; async fn get_task(&self, task_id: Uuid) -> Result, VickyError>; async fn put_task(&self, task: &Task) -> Result<(), VickyError>; } #[async_trait] -impl DocumentClient for etcd_client::Client { +impl TaskDatabase for diesel::pg::PgConnection { async fn get_all_tasks(&self) -> Result, VickyError> { let mut kv = self.kv_client(); let get_options: GetOptions = GetOptions::new().with_prefix().with_sort( From 715ed2eb8943150cee61c406d45fc8190b4ebbd5 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 12:18:50 +0200 Subject: [PATCH 09/42] vickylib: fix schema and up migration --- vicky/migrations/2024-04-26-080445_create_tasks/up.sql | 10 +++++----- vicky/src/lib/database/schema.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql index 03dc3ea..b4d6975 100644 --- a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql +++ b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql @@ -1,10 +1,10 @@ CREATE TABLE tasks ( - id uuid PRIMARY KEY, - display_name VARCHAR, - status VARCHAR, - flake_ref_uri VARCHAR, - args VARCHAR + id uuid PRIMARY KEY, + display_name VARCHAR, + status VARCHAR, + flake_ref_uri VARCHAR, + flake_ref_args VARCHAR ); CREATE TABLE locks diff --git a/vicky/src/lib/database/schema.rs b/vicky/src/lib/database/schema.rs index d084a5d..e6cdd65 100644 --- a/vicky/src/lib/database/schema.rs +++ b/vicky/src/lib/database/schema.rs @@ -16,7 +16,7 @@ diesel::table! { display_name -> Nullable, status -> Nullable, flake_ref_uri -> Nullable, - args -> Nullable, + flake_ref_args -> Nullable, } } From 8fab0b7c7004e0cf540de04abff9c37f0076b2d4 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 12:19:11 +0200 Subject: [PATCH 10/42] vickylib: remove etcd client --- vicky/src/lib/errors.rs | 11 --- vicky/src/lib/etcd/client.rs | 75 --------------------- vicky/src/lib/etcd/election.rs | 118 --------------------------------- vicky/src/lib/etcd/mod.rs | 2 - vicky/src/lib/lib.rs | 1 - 5 files changed, 207 deletions(-) delete mode 100644 vicky/src/lib/etcd/client.rs delete mode 100644 vicky/src/lib/etcd/election.rs delete mode 100644 vicky/src/lib/etcd/mod.rs diff --git a/vicky/src/lib/errors.rs b/vicky/src/lib/errors.rs index 1bc5484..e1cceae 100644 --- a/vicky/src/lib/errors.rs +++ b/vicky/src/lib/errors.rs @@ -15,17 +15,6 @@ pub enum VickyError { source: serde_json::Error, }, - #[error("serde_yaml Error {source:?}")] - SerdeYaml { - #[from] - source: serde_yaml::Error, - }, - #[error("etcd Error {source:?}")] - EtcdClient { - #[from] - source: etcd_client::Error, - }, - #[error("Scheduling Error {source:?}")] Scheduler { #[from] diff --git a/vicky/src/lib/etcd/client.rs b/vicky/src/lib/etcd/client.rs deleted file mode 100644 index 476ef9e..0000000 --- a/vicky/src/lib/etcd/client.rs +++ /dev/null @@ -1,75 +0,0 @@ -use async_trait::async_trait; -use etcd_client::{GetOptions, KvClient, PutOptions}; -use serde::de::DeserializeOwned; -use serde::Serialize; - -use crate::errors::VickyError; - -#[async_trait] -pub trait ClientExt { - async fn get_yaml_list( - &mut self, - key: String, - options: Option, - ) -> Result, VickyError>; - async fn get_yaml( - &mut self, - key: String, - options: Option, - ) -> Result, VickyError>; - async fn put_yaml( - &mut self, - key: String, - elem: &T, - options: Option, - ) -> Result<(), VickyError>; -} - -#[async_trait] -impl ClientExt for KvClient { - async fn get_yaml_list( - &mut self, - key: String, - options: Option, - ) -> Result, VickyError> { - let get_resp = self.get(key, options).await?; - let x = get_resp.kvs(); - - let mut ret_val = vec![]; - - for elem in x { - let elem: T = serde_yaml::from_str(elem.value_str()?)?; - ret_val.push(elem); - } - - return Ok(ret_val); - } - - async fn get_yaml( - &mut self, - key: String, - options: Option, - ) -> Result, VickyError> { - let mut list: Vec = self.get_yaml_list(key, options).await?; - assert!(list.len() <= 1, "list had too many entries"); - - if list.is_empty() { - return Ok(None); - } - - let out = list.remove(0); - - Ok(Some(out)) - } - - async fn put_yaml( - &mut self, - key: String, - elem: &T, - options: Option, - ) -> Result<(), VickyError> { - let yaml_str = serde_yaml::to_string(elem)?; - self.put(key, yaml_str, options).await?; - return Ok(()); - } -} diff --git a/vicky/src/lib/etcd/election.rs b/vicky/src/lib/etcd/election.rs deleted file mode 100644 index dd98831..0000000 --- a/vicky/src/lib/etcd/election.rs +++ /dev/null @@ -1,118 +0,0 @@ -use etcd_client::{Client, ElectionClient, LeaseClient}; -use log::debug; -use std::sync::Arc; -use std::time; -use tokio::sync::Mutex; - -use crate::errors::VickyError; - -const ELECTION_NAME: &str = "vicky.wobcom.de/leader-election"; - -pub type NodeId = String; - -enum ElectionState { - Idle, - Waiting, - Leader, -} - -#[derive(Debug)] -enum LeaseState { - NoLease, - Lease { lease_id: i64 }, -} - -pub struct Election { - node_id: NodeId, - lease_client: LeaseClient, - election_client: ElectionClient, - - state: ElectionState, - - lease_state: Arc>, -} - -impl Election { - pub fn new(c: &Client, node_id: NodeId) -> Election { - let m = Mutex::new(LeaseState::NoLease); - - let lease_client = c.lease_client(); - let election_client = c.election_client(); - - Election { - lease_client, - election_client, - node_id, - - state: ElectionState::Idle, - lease_state: m.into(), - } - } - - pub async fn elect(&mut self) -> Result<(), VickyError> { - self.state = ElectionState::Waiting; - - let resp = self.lease_client.grant(10, None).await?; - let lease_id = resp.id(); - - { - let mut x = self.lease_state.lock().await; - *x = LeaseState::Lease { lease_id }; - } - - debug!( - "grant ttl:{:?}, id:{:?}, lease_id: {:?}", - resp.ttl(), - resp.id(), - lease_id - ); - - // campaign - let resp = self - .election_client - .campaign(ELECTION_NAME, self.node_id.clone(), lease_id) - .await?; - let leader = resp.leader().unwrap(); - debug!( - "election name:{:?}, leaseId:{:?}", - leader.name_str(), - leader.lease() - ); - - // leader - let resp = self.election_client.leader(ELECTION_NAME).await?; - let kv = resp.kv().unwrap(); - debug!("key is {:?}", kv.key_str()); - debug!("value is {:?}", kv.value_str()); - - self.state = ElectionState::Leader; - - Ok(()) - } - - pub fn keep_alive(&self) { - let lease_state = Arc::clone(&self.lease_state); - debug!("spawning refresh lease thread"); - - let mut lease_client = self.lease_client.clone(); - - // tokio does some funky stuff here, it blocks the requests sometimes. - tokio::spawn(async move { - loop { - { - let x = lease_state.lock().await; - - match *x { - LeaseState::NoLease => {} - LeaseState::Lease { lease_id } => { - debug!("refreshing lease {}", lease_id); - lease_client.keep_alive(lease_id).await.unwrap(); - } - }; - } - - tokio::time::sleep(time::Duration::from_secs(5)).await; - } - }); - } -} diff --git a/vicky/src/lib/etcd/mod.rs b/vicky/src/lib/etcd/mod.rs deleted file mode 100644 index ae6a77c..0000000 --- a/vicky/src/lib/etcd/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod client; -pub mod election; diff --git a/vicky/src/lib/lib.rs b/vicky/src/lib/lib.rs index ec6a3f0..17bf932 100644 --- a/vicky/src/lib/lib.rs +++ b/vicky/src/lib/lib.rs @@ -1,6 +1,5 @@ pub mod database; pub mod errors; -pub mod etcd; pub mod logs; pub mod s3; pub mod vicky; From 51ae0ed12015733a1704e2be7ea61871edd00d42 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 12:20:23 +0200 Subject: [PATCH 11/42] vickylib: split mod into entities --- vicky/src/lib/database/entities/lock.rs | 19 ++ vicky/src/lib/database/entities/mod.rs | 5 + vicky/src/lib/database/entities/task.rs | 278 ++++++++++++++++++++++++ vicky/src/lib/database/mod.rs | 229 +------------------ vicky/src/lib/vicky/scheduler.rs | 4 +- 5 files changed, 305 insertions(+), 230 deletions(-) create mode 100644 vicky/src/lib/database/entities/lock.rs create mode 100644 vicky/src/lib/database/entities/mod.rs create mode 100644 vicky/src/lib/database/entities/task.rs diff --git a/vicky/src/lib/database/entities/lock.rs b/vicky/src/lib/database/entities/lock.rs new file mode 100644 index 0000000..a3d4ba4 --- /dev/null +++ b/vicky/src/lib/database/entities/lock.rs @@ -0,0 +1,19 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum Lock { + WRITE { name: String }, + READ { name: String }, +} + +impl Lock { + pub fn is_conflicting(&self, other: &Lock) -> bool { + match (self, other) { + (Lock::WRITE { name: name1 }, Lock::WRITE { name: name2 }) + | (Lock::READ { name: name1 }, Lock::WRITE { name: name2 }) + | (Lock::WRITE { name: name1 }, Lock::READ { name: name2 }) => name1 == name2, + _ => false, + } + } +} diff --git a/vicky/src/lib/database/entities/mod.rs b/vicky/src/lib/database/entities/mod.rs new file mode 100644 index 0000000..fb793e2 --- /dev/null +++ b/vicky/src/lib/database/entities/mod.rs @@ -0,0 +1,5 @@ +mod task; +mod lock; + +pub use task::*; +pub use lock::*; diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs new file mode 100644 index 0000000..27137a6 --- /dev/null +++ b/vicky/src/lib/database/entities/task.rs @@ -0,0 +1,278 @@ +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use crate::database::entities::lock::Lock; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(tag = "result")] +pub enum TaskResult { + SUCCESS, + ERROR, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(tag = "state")] +pub enum TaskStatus { + NEW, + RUNNING, + FINISHED(TaskResult), +} + +type FlakeURI = String; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct FlakeRef { + pub flake: FlakeURI, + pub args: Vec, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct Task { + pub id: Uuid, + pub display_name: String, + pub status: TaskStatus, + pub locks: Vec, + pub flake_ref: FlakeRef, + pub features: Vec, +} + +impl Task { + pub fn builder() -> TaskBuilder { + TaskBuilder::default() + } +} + +impl From for Task { + fn from(builder: TaskBuilder) -> Self { + builder.build() + } +} + +pub struct TaskBuilder { + id: Option, + display_name: Option, + status: TaskStatus, + locks: Vec, + flake_ref: FlakeRef, + features: Vec, +} + +impl Default for TaskBuilder { + fn default() -> Self { + TaskBuilder { + id: None, + display_name: None, + status: TaskStatus::NEW, + locks: Vec::new(), + flake_ref: FlakeRef { + flake: "".to_string(), + args: Vec::new(), + }, + features: Vec::new(), + } + } +} + +impl TaskBuilder { + pub fn with_id(mut self, id: Uuid) -> Self { + self.id = Some(id); + self + } + + pub fn with_display_name>(mut self, display_name: S) -> Self { + self.display_name = Some(display_name.into()); + self + } + + pub fn with_status(mut self, status: TaskStatus) -> Self { + self.status = status; + self + } + + pub fn with_read_lock>(mut self, name: S) -> Self { + self.locks.push(Lock::READ { name: name.into() }); + self + } + + pub fn with_write_lock>(mut self, name: S) -> Self { + self.locks.push(Lock::WRITE { name: name.into() }); + self + } + + pub fn with_locks(mut self, locks: Vec) -> Self { + self.locks = locks; + self + } + + pub fn with_flake>(mut self, flake_uri: S) -> Self { + self.flake_ref.flake = flake_uri.into(); + self + } + + pub fn with_flake_arg>(mut self, flake_arg: S) -> Self { + self.flake_ref.args.push(flake_arg.into()); + self + } + + pub fn with_flake_args(mut self, args: Vec) -> Self { + self.flake_ref.args = args; + self + } + + pub fn requires_feature>(mut self, feature: S) -> Self { + self.features.push(feature.into()); + self + } + + pub fn requires_features(mut self, features: Vec) -> Self { + self.features = features; + self + } + + pub fn id(&self) -> Option { + self.id + } + + pub fn display_name(&self) -> &Option { + &self.display_name + } + + pub fn status(&self) -> &TaskStatus { + &self.status + } + + pub fn locks(&self) -> &Vec { + &self.locks + } + + pub fn flake_ref(&self) -> &FlakeRef { + &self.flake_ref + } + + pub fn features(&self) -> &Vec { + &self.features + } + + pub fn build(self) -> Task { + Task { + id: self.id.unwrap_or_else(Uuid::new_v4), + display_name: self.display_name.unwrap_or_else(|| "Task".to_string()), + features: self.features, + status: self.status, + locks: self.locks, + flake_ref: self.flake_ref, + } + } +} + +// this was on purpose because these macro-generated entity types +// mess up the whole namespace and HAVE to be scoped +pub mod db_impl { + use crate::database::entities::task::{Task, TaskResult, TaskStatus}; + use crate::errors::VickyError; + use async_trait::async_trait; + use diesel::{Insertable, Queryable, Selectable}; + use uuid::Uuid; + use crate::database::entities::lock::Lock; + // these here are evil >:( + use crate::database::schema::locks; + use crate::database::schema::tasks; + + #[derive(Insertable, Queryable)] + #[diesel(table_name = tasks)] + struct DbTask { + pub id: Uuid, + pub display_name: Option, + pub status: Option, + pub flake_ref_uri: Option, + pub flake_ref_args: Option, + } + + impl ToString for TaskStatus { + fn to_string(&self) -> String { + match self { + TaskStatus::NEW => "NEW", + TaskStatus::RUNNING => "RUNNING", + TaskStatus::FINISHED(r) => match r { + TaskResult::SUCCESS => "FINISHED::SUCCESS", + TaskResult::ERROR => "FINISHED::ERROR", + }, + }.to_string() + } + } + + impl Into for Task { + fn into(self) -> DbTask { + DbTask { + id: self.id, + display_name: Some(self.display_name), + status: Some(self.status.to_string()), + flake_ref_uri: Some(self.flake_ref.flake), + flake_ref_args: Some(self.flake_ref.args.join("||")), + } + } + } + + #[derive(Insertable, Queryable)] + #[diesel(table_name = locks)] + struct DbLock { + id: Option, + task_id: Uuid, + name: String, + type_: String, + } + + impl DbLock { + fn from_lock(lock: Lock, task_id: Uuid) -> Self { + match lock { + Lock::WRITE { name } => DbLock { id: None, task_id, name, type_: "WRITE".to_string() }, + Lock::READ { name } => DbLock { id: None, task_id, name, type_: "READ".to_string() }, + } + } + } + + impl Into for DbLock { + fn into(self) -> Lock { + match self.type_.as_str() { + "WRITE" => Lock::WRITE { name: self.name }, + "READ" => Lock::READ { name: self.name }, + _ => panic!( + "Can't parse lock from database lock. Database corrupted? \ + Expected READ or WRITE but found {} as type at key {}.", + self.type_, + self.id.unwrap_or(-1) + ), + } + } + } + + #[async_trait] + pub trait TaskDatabase { + async fn get_all_tasks(&mut self) -> Result, VickyError>; + async fn get_task(&self, task_id: Uuid) -> Result, VickyError>; + async fn put_task(&mut self, task: &Task) -> Result<(), VickyError>; + } + + impl TaskDatabase for diesel::pg::PgConnection { + async fn get_all_tasks(mut self) -> Result, VickyError> { + // very evil >>:( + use self::tasks::dsl::*; + + todo!() + } + + async fn get_task(&self, task_id: Uuid) -> Result, VickyError> { + // so evil >:O + use self::tasks::dsl::*; + + todo!(); + } + + async fn put_task(&mut self, task: &Task) -> Result<(), VickyError> { + // even more evil >;( + use self::tasks::dsl::*; + + todo!(); + } + } +} diff --git a/vicky/src/lib/database/mod.rs b/vicky/src/lib/database/mod.rs index 8bd803f..6edc672 100644 --- a/vicky/src/lib/database/mod.rs +++ b/vicky/src/lib/database/mod.rs @@ -1,229 +1,2 @@ mod schema; - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use uuid::Uuid; - -use crate::{errors::VickyError, etcd::client::ClientExt}; - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(tag = "result")] - -pub enum TaskResult { - SUCCESS, - ERROR, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(tag = "state")] - -pub enum TaskStatus { - NEW, - RUNNING, - FINISHED(TaskResult), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum Lock { - WRITE { name: String }, - READ { name: String }, -} - -impl Lock { - pub fn is_conflicting(&self, other: &Lock) -> bool { - match (self, other) { - (Lock::WRITE { name: name1 }, Lock::WRITE { name: name2 }) - | (Lock::READ { name: name1 }, Lock::WRITE { name: name2 }) - | (Lock::WRITE { name: name1 }, Lock::READ { name: name2 }) => name1 == name2, - _ => false, - } - } -} - -type FlakeURI = String; - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct FlakeRef { - pub flake: FlakeURI, - pub args: Vec, -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct Task { - pub id: Uuid, - pub display_name: String, - pub status: TaskStatus, - pub locks: Vec, - pub flake_ref: FlakeRef, - pub features: Vec, -} - -impl Task { - pub fn builder() -> TaskBuilder { - TaskBuilder::default() - } -} - -impl From for Task { - fn from(builder: TaskBuilder) -> Self { - builder.build() - } -} - -pub struct TaskBuilder { - id: Option, - display_name: Option, - status: TaskStatus, - locks: Vec, - flake_ref: FlakeRef, - features: Vec, -} - -impl Default for TaskBuilder { - fn default() -> Self { - TaskBuilder { - id: None, - display_name: None, - status: TaskStatus::NEW, - locks: Vec::new(), - flake_ref: FlakeRef { - flake: "".to_string(), - args: Vec::new(), - }, - features: Vec::new(), - } - } -} - -impl TaskBuilder { - pub fn with_id(mut self, id: Uuid) -> Self { - self.id = Some(id); - self - } - - pub fn with_display_name>(mut self, display_name: S) -> Self { - self.display_name = Some(display_name.into()); - self - } - - pub fn with_status(mut self, status: TaskStatus) -> Self { - self.status = status; - self - } - - pub fn with_read_lock>(mut self, name: S) -> Self { - self.locks.push(Lock::READ { name: name.into() }); - self - } - - pub fn with_write_lock>(mut self, name: S) -> Self { - self.locks.push(Lock::WRITE { name: name.into() }); - self - } - - pub fn with_locks(mut self, locks: Vec) -> Self { - self.locks = locks; - self - } - - pub fn with_flake>(mut self, flake_uri: S) -> Self { - self.flake_ref.flake = flake_uri.into(); - self - } - - pub fn with_flake_arg>(mut self, flake_arg: S) -> Self { - self.flake_ref.args.push(flake_arg.into()); - self - } - - pub fn with_flake_args(mut self, args: Vec) -> Self { - self.flake_ref.args = args; - self - } - - pub fn requires_feature>(mut self, feature: S) -> Self { - self.features.push(feature.into()); - self - } - - pub fn requires_features(mut self, features: Vec) -> Self { - self.features = features; - self - } - - pub fn id(&self) -> Option { - self.id - } - - pub fn display_name(&self) -> &Option { - &self.display_name - } - - pub fn status(&self) -> &TaskStatus { - &self.status - } - - pub fn locks(&self) -> &Vec { - &self.locks - } - - pub fn flake_ref(&self) -> &FlakeRef { - &self.flake_ref - } - - pub fn features(&self) -> &Vec { - &self.features - } - - pub fn build(self) -> Task { - Task { - id: self.id.unwrap_or_else(Uuid::new_v4), - display_name: self.display_name.unwrap_or_else(|| "Task".to_string()), - features: self.features, - status: self.status, - locks: self.locks, - flake_ref: self.flake_ref, - } - } -} - -#[async_trait] -pub trait TaskDatabase { - async fn get_all_tasks(&self) -> Result, VickyError>; - async fn get_task(&self, task_id: Uuid) -> Result, VickyError>; - async fn put_task(&self, task: &Task) -> Result<(), VickyError>; -} - -#[async_trait] -impl TaskDatabase for diesel::pg::PgConnection { - async fn get_all_tasks(&self) -> Result, VickyError> { - let mut kv = self.kv_client(); - let get_options: GetOptions = GetOptions::new().with_prefix().with_sort( - etcd_client::SortTarget::Create, - etcd_client::SortOrder::Descend, - ); - let tasks: Vec = kv - .get_yaml_list( - "vicky.wobcom.de/task/manifest".to_string(), - Some(get_options), - ) - .await?; - Ok(tasks) - } - - async fn get_task(&self, task_id: Uuid) -> Result, VickyError> { - let mut kv = self.kv_client(); - let key = format!("vicky.wobcom.de/task/manifest/{}", task_id); - let task: Option = kv.get_yaml(key.clone(), None).await?; - Ok(task) - } - - async fn put_task(&self, task: &Task) -> Result<(), VickyError> { - let mut kv = self.kv_client(); - let key = format!("vicky.wobcom.de/task/manifest/{}", task.id); - kv.put_yaml(key, &task, None).await?; - Ok(()) - } -} +pub mod entities; diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index b51b2a9..4a6c2d9 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use log::debug; use crate::{ - database::{Lock, Task, TaskStatus}, + database::entities::{Lock, Task, TaskStatus}, errors::SchedulerError, }; @@ -148,7 +148,7 @@ impl Scheduler { #[cfg(test)] mod tests { - use crate::database::{Task, TaskStatus}; + use crate::database::entities::{Task, TaskStatus}; use super::Scheduler; From 5f14320dae636d63c2804838b04b1f4d55319202 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Fri, 26 Apr 2024 12:20:38 +0200 Subject: [PATCH 12/42] vickylib: add uuid feature to diesel for Uuid de/serialization --- Cargo.lock | 1 + vicky/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3c6544e..df48fcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -971,6 +971,7 @@ dependencies = [ "diesel_derives", "itoa", "pq-sys", + "uuid", ] [[package]] diff --git a/vicky/Cargo.toml b/vicky/Cargo.toml index 1a1bec6..204b205 100644 --- a/vicky/Cargo.toml +++ b/vicky/Cargo.toml @@ -23,7 +23,7 @@ rocket = { version="0.5.0", features = ["json", "secrets"] } rocket_dyn_templates = { version = "0.1.0", features = ["tera"] } reqwest = { version="0.11.20", features = ["json"]} jwtk = "0.3.0" -diesel = { version = "2.1.6", features = ["postgres"]} +diesel = { version = "2.1.6", features = ["postgres", "uuid"]} [[bin]] name = "vicky" From dc8d1ddd983fb7878f10b27f2631fab805eff572 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:16:36 +0200 Subject: [PATCH 13/42] comment postgres-passwd --- deployment/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployment/docker-compose.yaml b/deployment/docker-compose.yaml index 8bf6631..367ea9d 100644 --- a/deployment/docker-compose.yaml +++ b/deployment/docker-compose.yaml @@ -6,7 +6,7 @@ services: restart: always volumes: - - "./config/postgres-passwd:/run/secrets/postgres-passwd" + # - "./config/postgres-passwd:/run/secrets/postgres-passwd" - "./data/postgres_data:/var/lib/postgresql/data" environment: # POSTGRES_PASSWORD_FILE: /run/secrets/postgres-passwd From 2002f90aa51ec7105e0484c516d2ab188f91a405 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:17:09 +0200 Subject: [PATCH 14/42] add diesel error to thiserror VickyError --- vicky/src/lib/errors.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/vicky/src/lib/errors.rs b/vicky/src/lib/errors.rs index e1cceae..38b3f8c 100644 --- a/vicky/src/lib/errors.rs +++ b/vicky/src/lib/errors.rs @@ -14,6 +14,12 @@ pub enum VickyError { #[from] source: serde_json::Error, }, + + #[error("diesel Error {source:?}")] + Diesel { + #[from] + source: diesel::result::Error, + }, #[error("Scheduling Error {source:?}")] Scheduler { From abefc832ec17011e4d92469da76eaf40c8436865 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:18:12 +0200 Subject: [PATCH 15/42] update migration with nonnull and features --- .../2024-04-26-080445_create_tasks/up.sql | 11 ++++---- vicky/src/lib/database/entities/task.rs | 26 ++++++++++--------- vicky/src/lib/database/schema.rs | 11 ++++---- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql index b4d6975..44a3461 100644 --- a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql +++ b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql @@ -1,16 +1,17 @@ CREATE TABLE tasks ( id uuid PRIMARY KEY, - display_name VARCHAR, - status VARCHAR, - flake_ref_uri VARCHAR, - flake_ref_args VARCHAR + display_name VARCHAR NOT NULL, + status VARCHAR NOT NULL, + features VARCHAR NOT NULL, + flake_ref_uri VARCHAR NOT NULL, + flake_ref_args VARCHAR NOT NULL ); CREATE TABLE locks ( id INT, - task_id uuid, + task_id uuid NOT NULL, name VARCHAR NOT NULL, type VARCHAR NOT NULL, PRIMARY KEY (id), diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 27137a6..7dda6c4 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -182,10 +182,11 @@ pub mod db_impl { #[diesel(table_name = tasks)] struct DbTask { pub id: Uuid, - pub display_name: Option, - pub status: Option, - pub flake_ref_uri: Option, - pub flake_ref_args: Option, + pub display_name: String, + pub status: String, + pub features: String, + pub flake_ref_uri: String, + pub flake_ref_args: String, } impl ToString for TaskStatus { @@ -205,10 +206,11 @@ pub mod db_impl { fn into(self) -> DbTask { DbTask { id: self.id, - display_name: Some(self.display_name), - status: Some(self.status.to_string()), - flake_ref_uri: Some(self.flake_ref.flake), - flake_ref_args: Some(self.flake_ref.args.join("||")), + display_name: self.display_name, + status: self.status.to_string(), + features: self.features.join("||"), + flake_ref_uri: self.flake_ref.flake, + flake_ref_args: self.flake_ref.args.join("||"), } } } @@ -216,7 +218,7 @@ pub mod db_impl { #[derive(Insertable, Queryable)] #[diesel(table_name = locks)] struct DbLock { - id: Option, + id: i32, task_id: Uuid, name: String, type_: String, @@ -225,8 +227,8 @@ pub mod db_impl { impl DbLock { fn from_lock(lock: Lock, task_id: Uuid) -> Self { match lock { - Lock::WRITE { name } => DbLock { id: None, task_id, name, type_: "WRITE".to_string() }, - Lock::READ { name } => DbLock { id: None, task_id, name, type_: "READ".to_string() }, + Lock::WRITE { name } => DbLock { id: -1, task_id, name, type_: "WRITE".to_string() }, + Lock::READ { name } => DbLock { id: -1, task_id, name, type_: "READ".to_string() }, } } } @@ -240,7 +242,7 @@ pub mod db_impl { "Can't parse lock from database lock. Database corrupted? \ Expected READ or WRITE but found {} as type at key {}.", self.type_, - self.id.unwrap_or(-1) + self.id ), } } diff --git a/vicky/src/lib/database/schema.rs b/vicky/src/lib/database/schema.rs index e6cdd65..9053820 100644 --- a/vicky/src/lib/database/schema.rs +++ b/vicky/src/lib/database/schema.rs @@ -3,7 +3,7 @@ diesel::table! { locks (id) { id -> Int4, - task_id -> Nullable, + task_id -> Uuid, name -> Varchar, #[sql_name = "type"] type_ -> Varchar, @@ -13,10 +13,11 @@ diesel::table! { diesel::table! { tasks (id) { id -> Uuid, - display_name -> Nullable, - status -> Nullable, - flake_ref_uri -> Nullable, - flake_ref_args -> Nullable, + display_name -> Varchar, + status -> Varchar, + features -> Varchar, + flake_ref_uri -> Varchar, + flake_ref_args -> Varchar, } } From 2c11eac83fceeebde2382438b698c0be99286566 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:18:35 +0200 Subject: [PATCH 16/42] added into for TaskStatus from &str --- vicky/src/lib/database/entities/task.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 7dda6c4..40b3ec0 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -201,6 +201,17 @@ pub mod db_impl { }.to_string() } } + + impl Into for &str { + fn into(self) -> TaskStatus { + match self { + "RUNNING" => TaskStatus::RUNNING, + "FINISHED::SUCCESS" => TaskStatus::FINISHED(TaskResult::SUCCESS), + "FINISHED::ERROR" => TaskStatus::FINISHED(TaskResult::ERROR), + _ => TaskStatus::NEW, + } + } + } impl Into for Task { fn into(self) -> DbTask { From de365e29269bfbba88d0a312f3ce0c74479738ab Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:18:58 +0200 Subject: [PATCH 17/42] change ToString to Display --- vicky/src/lib/database/entities/task.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 40b3ec0..d6cb9ab 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -168,6 +168,7 @@ impl TaskBuilder { // this was on purpose because these macro-generated entity types // mess up the whole namespace and HAVE to be scoped pub mod db_impl { + use std::fmt::Display; use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::errors::VickyError; use async_trait::async_trait; @@ -189,16 +190,17 @@ pub mod db_impl { pub flake_ref_args: String, } - impl ToString for TaskStatus { - fn to_string(&self) -> String { - match self { + impl Display for TaskStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { TaskStatus::NEW => "NEW", TaskStatus::RUNNING => "RUNNING", TaskStatus::FINISHED(r) => match r { TaskResult::SUCCESS => "FINISHED::SUCCESS", TaskResult::ERROR => "FINISHED::ERROR", }, - }.to_string() + }; + write!(f, "{}", str) } } From b08327322982bea9a58dff8617e00f329f1d30e0 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:19:20 +0200 Subject: [PATCH 18/42] added async_trait to TaskDatabase --- vicky/src/lib/database/entities/task.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index d6cb9ab..0e83d3f 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -268,6 +268,7 @@ pub mod db_impl { async fn put_task(&mut self, task: &Task) -> Result<(), VickyError>; } + #[async_trait] impl TaskDatabase for diesel::pg::PgConnection { async fn get_all_tasks(mut self) -> Result, VickyError> { // very evil >>:( From dc5f5f8a7a893b41b4b08da7b118b8850c6eeaf9 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 14:19:50 +0200 Subject: [PATCH 19/42] added get all tasks from database --- vicky/src/lib/database/entities/task.rs | 37 +++++++++++++++++++++---- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 0e83d3f..c409388 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -172,8 +172,11 @@ pub mod db_impl { use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::errors::VickyError; use async_trait::async_trait; - use diesel::{Insertable, Queryable, Selectable}; + use diesel::{Associations, ExpressionMethods, Identifiable, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; + use diesel::associations::HasTable; + use log::{error, warn}; use uuid::Uuid; + use crate::database::entities::FlakeRef; use crate::database::entities::lock::Lock; // these here are evil >:( use crate::database::schema::locks; @@ -228,7 +231,7 @@ pub mod db_impl { } } - #[derive(Insertable, Queryable)] + #[derive(Selectable, Identifiable, Insertable, Queryable)] #[diesel(table_name = locks)] struct DbLock { id: i32, @@ -270,11 +273,35 @@ pub mod db_impl { #[async_trait] impl TaskDatabase for diesel::pg::PgConnection { - async fn get_all_tasks(mut self) -> Result, VickyError> { + async fn get_all_tasks(&mut self) -> Result, VickyError> { // very evil >>:( use self::tasks::dsl::*; - - todo!() + use self::locks::dsl::*; + + let db_tasks = tasks + .load::(self)?; + let mut real_tasks: Vec = vec![]; + + real_tasks = db_tasks.iter().map(|t| { + let real_locks: Vec = match locks.filter(task_id.eq(t.id)).load::(self) { + Ok(db_locks) => db_locks.into_iter().map(|l| l.into()).collect(), + Err(_) => { warn!("could not load lock"); vec![] }, + }; + + Task { + id: t.id.clone(), + display_name: t.display_name.clone(), + status: t.status.as_str().into(), + locks: real_locks, + features: t.features.split("||").map(|s| s.to_string()).collect(), + flake_ref: FlakeRef { + flake: t.flake_ref_uri.clone(), + args: t.flake_ref_args.split("||").map(|s| s.to_string()).collect(), + } + } + }).collect(); + + Ok(real_tasks) } async fn get_task(&self, task_id: Uuid) -> Result, VickyError> { From 4df0f43fa0aaacdb13b02663d7b9896550cc9ce5 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 15:07:58 +0200 Subject: [PATCH 20/42] added itertools --- Cargo.lock | 10 ++++++++++ vicky/Cargo.toml | 1 + 2 files changed, 11 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index df48fcc..2479a26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1593,6 +1593,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -3382,6 +3391,7 @@ dependencies = [ "clap", "diesel", "env_logger", + "itertools", "jwtk", "log", "rand", diff --git a/vicky/Cargo.toml b/vicky/Cargo.toml index 204b205..d0dad0f 100644 --- a/vicky/Cargo.toml +++ b/vicky/Cargo.toml @@ -24,6 +24,7 @@ rocket_dyn_templates = { version = "0.1.0", features = ["tera"] } reqwest = { version="0.11.20", features = ["json"]} jwtk = "0.3.0" diesel = { version = "2.1.6", features = ["postgres", "uuid"]} +itertools = { version = "0.12.1" } [[bin]] name = "vicky" From fef4464c4e0579c1a91ebadd8a7c48e13e7c1915 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 15:08:24 +0200 Subject: [PATCH 21/42] added get_task function --- vicky/src/lib/database/entities/task.rs | 31 +++++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index c409388..df447bf 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -168,11 +168,12 @@ impl TaskBuilder { // this was on purpose because these macro-generated entity types // mess up the whole namespace and HAVE to be scoped pub mod db_impl { + use std::collections::HashMap; use std::fmt::Display; use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::errors::VickyError; use async_trait::async_trait; - use diesel::{Associations, ExpressionMethods, Identifiable, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; + use diesel::{Connection, ExpressionMethods, Identifiable, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; use diesel::associations::HasTable; use log::{error, warn}; use uuid::Uuid; @@ -206,7 +207,7 @@ pub mod db_impl { write!(f, "{}", str) } } - + impl Into for &str { fn into(self) -> TaskStatus { match self { @@ -267,7 +268,7 @@ pub mod db_impl { #[async_trait] pub trait TaskDatabase { async fn get_all_tasks(&mut self) -> Result, VickyError>; - async fn get_task(&self, task_id: Uuid) -> Result, VickyError>; + async fn get_task(&mut self, task_id: Uuid) -> Result, VickyError>; async fn put_task(&mut self, task: &Task) -> Result<(), VickyError>; } @@ -304,11 +305,31 @@ pub mod db_impl { Ok(real_tasks) } - async fn get_task(&self, task_id: Uuid) -> Result, VickyError> { + async fn get_task(&mut self, tid: Uuid) -> Result, VickyError> { // so evil >:O use self::tasks::dsl::*; + use self::locks::dsl::*; - todo!(); + let db_task= tasks.filter(self::tasks::id.eq(tid)).first::(self); + let db_task = match db_task { + Err(diesel::result::Error::NotFound) => return Ok(None), + _ => db_task? + }; + let db_locks: Vec = locks.filter(task_id.eq(task_id)).load::(self)?; + + let task = Task { + id: db_task.id, + display_name: db_task.display_name.clone(), + locks: db_locks.into_iter().map(|l| l.into()).collect(), + features: db_task.features.split("||").map(|s| s.to_string()).collect(), + flake_ref: FlakeRef { + flake: db_task.flake_ref_uri.clone(), + args: db_task.features.split("||").map(|s| s.to_string()).collect(), + }, + status: db_task.status.as_str().into(), + }; + + Ok(Some(task)) } async fn put_task(&mut self, task: &Task) -> Result<(), VickyError> { From bd09a7ea9f8baf97f9ee1798586ab1f1b66cd5a0 Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 15:09:11 +0200 Subject: [PATCH 22/42] made get_all_tasks better --- vicky/src/lib/database/entities/task.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index df447bf..761afa4 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -181,6 +181,7 @@ pub mod db_impl { use crate::database::entities::lock::Lock; // these here are evil >:( use crate::database::schema::locks; + use itertools::Itertools; use crate::database::schema::tasks; #[derive(Insertable, Queryable)] @@ -279,25 +280,27 @@ pub mod db_impl { use self::tasks::dsl::*; use self::locks::dsl::*; - let db_tasks = tasks - .load::(self)?; - let mut real_tasks: Vec = vec![]; + let db_tasks = tasks.load::(self)?; + + // prefetching all locks here, so we don't run into the N+1 Query Problem and distribute them + let all_locks = locks.load::(self).unwrap_or_else(|_| vec![]); + + let lock_map: HashMap<_, Vec> = all_locks.into_iter() + .map(|db_lock| (db_lock.task_id, db_lock.into())) + .into_group_map(); + + let real_tasks: Vec = db_tasks.into_iter().map(|t| { + let real_locks = lock_map.get(&t.id).cloned().unwrap_or_default(); - real_tasks = db_tasks.iter().map(|t| { - let real_locks: Vec = match locks.filter(task_id.eq(t.id)).load::(self) { - Ok(db_locks) => db_locks.into_iter().map(|l| l.into()).collect(), - Err(_) => { warn!("could not load lock"); vec![] }, - }; - Task { id: t.id.clone(), display_name: t.display_name.clone(), status: t.status.as_str().into(), locks: real_locks, - features: t.features.split("||").map(|s| s.to_string()).collect(), + features: t.features.split("||").map(String::from).collect(), flake_ref: FlakeRef { flake: t.flake_ref_uri.clone(), - args: t.flake_ref_args.split("||").map(|s| s.to_string()).collect(), + args: t.flake_ref_args.split("||").map(String::from).collect(), } } }).collect(); From 02c1c3f5185405536ae4b36c3d6de3b7aff9873a Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 16:51:34 +0200 Subject: [PATCH 23/42] function to put a task into the database --- vicky/src/lib/database/entities/task.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 761afa4..144e1e1 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -173,7 +173,7 @@ pub mod db_impl { use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::errors::VickyError; use async_trait::async_trait; - use diesel::{Connection, ExpressionMethods, Identifiable, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; + use diesel::{Connection, ExpressionMethods, Identifiable, insert_into, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; use diesel::associations::HasTable; use log::{error, warn}; use uuid::Uuid; @@ -220,14 +220,14 @@ pub mod db_impl { } } - impl Into for Task { + impl Into for &Task { fn into(self) -> DbTask { DbTask { id: self.id, - display_name: self.display_name, + display_name: self.display_name.clone(), status: self.status.to_string(), features: self.features.join("||"), - flake_ref_uri: self.flake_ref.flake, + flake_ref_uri: self.flake_ref.flake.clone(), flake_ref_args: self.flake_ref.args.join("||"), } } @@ -243,10 +243,10 @@ pub mod db_impl { } impl DbLock { - fn from_lock(lock: Lock, task_id: Uuid) -> Self { + fn from_lock(lock: &Lock, task_id: Uuid) -> Self { match lock { - Lock::WRITE { name } => DbLock { id: -1, task_id, name, type_: "WRITE".to_string() }, - Lock::READ { name } => DbLock { id: -1, task_id, name, type_: "READ".to_string() }, + Lock::WRITE { name } => DbLock { id: -1, task_id, name: name.clone(), type_: "WRITE".to_string() }, + Lock::READ { name } => DbLock { id: -1, task_id, name: name.clone(), type_: "READ".to_string() }, } } } @@ -338,8 +338,16 @@ pub mod db_impl { async fn put_task(&mut self, task: &Task) -> Result<(), VickyError> { // even more evil >;( use self::tasks::dsl::*; + use self::locks::dsl::*; + + let db_locks: Vec = task.locks.iter().map(|l| DbLock::from_lock(l, task.id)).collect(); + let db_task: DbTask = task.into(); - todo!(); + insert_into(tasks).values(db_task).execute(self)?; + for db_lock in db_locks { + insert_into(locks).values(db_lock).execute(self)?; + } + Ok(()) } } } From f5db9672e6c5c9e425c791325a29254dd023269d Mon Sep 17 00:00:00 2001 From: KX Date: Mon, 29 Apr 2024 17:07:29 +0200 Subject: [PATCH 24/42] removed etcd and changed to PgConnection --- vicky/src/bin/vicky/main.rs | 56 +++++++++++------------------------- vicky/src/bin/vicky/tasks.rs | 49 +++++++++++++++---------------- 2 files changed, 41 insertions(+), 64 deletions(-) diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 4e99011..7adb316 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -1,9 +1,8 @@ use std::time::Duration; use aws_sdk_s3::config::{Credentials, Region}; -use etcd_client::{Certificate, ConnectOptions, Identity, TlsOptions}; +use diesel::{Connection, PgConnection}; use jwtk::jwk::RemoteJwksVerifier; -use log::info; use rand::Rng; use rocket::fairing::AdHoc; @@ -12,7 +11,6 @@ use rocket::figment::{Figment, Profile}; use rocket::routes; use serde::Deserialize; use tokio::sync::broadcast; -use vickylib::etcd::election::{Election, NodeId}; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; @@ -37,9 +35,11 @@ pub struct TlsConfigOptions { key_file: String, } #[derive(Deserialize)] -pub struct EtcdConfig { - endpoints: Vec, - tls_options: Option, +pub struct DatabaseConfig { + endpoint: String, + username: String, + password: String, + database: String, } #[derive(Deserialize)] @@ -61,7 +61,7 @@ pub struct OIDCConfig { pub struct Config { machines: Vec, - etcd_config: EtcdConfig, + db_config: DatabaseConfig, s3_config: S3Config, oidc_config: OIDCConfig, @@ -69,7 +69,9 @@ pub struct Config { #[tokio::main] async fn main() -> anyhow::Result<()> { - env_logger::builder().filter_module("vicky", log::LevelFilter::Debug).init(); + env_logger::builder() + .filter_module("vicky", log::LevelFilter::Debug) + .init(); // Took from rocket source code and added .split("__") to be able to add keys in nested structures. let rocket_config_figment = Figment::from(rocket::Config::default()) @@ -88,28 +90,12 @@ async fn main() -> anyhow::Result<()> { let build_rocket = rocket::custom(rocket_config_figment); let app_config = build_rocket.figment().extract::()?; + let dbc = &app_config.db_config; - let options = match app_config.etcd_config.tls_options { - Some(tls_options) => { - let server_root_ca_cert = std::fs::read_to_string(tls_options.ca_file)?; - let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert); - let client_cert = std::fs::read_to_string(tls_options.certificate_file)?; - let client_key = std::fs::read_to_string(tls_options.key_file)?; - let client_identity = Identity::from_pem(client_cert, client_key); - - Some( - TlsOptions::new() - .ca_certificate(server_root_ca_cert) - .identity(client_identity), - ) - } - None => None, - }; - - let connect_options = - options.map(|options: TlsOptions| ConnectOptions::new().with_tls(options)); - let etcd_client = - etcd_client::Client::connect(app_config.etcd_config.endpoints, connect_options).await?; + let db_client = PgConnection::establish(format!( + "postgres://{}:{}@{}/{}", + dbc.username, dbc.password, dbc.endpoint, dbc.database + )); let jwks_verifier = RemoteJwksVerifier::new( app_config.oidc_config.jwks_url, @@ -136,22 +122,12 @@ async fn main() -> anyhow::Result<()> { let s3_ext_client_drain = S3Client::new(s3_client.clone(), aws_cfg.log_bucket.clone()); let s3_ext_client = S3Client::new(s3_client, aws_cfg.log_bucket.clone()); - let mut rng = rand::thread_rng(); - let node_id: NodeId = format!("node_{}", rng.gen::()).to_string(); - info!("Generated unique node id as {}", node_id); - - let mut election = Election::new(&etcd_client, node_id); - election.keep_alive(); - - election.elect().await?; - info!("Leader election won, we are now the leader!"); - let log_drain = LogDrain::new(s3_ext_client_drain); let (tx_global_events, _rx_task_events) = broadcast::channel::(5); build_rocket - .manage(etcd_client) + .manage(db_client) .manage(s3_ext_client) .manage(log_drain) .manage(jwks_verifier) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 7d08999..c9d8f85 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -1,18 +1,19 @@ -use etcd_client::Client; use rocket::http::Status; use rocket::response::stream::{Event, EventStream}; use rocket::{get, post, serde::json::Json, State}; use serde::{Deserialize, Serialize}; use std::time; +use diesel::PgConnection; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; use vickylib::{ - database::{TaskDatabase, FlakeRef, Lock, Task, TaskResult, TaskStatus}, errors::VickyError, logs::LogDrain, s3::client::S3Client, vicky::scheduler::Scheduler, }; +use vickylib::database::entities::{FlakeRef, Lock, Task, TaskResult, TaskStatus}; +use vickylib::database::entities::db_impl::TaskDatabase; use crate::{ auth::{Machine, User}, @@ -51,55 +52,55 @@ pub struct LogLines { #[get("/")] pub async fn tasks_get_user( - etcd: &State, + db: &mut State, _user: User, ) -> Result>, VickyError> { - let tasks: Vec = etcd.get_all_tasks().await?; + let tasks: Vec = db.get_all_tasks().await?; Ok(Json(tasks)) } #[get("/", rank = 2)] pub async fn tasks_get_machine( - etcd: &State, + db: &mut State, _machine: Machine, ) -> Result>, VickyError> { - let tasks: Vec = etcd.get_all_tasks().await?; + let tasks: Vec = db.get_all_tasks().await?; Ok(Json(tasks)) } #[get("/")] pub async fn tasks_specific_get_user( id: String, - etcd: &State, + db: &mut State, _user: User, ) -> Result>, VickyError> { let task_uuid = Uuid::parse_str(&id).unwrap(); - let tasks: Option = etcd.get_task(task_uuid).await?; + let tasks: Option = db.get_task(task_uuid).await?; Ok(Json(tasks)) } #[get("/", rank = 2)] pub async fn tasks_specific_get_machine( id: String, - etcd: &State, + db: &mut State, _machine: Machine, ) -> Result>, VickyError> { let task_uuid = Uuid::parse_str(&id).unwrap(); - let tasks: Option = etcd.get_task(task_uuid).await?; + let tasks: Option = db.get_task(task_uuid).await?; Ok(Json(tasks)) } #[get("//logs")] pub async fn tasks_get_logs<'a>( id: String, - etcd: &State, + db: &mut State, s3: &'a State, _user: User, log_drain: &'a State<&'_ LogDrain>, ) -> EventStream![Event + 'a] { // TODO: Fix Error Handling let task_uuid = Uuid::parse_str(&id).unwrap(); - let task = etcd + let task = db .get_task(task_uuid) .await .unwrap() @@ -156,13 +157,13 @@ pub async fn tasks_get_logs<'a>( #[post("//logs", format = "json", data = "")] pub async fn tasks_put_logs( id: String, - etcd: &State, + db: &mut State, logs: Json, _machine: Machine, log_drain: &State<&LogDrain>, ) -> Result, AppError> { let task_uuid = Uuid::parse_str(&id)?; - let task = etcd + let task = db .get_task(task_uuid) .await? .ok_or(AppError::HttpError(Status::NotFound))?; @@ -178,24 +179,24 @@ pub async fn tasks_put_logs( #[post("/claim", format = "json", data = "")] pub async fn tasks_claim( - etcd: &State, + db: &mut State, features: Json, global_events: &State>, _machine: Machine, ) -> Result>, AppError> { - let tasks = etcd.get_all_tasks().await?; + let tasks = db.get_all_tasks().await?; let scheduler = Scheduler::new(tasks, &features.features) .map_err(|x| VickyError::Scheduler { source: x })?; let next_task = scheduler.get_next_task(); match next_task { Some(next_task) => { - let mut task = etcd + let mut task = db .get_task(next_task.id) .await? .ok_or(AppError::HttpError(Status::NotFound))?; task.status = TaskStatus::RUNNING; - etcd.put_task(&task).await?; + db.put_task(&task).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(Some(task))) } @@ -207,13 +208,13 @@ pub async fn tasks_claim( pub async fn tasks_finish( id: String, finish: Json, - etcd: &State, + db: &mut State, global_events: &State>, _machine: Machine, log_drain: &State<&LogDrain>, ) -> Result, AppError> { let task_uuid = Uuid::parse_str(&id)?; - let mut task = etcd + let mut task = db .get_task(task_uuid) .await? .ok_or(AppError::HttpError(Status::NotFound))?; @@ -221,7 +222,7 @@ pub async fn tasks_finish( log_drain.finish_logs(&id).await?; task.status = TaskStatus::FINISHED(finish.result.clone()); - etcd.put_task(&task).await?; + db.put_task(&task).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(task)) @@ -240,7 +241,7 @@ fn check_lock_conflict(task: &Task) -> bool { #[post("/", data = "")] pub async fn tasks_add( task: Json, - etcd: &State, + db: &mut State, global_events: &State>, _machine: Machine, ) -> Result, AppError> { @@ -259,7 +260,7 @@ pub async fn tasks_add( return Err(AppError::HttpError(Status::Conflict)); } - etcd.put_task(&task_manifest).await?; + db.put_task(&task_manifest).await?; global_events.send(GlobalEvent::TaskAdd)?; let ro_task = RoTask { @@ -274,7 +275,7 @@ pub async fn tasks_add( mod tests { use crate::tasks::check_lock_conflict; use uuid::Uuid; - use vickylib::database::{FlakeRef, Lock, Task, TaskBuilder, TaskStatus}; + use vickylib::database::entities::Task; #[test] fn add_new_conflicting_task() { From 086fa4a00be1d8621cc2f9093130af27f127a544 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 17:33:00 +0200 Subject: [PATCH 25/42] remove unused imports --- vicky/src/bin/vicky/main.rs | 1 - vicky/src/lib/database/entities/task.rs | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 7adb316..d4376c6 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -4,7 +4,6 @@ use aws_sdk_s3::config::{Credentials, Region}; use diesel::{Connection, PgConnection}; use jwtk::jwk::RemoteJwksVerifier; -use rand::Rng; use rocket::fairing::AdHoc; use rocket::figment::providers::{Env, Format, Toml}; use rocket::figment::{Figment, Profile}; diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 144e1e1..835baf0 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -1,4 +1,3 @@ -use diesel::prelude::*; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::database::entities::lock::Lock; @@ -173,9 +172,7 @@ pub mod db_impl { use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::errors::VickyError; use async_trait::async_trait; - use diesel::{Connection, ExpressionMethods, Identifiable, insert_into, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; - use diesel::associations::HasTable; - use log::{error, warn}; + use diesel::{ExpressionMethods, Identifiable, insert_into, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; use uuid::Uuid; use crate::database::entities::FlakeRef; use crate::database::entities::lock::Lock; From 43b0539ce1e1df7d43a4e805cc61cfd9a2882e98 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 19:02:16 +0200 Subject: [PATCH 26/42] rustfmt --- vicky/src/bin/vicky/tasks.rs | 11 +-- vicky/src/lib/database/entities/task.rs | 102 +++++++++++++++--------- 2 files changed, 69 insertions(+), 44 deletions(-) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index c9d8f85..b857067 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -1,19 +1,16 @@ +use diesel::PgConnection; use rocket::http::Status; use rocket::response::stream::{Event, EventStream}; use rocket::{get, post, serde::json::Json, State}; use serde::{Deserialize, Serialize}; use std::time; -use diesel::PgConnection; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; +use vickylib::database::entities::db_impl::TaskDatabase; +use vickylib::database::entities::{FlakeRef, Lock, Task, TaskResult, TaskStatus}; use vickylib::{ - errors::VickyError, - logs::LogDrain, - s3::client::S3Client, - vicky::scheduler::Scheduler, + errors::VickyError, logs::LogDrain, s3::client::S3Client, vicky::scheduler::Scheduler, }; -use vickylib::database::entities::{FlakeRef, Lock, Task, TaskResult, TaskStatus}; -use vickylib::database::entities::db_impl::TaskDatabase; use crate::{ auth::{Machine, User}, diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 835baf0..dc61aff 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -1,6 +1,6 @@ +use crate::database::entities::lock::Lock; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::database::entities::lock::Lock; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(tag = "result")] @@ -167,19 +167,22 @@ impl TaskBuilder { // this was on purpose because these macro-generated entity types // mess up the whole namespace and HAVE to be scoped pub mod db_impl { - use std::collections::HashMap; - use std::fmt::Display; + use crate::database::entities::lock::Lock; use crate::database::entities::task::{Task, TaskResult, TaskStatus}; + use crate::database::entities::FlakeRef; use crate::errors::VickyError; use async_trait::async_trait; - use diesel::{ExpressionMethods, Identifiable, insert_into, Insertable, Queryable, QueryDsl, RunQueryDsl, Selectable}; + use diesel::{ + insert_into, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, RunQueryDsl, + Selectable, + }; + use std::collections::HashMap; + use std::fmt::Display; use uuid::Uuid; - use crate::database::entities::FlakeRef; - use crate::database::entities::lock::Lock; // these here are evil >:( use crate::database::schema::locks; - use itertools::Itertools; use crate::database::schema::tasks; + use itertools::Itertools; #[derive(Insertable, Queryable)] #[diesel(table_name = tasks)] @@ -242,8 +245,18 @@ pub mod db_impl { impl DbLock { fn from_lock(lock: &Lock, task_id: Uuid) -> Self { match lock { - Lock::WRITE { name } => DbLock { id: -1, task_id, name: name.clone(), type_: "WRITE".to_string() }, - Lock::READ { name } => DbLock { id: -1, task_id, name: name.clone(), type_: "READ".to_string() }, + Lock::WRITE { name } => DbLock { + id: -1, + task_id, + name: name.clone(), + type_: "WRITE".to_string(), + }, + Lock::READ { name } => DbLock { + id: -1, + task_id, + name: name.clone(), + type_: "READ".to_string(), + }, } } } @@ -256,8 +269,7 @@ pub mod db_impl { _ => panic!( "Can't parse lock from database lock. Database corrupted? \ Expected READ or WRITE but found {} as type at key {}.", - self.type_, - self.id + self.type_, self.id ), } } @@ -274,46 +286,50 @@ pub mod db_impl { impl TaskDatabase for diesel::pg::PgConnection { async fn get_all_tasks(&mut self) -> Result, VickyError> { // very evil >>:( - use self::tasks::dsl::*; use self::locks::dsl::*; + use self::tasks::dsl::*; let db_tasks = tasks.load::(self)?; // prefetching all locks here, so we don't run into the N+1 Query Problem and distribute them let all_locks = locks.load::(self).unwrap_or_else(|_| vec![]); - let lock_map: HashMap<_, Vec> = all_locks.into_iter() + let lock_map: HashMap<_, Vec> = all_locks + .into_iter() .map(|db_lock| (db_lock.task_id, db_lock.into())) .into_group_map(); - let real_tasks: Vec = db_tasks.into_iter().map(|t| { - let real_locks = lock_map.get(&t.id).cloned().unwrap_or_default(); - - Task { - id: t.id.clone(), - display_name: t.display_name.clone(), - status: t.status.as_str().into(), - locks: real_locks, - features: t.features.split("||").map(String::from).collect(), - flake_ref: FlakeRef { - flake: t.flake_ref_uri.clone(), - args: t.flake_ref_args.split("||").map(String::from).collect(), + let real_tasks: Vec = db_tasks + .into_iter() + .map(|t| { + let real_locks = lock_map.get(&t.id).cloned().unwrap_or_default(); + + Task { + id: t.id.clone(), + display_name: t.display_name.clone(), + status: t.status.as_str().into(), + locks: real_locks, + features: t.features.split("||").map(String::from).collect(), + flake_ref: FlakeRef { + flake: t.flake_ref_uri.clone(), + args: t.flake_ref_args.split("||").map(String::from).collect(), + }, } - } - }).collect(); + }) + .collect(); Ok(real_tasks) } async fn get_task(&mut self, tid: Uuid) -> Result, VickyError> { // so evil >:O - use self::tasks::dsl::*; use self::locks::dsl::*; + use self::tasks::dsl::*; - let db_task= tasks.filter(self::tasks::id.eq(tid)).first::(self); + let db_task = tasks.filter(self::tasks::id.eq(tid)).first::(self); let db_task = match db_task { Err(diesel::result::Error::NotFound) => return Ok(None), - _ => db_task? + _ => db_task?, }; let db_locks: Vec = locks.filter(task_id.eq(task_id)).load::(self)?; @@ -321,25 +337,37 @@ pub mod db_impl { id: db_task.id, display_name: db_task.display_name.clone(), locks: db_locks.into_iter().map(|l| l.into()).collect(), - features: db_task.features.split("||").map(|s| s.to_string()).collect(), + features: db_task + .features + .split("||") + .map(|s| s.to_string()) + .collect(), flake_ref: FlakeRef { flake: db_task.flake_ref_uri.clone(), - args: db_task.features.split("||").map(|s| s.to_string()).collect(), + args: db_task + .features + .split("||") + .map(|s| s.to_string()) + .collect(), }, status: db_task.status.as_str().into(), }; - + Ok(Some(task)) } async fn put_task(&mut self, task: &Task) -> Result<(), VickyError> { // even more evil >;( - use self::tasks::dsl::*; use self::locks::dsl::*; - - let db_locks: Vec = task.locks.iter().map(|l| DbLock::from_lock(l, task.id)).collect(); + use self::tasks::dsl::*; + + let db_locks: Vec = task + .locks + .iter() + .map(|l| DbLock::from_lock(l, task.id)) + .collect(); let db_task: DbTask = task.into(); - + insert_into(tasks).values(db_task).execute(self)?; for db_lock in db_locks { insert_into(locks).values(db_lock).execute(self)?; From e99436b00255b39aed79f839f57cd872118e4514 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 19:10:12 +0200 Subject: [PATCH 27/42] changed Into<..> to From<..> --- vicky/src/lib/database/entities/task.rs | 34 ++++++++++++------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index dc61aff..a7966f4 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -209,9 +209,9 @@ pub mod db_impl { } } - impl Into for &str { - fn into(self) -> TaskStatus { - match self { + impl From<&str> for TaskStatus { + fn from(str: &str) -> TaskStatus { + match str { "RUNNING" => TaskStatus::RUNNING, "FINISHED::SUCCESS" => TaskStatus::FINISHED(TaskResult::SUCCESS), "FINISHED::ERROR" => TaskStatus::FINISHED(TaskResult::ERROR), @@ -220,15 +220,15 @@ pub mod db_impl { } } - impl Into for &Task { - fn into(self) -> DbTask { + impl From<&Task> for DbTask { + fn from(task: &Task) -> DbTask { DbTask { - id: self.id, - display_name: self.display_name.clone(), - status: self.status.to_string(), - features: self.features.join("||"), - flake_ref_uri: self.flake_ref.flake.clone(), - flake_ref_args: self.flake_ref.args.join("||"), + id: task.id, + display_name: task.display_name.clone(), + status: task.status.to_string(), + features: task.features.join("||"), + flake_ref_uri: task.flake_ref.flake.clone(), + flake_ref_args: task.flake_ref.args.join("||"), } } } @@ -261,15 +261,15 @@ pub mod db_impl { } } - impl Into for DbLock { - fn into(self) -> Lock { - match self.type_.as_str() { - "WRITE" => Lock::WRITE { name: self.name }, - "READ" => Lock::READ { name: self.name }, + impl From for Lock { + fn from(lock: DbLock) -> Lock { + match lock.type_.as_str() { + "WRITE" => Lock::WRITE { name: lock.name }, + "READ" => Lock::READ { name: lock.name }, _ => panic!( "Can't parse lock from database lock. Database corrupted? \ Expected READ or WRITE but found {} as type at key {}.", - self.type_, self.id + lock.type_, lock.id ), } } From 42fd6c4b450a2c2af1f70ba1a75c9c0c53026498 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 19:10:25 +0200 Subject: [PATCH 28/42] removed clone from Uuid field t.id --- vicky/src/lib/database/entities/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index a7966f4..e4b7ad5 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -305,7 +305,7 @@ pub mod db_impl { let real_locks = lock_map.get(&t.id).cloned().unwrap_or_default(); Task { - id: t.id.clone(), + id: t.id, display_name: t.display_name.clone(), status: t.status.as_str().into(), locks: real_locks, From 0f23979c2174b705fffeca80cc39eeb2654c17d4 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 20:19:08 +0200 Subject: [PATCH 29/42] added r2d2 diesel integration and rocket_sync_db for connection pooling --- vicky/Cargo.toml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/vicky/Cargo.toml b/vicky/Cargo.toml index d0dad0f..a56b4ac 100644 --- a/vicky/Cargo.toml +++ b/vicky/Cargo.toml @@ -18,12 +18,13 @@ serde_json = "1.0" async-trait = "0.1.71" aws-sdk-s3 = "0.28.0" aws-config = "0.55.3" -uuid = { version="1.4.1", features = ["fast-rng", "v4", "serde"] } -rocket = { version="0.5.0", features = ["json", "secrets"] } +uuid = { version = "1.4.1", features = ["fast-rng", "v4", "serde"] } +rocket = { version = "0.5.0", features = ["json", "secrets"] } rocket_dyn_templates = { version = "0.1.0", features = ["tera"] } -reqwest = { version="0.11.20", features = ["json"]} +rocket_sync_db_pools = { version = "0.1", features = ["diesel_postgres_pool"] } +reqwest = { version = "0.11.20", features = ["json"] } jwtk = "0.3.0" -diesel = { version = "2.1.6", features = ["postgres", "uuid"]} +diesel = { version = "2.1.6", features = ["postgres", "uuid", "r2d2"] } itertools = { version = "0.12.1" } [[bin]] From 890925fe92d5d5e03763c940e8cefcd49972ae51 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 20:19:39 +0200 Subject: [PATCH 30/42] removed old database config options --- vicky/src/bin/vicky/main.rs | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index d4376c6..8f783df 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -1,7 +1,6 @@ use std::time::Duration; use aws_sdk_s3::config::{Credentials, Region}; -use diesel::{Connection, PgConnection}; use jwtk::jwk::RemoteJwksVerifier; use rocket::fairing::AdHoc; @@ -10,6 +9,7 @@ use rocket::figment::{Figment, Profile}; use rocket::routes; use serde::Deserialize; use tokio::sync::broadcast; +use vickylib::database::entities::db_impl::Database; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; @@ -27,20 +27,6 @@ mod events; mod tasks; mod user; -#[derive(Deserialize)] -pub struct TlsConfigOptions { - ca_file: String, - certificate_file: String, - key_file: String, -} -#[derive(Deserialize)] -pub struct DatabaseConfig { - endpoint: String, - username: String, - password: String, - database: String, -} - #[derive(Deserialize)] pub struct S3Config { endpoint: String, @@ -60,7 +46,6 @@ pub struct OIDCConfig { pub struct Config { machines: Vec, - db_config: DatabaseConfig, s3_config: S3Config, oidc_config: OIDCConfig, @@ -89,12 +74,6 @@ async fn main() -> anyhow::Result<()> { let build_rocket = rocket::custom(rocket_config_figment); let app_config = build_rocket.figment().extract::()?; - let dbc = &app_config.db_config; - - let db_client = PgConnection::establish(format!( - "postgres://{}:{}@{}/{}", - dbc.username, dbc.password, dbc.endpoint, dbc.database - )); let jwks_verifier = RemoteJwksVerifier::new( app_config.oidc_config.jwks_url, @@ -126,7 +105,6 @@ async fn main() -> anyhow::Result<()> { let (tx_global_events, _rx_task_events) = broadcast::channel::(5); build_rocket - .manage(db_client) .manage(s3_ext_client) .manage(log_drain) .manage(jwks_verifier) From 20e003772b8f080f17a42348d5dd0ecb03af2231 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 20:21:31 +0200 Subject: [PATCH 31/42] PgConnection can't be async --- vicky/src/lib/database/entities/task.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index e4b7ad5..894b964 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -171,7 +171,6 @@ pub mod db_impl { use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::database::entities::FlakeRef; use crate::errors::VickyError; - use async_trait::async_trait; use diesel::{ insert_into, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, RunQueryDsl, Selectable, @@ -275,16 +274,14 @@ pub mod db_impl { } } - #[async_trait] pub trait TaskDatabase { - async fn get_all_tasks(&mut self) -> Result, VickyError>; - async fn get_task(&mut self, task_id: Uuid) -> Result, VickyError>; - async fn put_task(&mut self, task: &Task) -> Result<(), VickyError>; + fn get_all_tasks(&mut self) -> Result, VickyError>; + fn get_task(&mut self, task_id: Uuid) -> Result, VickyError>; + fn put_task(&mut self, task: &Task) -> Result<(), VickyError>; } - #[async_trait] impl TaskDatabase for diesel::pg::PgConnection { - async fn get_all_tasks(&mut self) -> Result, VickyError> { + fn get_all_tasks(&mut self) -> Result, VickyError> { // very evil >>:( use self::locks::dsl::*; use self::tasks::dsl::*; @@ -321,7 +318,7 @@ pub mod db_impl { Ok(real_tasks) } - async fn get_task(&mut self, tid: Uuid) -> Result, VickyError> { + fn get_task(&mut self, tid: Uuid) -> Result, VickyError> { // so evil >:O use self::locks::dsl::*; use self::tasks::dsl::*; @@ -356,7 +353,7 @@ pub mod db_impl { Ok(Some(task)) } - async fn put_task(&mut self, task: &Task) -> Result<(), VickyError> { + fn put_task(&mut self, task: &Task) -> Result<(), VickyError> { // even more evil >;( use self::locks::dsl::*; use self::tasks::dsl::*; From c26a1ddfbea06a28f82351ffdf35081e154f0377 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 20:22:24 +0200 Subject: [PATCH 32/42] added Database type for connection pooling --- vicky/src/lib/database/entities/task.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 894b964..09a9e8f 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -182,6 +182,7 @@ pub mod db_impl { use crate::database::schema::locks; use crate::database::schema::tasks; use itertools::Itertools; + use rocket_sync_db_pools::database; #[derive(Insertable, Queryable)] #[diesel(table_name = tasks)] @@ -274,6 +275,9 @@ pub mod db_impl { } } + #[database("postgres_db")] + pub struct Database(diesel::PgConnection); + pub trait TaskDatabase { fn get_all_tasks(&mut self) -> Result, VickyError>; fn get_task(&mut self, task_id: Uuid) -> Result, VickyError>; From af14b0b52791f7986911f1c0df8e66ac84c26885 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 20:23:28 +0200 Subject: [PATCH 33/42] maneuvered database to a connection pool --- Cargo.lock | 47 +++++++++++++++++++++++ vicky/Rocket.example.toml | 8 +--- vicky/src/bin/vicky/main.rs | 1 + vicky/src/bin/vicky/tasks.rs | 51 ++++++++++++------------- vicky/src/lib/database/entities/task.rs | 2 +- 5 files changed, 75 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2479a26..cf69c7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -971,6 +971,7 @@ dependencies = [ "diesel_derives", "itoa", "pq-sys", + "r2d2", "uuid", ] @@ -2195,6 +2196,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.8.5" @@ -2463,6 +2475,31 @@ dependencies = [ "uncased", ] +[[package]] +name = "rocket_sync_db_pools" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d83f32721ed79509adac4328e97f817a8f55a47c4b64799f6fd6cc3adb6e42ff" +dependencies = [ + "diesel", + "r2d2", + "rocket", + "rocket_sync_db_pools_codegen", + "serde", + "tokio", + "version_check", +] + +[[package]] +name = "rocket_sync_db_pools_codegen" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc890925dc79370c28eb15c9957677093fdb7e8c44966d189f38cedb995ee68" +dependencies = [ + "devise", + "quote", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2554,6 +2591,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -3398,6 +3444,7 @@ dependencies = [ "reqwest", "rocket", "rocket_dyn_templates", + "rocket_sync_db_pools", "serde", "serde_json", "thiserror", diff --git a/vicky/Rocket.example.toml b/vicky/Rocket.example.toml index fc5b564..886bd60 100644 --- a/vicky/Rocket.example.toml +++ b/vicky/Rocket.example.toml @@ -4,12 +4,8 @@ machines = [ "abc1234" ] -[default.db] -endpoints = [ "https://localhost:5432" ] -user = "vicky" -database = "vicky" -password = "vicky" - +[default.databases] +postgres_db = { url = "postgres://vicky:vicky@localhost/vicky" } [default.s3_config] endpoint = "http://localhost:9000" diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 8f783df..5f692a8 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -109,6 +109,7 @@ async fn main() -> anyhow::Result<()> { .manage(log_drain) .manage(jwks_verifier) .manage(tx_global_events) + .attach(Database::fairing()) .attach(AdHoc::config::()) .mount("/api/v1/user", routes![get_user]) .mount("/api/v1/events", routes![get_global_events]) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index b857067..1be94ae 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -1,4 +1,3 @@ -use diesel::PgConnection; use rocket::http::Status; use rocket::response::stream::{Event, EventStream}; use rocket::{get, post, serde::json::Json, State}; @@ -6,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; -use vickylib::database::entities::db_impl::TaskDatabase; +use vickylib::database::entities::db_impl::{Database, TaskDatabase}; use vickylib::database::entities::{FlakeRef, Lock, Task, TaskResult, TaskStatus}; use vickylib::{ errors::VickyError, logs::LogDrain, s3::client::S3Client, vicky::scheduler::Scheduler, @@ -48,49 +47,46 @@ pub struct LogLines { } #[get("/")] -pub async fn tasks_get_user( - db: &mut State, - _user: User, -) -> Result>, VickyError> { - let tasks: Vec = db.get_all_tasks().await?; +pub async fn tasks_get_user(db: Database, _user: User) -> Result>, VickyError> { + let tasks: Vec = db.run(|conn| conn.get_all_tasks()).await?; Ok(Json(tasks)) } #[get("/", rank = 2)] pub async fn tasks_get_machine( - db: &mut State, + db: Database, _machine: Machine, ) -> Result>, VickyError> { - let tasks: Vec = db.get_all_tasks().await?; + let tasks: Vec = db.run(|conn| conn.get_all_tasks()).await?; Ok(Json(tasks)) } #[get("/")] pub async fn tasks_specific_get_user( id: String, - db: &mut State, + db: Database, _user: User, ) -> Result>, VickyError> { let task_uuid = Uuid::parse_str(&id).unwrap(); - let tasks: Option = db.get_task(task_uuid).await?; + let tasks: Option = db.run(move |conn| conn.get_task(task_uuid)).await?; Ok(Json(tasks)) } #[get("/", rank = 2)] pub async fn tasks_specific_get_machine( id: String, - db: &mut State, + db: Database, _machine: Machine, ) -> Result>, VickyError> { let task_uuid = Uuid::parse_str(&id).unwrap(); - let tasks: Option = db.get_task(task_uuid).await?; + let tasks: Option = db.run(move |conn| conn.get_task(task_uuid)).await?; Ok(Json(tasks)) } #[get("//logs")] pub async fn tasks_get_logs<'a>( id: String, - db: &mut State, + db: Database, s3: &'a State, _user: User, log_drain: &'a State<&'_ LogDrain>, @@ -98,7 +94,7 @@ pub async fn tasks_get_logs<'a>( // TODO: Fix Error Handling let task_uuid = Uuid::parse_str(&id).unwrap(); let task = db - .get_task(task_uuid) + .run(move |conn| conn.get_task(task_uuid)) .await .unwrap() .ok_or(AppError::HttpError(Status::NotFound)) @@ -154,14 +150,14 @@ pub async fn tasks_get_logs<'a>( #[post("//logs", format = "json", data = "")] pub async fn tasks_put_logs( id: String, - db: &mut State, + db: Database, logs: Json, _machine: Machine, log_drain: &State<&LogDrain>, ) -> Result, AppError> { let task_uuid = Uuid::parse_str(&id)?; let task = db - .get_task(task_uuid) + .run(move |conn| conn.get_task(task_uuid)) .await? .ok_or(AppError::HttpError(Status::NotFound))?; @@ -176,12 +172,12 @@ pub async fn tasks_put_logs( #[post("/claim", format = "json", data = "")] pub async fn tasks_claim( - db: &mut State, + db: Database, features: Json, global_events: &State>, _machine: Machine, ) -> Result>, AppError> { - let tasks = db.get_all_tasks().await?; + let tasks = db.run(|conn| conn.get_all_tasks()).await?; let scheduler = Scheduler::new(tasks, &features.features) .map_err(|x| VickyError::Scheduler { source: x })?; let next_task = scheduler.get_next_task(); @@ -189,11 +185,12 @@ pub async fn tasks_claim( match next_task { Some(next_task) => { let mut task = db - .get_task(next_task.id) + .run(move |conn| conn.get_task(next_task.id)) .await? .ok_or(AppError::HttpError(Status::NotFound))?; task.status = TaskStatus::RUNNING; - db.put_task(&task).await?; + let task2 = task.clone(); + db.run(move |conn| conn.put_task(&task2)).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(Some(task))) } @@ -205,21 +202,22 @@ pub async fn tasks_claim( pub async fn tasks_finish( id: String, finish: Json, - db: &mut State, + db: Database, global_events: &State>, _machine: Machine, log_drain: &State<&LogDrain>, ) -> Result, AppError> { let task_uuid = Uuid::parse_str(&id)?; let mut task = db - .get_task(task_uuid) + .run(move |conn| conn.get_task(task_uuid)) .await? .ok_or(AppError::HttpError(Status::NotFound))?; log_drain.finish_logs(&id).await?; task.status = TaskStatus::FINISHED(finish.result.clone()); - db.put_task(&task).await?; + let task2 = task.clone(); + db.run(move |conn| conn.put_task(&task2)).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(task)) @@ -238,7 +236,7 @@ fn check_lock_conflict(task: &Task) -> bool { #[post("/", data = "")] pub async fn tasks_add( task: Json, - db: &mut State, + db: Database, global_events: &State>, _machine: Machine, ) -> Result, AppError> { @@ -257,7 +255,7 @@ pub async fn tasks_add( return Err(AppError::HttpError(Status::Conflict)); } - db.put_task(&task_manifest).await?; + db.run(move |conn| conn.put_task(&task_manifest)).await?; global_events.send(GlobalEvent::TaskAdd)?; let ro_task = RoTask { @@ -271,7 +269,6 @@ pub async fn tasks_add( #[cfg(test)] mod tests { use crate::tasks::check_lock_conflict; - use uuid::Uuid; use vickylib::database::entities::Task; #[test] diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 09a9e8f..0806ba3 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -25,7 +25,7 @@ pub struct FlakeRef { pub args: Vec, } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct Task { pub id: Uuid, pub display_name: String, From 28ba6885c879e8c09539b448f4b4281c44f5f4b7 Mon Sep 17 00:00:00 2001 From: Kek5chen Date: Mon, 29 Apr 2024 20:49:57 +0200 Subject: [PATCH 34/42] add new struct to dblock to push into database --- .../2024-04-26-080445_create_tasks/up.sql | 3 +-- vicky/src/lib/database/entities/task.rs | 23 +++++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql index 44a3461..084b30a 100644 --- a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql +++ b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql @@ -10,11 +10,10 @@ CREATE TABLE tasks CREATE TABLE locks ( - id INT, + id SERIAL PRIMARY KEY, task_id uuid NOT NULL, name VARCHAR NOT NULL, type VARCHAR NOT NULL, - PRIMARY KEY (id), CONSTRAINT fk_task FOREIGN KEY (task_id) REFERENCES tasks (id) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 0806ba3..da740f3 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -233,7 +233,7 @@ pub mod db_impl { } } - #[derive(Selectable, Identifiable, Insertable, Queryable)] + #[derive(Selectable, Identifiable, Queryable)] #[diesel(table_name = locks)] struct DbLock { id: i32, @@ -242,6 +242,24 @@ pub mod db_impl { type_: String, } + #[derive(Insertable)] + #[diesel(table_name = locks)] + struct NewDbLock { + task_id: Uuid, + name: String, + type_: String, + } + + impl From for NewDbLock { + fn from(value: DbLock) -> Self { + NewDbLock { + task_id: value.task_id, + name: value.name, + type_: value.type_ + } + } + } + impl DbLock { fn from_lock(lock: &Lock, task_id: Uuid) -> Self { match lock { @@ -371,7 +389,8 @@ pub mod db_impl { insert_into(tasks).values(db_task).execute(self)?; for db_lock in db_locks { - insert_into(locks).values(db_lock).execute(self)?; + let new_db_lock: NewDbLock = db_lock.into(); + insert_into(locks).values(new_db_lock).execute(self)?; } Ok(()) } From 8ff3a248e8b6bab5c130cd62c2e7fee296d3317e Mon Sep 17 00:00:00 2001 From: KX Date: Tue, 30 Apr 2024 11:02:34 +0200 Subject: [PATCH 35/42] format migration --- vicky/migrations/2024-04-26-080445_create_tasks/up.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql index 084b30a..a26b2a9 100644 --- a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql +++ b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql @@ -1,6 +1,6 @@ CREATE TABLE tasks ( - id uuid PRIMARY KEY, + id uuid PRIMARY KEY, display_name VARCHAR NOT NULL, status VARCHAR NOT NULL, features VARCHAR NOT NULL, @@ -10,8 +10,8 @@ CREATE TABLE tasks CREATE TABLE locks ( - id SERIAL PRIMARY KEY, - task_id uuid NOT NULL, + id SERIAL PRIMARY KEY, + task_id uuid NOT NULL, name VARCHAR NOT NULL, type VARCHAR NOT NULL, CONSTRAINT fk_task From 8a868f52212363fda5edf9b9ac0044f4fc3c861f Mon Sep 17 00:00:00 2001 From: KX Date: Tue, 30 Apr 2024 11:03:20 +0200 Subject: [PATCH 36/42] add update_task to update database tasks --- vicky/src/bin/vicky/tasks.rs | 4 ++-- vicky/src/lib/database/entities/task.rs | 31 +++++++++++++++++++------ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 1be94ae..d1fe633 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -190,7 +190,7 @@ pub async fn tasks_claim( .ok_or(AppError::HttpError(Status::NotFound))?; task.status = TaskStatus::RUNNING; let task2 = task.clone(); - db.run(move |conn| conn.put_task(&task2)).await?; + db.run(move |conn| conn.update_task(&task2)).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(Some(task))) } @@ -217,7 +217,7 @@ pub async fn tasks_finish( task.status = TaskStatus::FINISHED(finish.result.clone()); let task2 = task.clone(); - db.run(move |conn| conn.put_task(&task2)).await?; + db.run(move |conn| conn.update_task(&task2)).await?; global_events.send(GlobalEvent::TaskUpdate { uuid: task.id })?; Ok(Json(task)) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index da740f3..6688487 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -172,8 +172,8 @@ pub mod db_impl { use crate::database::entities::FlakeRef; use crate::errors::VickyError; use diesel::{ - insert_into, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, RunQueryDsl, - Selectable, + insert_into, AsChangeset, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, + RunQueryDsl, Selectable, }; use std::collections::HashMap; use std::fmt::Display; @@ -184,7 +184,7 @@ pub mod db_impl { use itertools::Itertools; use rocket_sync_db_pools::database; - #[derive(Insertable, Queryable)] + #[derive(Insertable, Queryable, AsChangeset, Debug)] #[diesel(table_name = tasks)] struct DbTask { pub id: Uuid, @@ -233,7 +233,7 @@ pub mod db_impl { } } - #[derive(Selectable, Identifiable, Queryable)] + #[derive(Selectable, Identifiable, Queryable, Debug)] #[diesel(table_name = locks)] struct DbLock { id: i32, @@ -242,20 +242,20 @@ pub mod db_impl { type_: String, } - #[derive(Insertable)] + #[derive(Insertable, Debug)] #[diesel(table_name = locks)] struct NewDbLock { task_id: Uuid, name: String, type_: String, } - + impl From for NewDbLock { fn from(value: DbLock) -> Self { NewDbLock { task_id: value.task_id, name: value.name, - type_: value.type_ + type_: value.type_, } } } @@ -300,6 +300,7 @@ pub mod db_impl { fn get_all_tasks(&mut self) -> Result, VickyError>; fn get_task(&mut self, task_id: Uuid) -> Result, VickyError>; fn put_task(&mut self, task: &Task) -> Result<(), VickyError>; + fn update_task(&mut self, task: &Task) -> Result<(), VickyError>; } impl TaskDatabase for diesel::pg::PgConnection { @@ -394,5 +395,21 @@ pub mod db_impl { } Ok(()) } + + fn update_task(&mut self, task: &Task) -> Result<(), VickyError> { + // even more evil >;( + use self::tasks::dsl::*; + + let db_task: DbTask = task.into(); + + insert_into(tasks) + .values(&db_task) + .on_conflict(id) + .do_update() + .set(&db_task) + .execute(self)?; + + Ok(()) + } } } From b3c2dc11599e26acac29f122e31ebe2c1bd7e18c Mon Sep 17 00:00:00 2001 From: KX Date: Thu, 2 May 2024 10:23:58 +0200 Subject: [PATCH 37/42] make use of pg arrays --- .../2024-04-26-080445_create_tasks/up.sql | 4 +-- vicky/src/lib/database/entities/task.rs | 32 +++++++------------ vicky/src/lib/database/schema.rs | 4 +-- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql index a26b2a9..b4457dc 100644 --- a/vicky/migrations/2024-04-26-080445_create_tasks/up.sql +++ b/vicky/migrations/2024-04-26-080445_create_tasks/up.sql @@ -3,9 +3,9 @@ CREATE TABLE tasks id uuid PRIMARY KEY, display_name VARCHAR NOT NULL, status VARCHAR NOT NULL, - features VARCHAR NOT NULL, + features text[] NOT NULL, flake_ref_uri VARCHAR NOT NULL, - flake_ref_args VARCHAR NOT NULL + flake_ref_args text[] NOT NULL ); CREATE TABLE locks diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 6688487..109cf40 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -190,9 +190,9 @@ pub mod db_impl { pub id: Uuid, pub display_name: String, pub status: String, - pub features: String, + pub features: Vec>, pub flake_ref_uri: String, - pub flake_ref_args: String, + pub flake_ref_args: Vec>, } impl Display for TaskStatus { @@ -226,9 +226,9 @@ pub mod db_impl { id: task.id, display_name: task.display_name.clone(), status: task.status.to_string(), - features: task.features.join("||"), + features: task.features.clone().iter().cloned().map(Some).collect(), flake_ref_uri: task.flake_ref.flake.clone(), - flake_ref_args: task.flake_ref.args.join("||"), + flake_ref_args: task.flake_ref.args.iter().cloned().map(Some).collect(), } } } @@ -326,13 +326,13 @@ pub mod db_impl { Task { id: t.id, - display_name: t.display_name.clone(), + display_name: t.display_name, status: t.status.as_str().into(), locks: real_locks, - features: t.features.split("||").map(String::from).collect(), + features: t.features.into_iter().flatten().collect(), flake_ref: FlakeRef { - flake: t.flake_ref_uri.clone(), - args: t.flake_ref_args.split("||").map(String::from).collect(), + flake: t.flake_ref_uri, + args: t.flake_ref_args.into_iter().flatten().collect(), }, } }) @@ -355,20 +355,12 @@ pub mod db_impl { let task = Task { id: db_task.id, - display_name: db_task.display_name.clone(), + display_name: db_task.display_name, locks: db_locks.into_iter().map(|l| l.into()).collect(), - features: db_task - .features - .split("||") - .map(|s| s.to_string()) - .collect(), + features: db_task.features.into_iter().flatten().collect(), flake_ref: FlakeRef { - flake: db_task.flake_ref_uri.clone(), - args: db_task - .features - .split("||") - .map(|s| s.to_string()) - .collect(), + flake: db_task.flake_ref_uri, + args: db_task.flake_ref_args.into_iter().flatten().collect(), }, status: db_task.status.as_str().into(), }; diff --git a/vicky/src/lib/database/schema.rs b/vicky/src/lib/database/schema.rs index 9053820..18d76a2 100644 --- a/vicky/src/lib/database/schema.rs +++ b/vicky/src/lib/database/schema.rs @@ -15,9 +15,9 @@ diesel::table! { id -> Uuid, display_name -> Varchar, status -> Varchar, - features -> Varchar, + features -> Array>, flake_ref_uri -> Varchar, - flake_ref_args -> Varchar, + flake_ref_args -> Array>, } } From 11ec64c39a5f365fbd14dfb2932e14de036aca01 Mon Sep 17 00:00:00 2001 From: KX Date: Thu, 2 May 2024 10:34:09 +0200 Subject: [PATCH 38/42] use update instead of upsert --- vicky/src/lib/database/entities/task.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 109cf40..a6ffdfc 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -171,10 +171,7 @@ pub mod db_impl { use crate::database::entities::task::{Task, TaskResult, TaskStatus}; use crate::database::entities::FlakeRef; use crate::errors::VickyError; - use diesel::{ - insert_into, AsChangeset, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, - RunQueryDsl, Selectable, - }; + use diesel::{insert_into, AsChangeset, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, RunQueryDsl, Selectable, update}; use std::collections::HashMap; use std::fmt::Display; use uuid::Uuid; @@ -392,13 +389,8 @@ pub mod db_impl { // even more evil >;( use self::tasks::dsl::*; - let db_task: DbTask = task.into(); - - insert_into(tasks) - .values(&db_task) - .on_conflict(id) - .do_update() - .set(&db_task) + update(tasks.filter(id.eq(task.id))) + .set(status.eq(task.status.clone().to_string())) .execute(self)?; Ok(()) From c666092e80b227d4327e72d7aa173460900dc0cd Mon Sep 17 00:00:00 2001 From: KX Date: Thu, 2 May 2024 10:44:25 +0200 Subject: [PATCH 39/42] add comment for -1 in DbLock --- vicky/src/lib/database/entities/task.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index a6ffdfc..39c97ba 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -259,6 +259,11 @@ pub mod db_impl { impl DbLock { fn from_lock(lock: &Lock, task_id: Uuid) -> Self { + // Converting a Lock to a DbLock only happens when inserting or updating the database, + // in which case the id column is irrelevant as it's auto generated in the database. + // A DbLock should not be inserted into a database anyway, as it's just a transient type + // for inserting a NewDbLock. Thus, id is set to -1 here. Maybe this can be improved wholly? + // At least it works. match lock { Lock::WRITE { name } => DbLock { id: -1, From 335303d5c89f387da0bd6451f5d20f55cbf8f151 Mon Sep 17 00:00:00 2001 From: KX Date: Thu, 2 May 2024 10:50:17 +0200 Subject: [PATCH 40/42] use TryFrom for TaskStatus instead for now --- vicky/src/lib/database/entities/task.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 39c97ba..64ff55a 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -206,13 +206,16 @@ pub mod db_impl { } } - impl From<&str> for TaskStatus { - fn from(str: &str) -> TaskStatus { + impl TryFrom<&str> for TaskStatus { + type Error = (); + + fn try_from(str: &str) -> Result { match str { - "RUNNING" => TaskStatus::RUNNING, - "FINISHED::SUCCESS" => TaskStatus::FINISHED(TaskResult::SUCCESS), - "FINISHED::ERROR" => TaskStatus::FINISHED(TaskResult::ERROR), - _ => TaskStatus::NEW, + "NEW" => Ok(TaskStatus::NEW), + "RUNNING" => Ok(TaskStatus::RUNNING), + "FINISHED::SUCCESS" => Ok(TaskStatus::FINISHED(TaskResult::SUCCESS)), + "FINISHED::ERROR" => Ok(TaskStatus::FINISHED(TaskResult::ERROR)), + _ => Err(()), } } } @@ -329,7 +332,7 @@ pub mod db_impl { Task { id: t.id, display_name: t.display_name, - status: t.status.as_str().into(), + status: t.status.as_str().try_into().expect("Got unexpected status value. Database is corrupted"), locks: real_locks, features: t.features.into_iter().flatten().collect(), flake_ref: FlakeRef { @@ -364,7 +367,7 @@ pub mod db_impl { flake: db_task.flake_ref_uri, args: db_task.flake_ref_args.into_iter().flatten().collect(), }, - status: db_task.status.as_str().into(), + status: db_task.status.as_str().try_into().expect("Got unexpected status value. Database is corrupted"), }; Ok(Some(task)) From cc6e1ac6d6222ae04f90beaab4248f1e72d2cd29 Mon Sep 17 00:00:00 2001 From: KX Date: Thu, 2 May 2024 10:50:47 +0200 Subject: [PATCH 41/42] mark dead code for CI since User.role is used --- vicky/src/bin/vicky/auth.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vicky/src/bin/vicky/auth.rs b/vicky/src/bin/vicky/auth.rs index b2fd5e8..37bddcc 100644 --- a/vicky/src/bin/vicky/auth.rs +++ b/vicky/src/bin/vicky/auth.rs @@ -13,6 +13,7 @@ pub enum Role { Admin, } +#[allow(dead_code)] #[derive(Deserialize)] pub struct User { pub full_name: String, From a665df031e6cc3b72989a8c89758485417198010 Mon Sep 17 00:00:00 2001 From: KX Date: Thu, 2 May 2024 11:13:10 +0200 Subject: [PATCH 42/42] restructured modules and formatted --- vicky/src/bin/vicky/main.rs | 2 +- vicky/src/bin/vicky/tasks.rs | 5 +- vicky/src/lib/database/entities/lock.rs | 72 ++++++++++++++++++++ vicky/src/lib/database/entities/mod.rs | 12 ++-- vicky/src/lib/database/entities/task.rs | 89 +++++-------------------- vicky/src/lib/vicky/scheduler.rs | 12 ++-- 6 files changed, 107 insertions(+), 85 deletions(-) diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 5f692a8..978d49c 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -9,7 +9,7 @@ use rocket::figment::{Figment, Profile}; use rocket::routes; use serde::Deserialize; use tokio::sync::broadcast; -use vickylib::database::entities::db_impl::Database; +use vickylib::database::entities::Database; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index d1fe633..caaa628 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize}; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; -use vickylib::database::entities::db_impl::{Database, TaskDatabase}; -use vickylib::database::entities::{FlakeRef, Lock, Task, TaskResult, TaskStatus}; +use vickylib::database::entities::task::db_impl::TaskDatabase; +use vickylib::database::entities::task::{FlakeRef, TaskResult, TaskStatus}; +use vickylib::database::entities::{Database, Lock, Task}; use vickylib::{ errors::VickyError, logs::LogDrain, s3::client::S3Client, vicky::scheduler::Scheduler, }; diff --git a/vicky/src/lib/database/entities/lock.rs b/vicky/src/lib/database/entities/lock.rs index a3d4ba4..9c2b032 100644 --- a/vicky/src/lib/database/entities/lock.rs +++ b/vicky/src/lib/database/entities/lock.rs @@ -17,3 +17,75 @@ impl Lock { } } } + +pub mod db_impl { + use crate::database::entities::Lock; + use crate::database::schema::locks; + use diesel::{Identifiable, Insertable, Queryable, Selectable}; + use uuid::Uuid; + + #[derive(Selectable, Identifiable, Queryable, Debug)] + #[diesel(table_name = locks)] + pub struct DbLock { + pub id: i32, + pub task_id: Uuid, + pub name: String, + pub type_: String, + } + + #[derive(Insertable, Debug)] + #[diesel(table_name = locks)] + pub struct NewDbLock { + pub task_id: Uuid, + pub name: String, + pub type_: String, + } + + impl From for NewDbLock { + fn from(value: DbLock) -> Self { + NewDbLock { + task_id: value.task_id, + name: value.name, + type_: value.type_, + } + } + } + + impl DbLock { + pub fn from_lock(lock: &Lock, task_id: Uuid) -> Self { + // Converting a Lock to a DbLock only happens when inserting or updating the database, + // in which case the id column is irrelevant as it's auto generated in the database. + // A DbLock should not be inserted into a database anyway, as it's just a transient type + // for inserting a NewDbLock. Thus, id is set to -1 here. Maybe this can be improved wholly? + // At least it works. + match lock { + Lock::WRITE { name } => DbLock { + id: -1, + task_id, + name: name.clone(), + type_: "WRITE".to_string(), + }, + Lock::READ { name } => DbLock { + id: -1, + task_id, + name: name.clone(), + type_: "READ".to_string(), + }, + } + } + } + + impl From for Lock { + fn from(lock: DbLock) -> Lock { + match lock.type_.as_str() { + "WRITE" => Lock::WRITE { name: lock.name }, + "READ" => Lock::READ { name: lock.name }, + _ => panic!( + "Can't parse lock from database lock. Database corrupted? \ + Expected READ or WRITE but found {} as type at key {}.", + lock.type_, lock.id + ), + } + } + } +} diff --git a/vicky/src/lib/database/entities/mod.rs b/vicky/src/lib/database/entities/mod.rs index fb793e2..33cb4c0 100644 --- a/vicky/src/lib/database/entities/mod.rs +++ b/vicky/src/lib/database/entities/mod.rs @@ -1,5 +1,9 @@ -mod task; -mod lock; +pub mod lock; +pub mod task; -pub use task::*; -pub use lock::*; +pub use lock::Lock; +use rocket_sync_db_pools::database; +pub use task::Task; + +#[database("postgres_db")] +pub struct Database(diesel::PgConnection); diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 64ff55a..e263351 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -168,18 +168,21 @@ impl TaskBuilder { // mess up the whole namespace and HAVE to be scoped pub mod db_impl { use crate::database::entities::lock::Lock; + use crate::database::entities::task::FlakeRef; use crate::database::entities::task::{Task, TaskResult, TaskStatus}; - use crate::database::entities::FlakeRef; use crate::errors::VickyError; - use diesel::{insert_into, AsChangeset, ExpressionMethods, Identifiable, Insertable, QueryDsl, Queryable, RunQueryDsl, Selectable, update}; + use diesel::{ + insert_into, update, AsChangeset, ExpressionMethods, Insertable, QueryDsl, Queryable, + RunQueryDsl, + }; use std::collections::HashMap; use std::fmt::Display; use uuid::Uuid; // these here are evil >:( + use crate::database::entities::lock::db_impl::{DbLock, NewDbLock}; use crate::database::schema::locks; use crate::database::schema::tasks; use itertools::Itertools; - use rocket_sync_db_pools::database; #[derive(Insertable, Queryable, AsChangeset, Debug)] #[diesel(table_name = tasks)] @@ -233,74 +236,6 @@ pub mod db_impl { } } - #[derive(Selectable, Identifiable, Queryable, Debug)] - #[diesel(table_name = locks)] - struct DbLock { - id: i32, - task_id: Uuid, - name: String, - type_: String, - } - - #[derive(Insertable, Debug)] - #[diesel(table_name = locks)] - struct NewDbLock { - task_id: Uuid, - name: String, - type_: String, - } - - impl From for NewDbLock { - fn from(value: DbLock) -> Self { - NewDbLock { - task_id: value.task_id, - name: value.name, - type_: value.type_, - } - } - } - - impl DbLock { - fn from_lock(lock: &Lock, task_id: Uuid) -> Self { - // Converting a Lock to a DbLock only happens when inserting or updating the database, - // in which case the id column is irrelevant as it's auto generated in the database. - // A DbLock should not be inserted into a database anyway, as it's just a transient type - // for inserting a NewDbLock. Thus, id is set to -1 here. Maybe this can be improved wholly? - // At least it works. - match lock { - Lock::WRITE { name } => DbLock { - id: -1, - task_id, - name: name.clone(), - type_: "WRITE".to_string(), - }, - Lock::READ { name } => DbLock { - id: -1, - task_id, - name: name.clone(), - type_: "READ".to_string(), - }, - } - } - } - - impl From for Lock { - fn from(lock: DbLock) -> Lock { - match lock.type_.as_str() { - "WRITE" => Lock::WRITE { name: lock.name }, - "READ" => Lock::READ { name: lock.name }, - _ => panic!( - "Can't parse lock from database lock. Database corrupted? \ - Expected READ or WRITE but found {} as type at key {}.", - lock.type_, lock.id - ), - } - } - } - - #[database("postgres_db")] - pub struct Database(diesel::PgConnection); - pub trait TaskDatabase { fn get_all_tasks(&mut self) -> Result, VickyError>; fn get_task(&mut self, task_id: Uuid) -> Result, VickyError>; @@ -332,7 +267,11 @@ pub mod db_impl { Task { id: t.id, display_name: t.display_name, - status: t.status.as_str().try_into().expect("Got unexpected status value. Database is corrupted"), + status: t + .status + .as_str() + .try_into() + .expect("Got unexpected status value. Database is corrupted"), locks: real_locks, features: t.features.into_iter().flatten().collect(), flake_ref: FlakeRef { @@ -367,7 +306,11 @@ pub mod db_impl { flake: db_task.flake_ref_uri, args: db_task.flake_ref_args.into_iter().flatten().collect(), }, - status: db_task.status.as_str().try_into().expect("Got unexpected status value. Database is corrupted"), + status: db_task + .status + .as_str() + .try_into() + .expect("Got unexpected status value. Database is corrupted"), }; Ok(Some(task)) diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index 4a6c2d9..98cfe2a 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -2,8 +2,9 @@ use std::collections::HashMap; use log::debug; +use crate::database::entities::task::TaskStatus; use crate::{ - database::entities::{Lock, Task, TaskStatus}, + database::entities::{Lock, Task}, errors::SchedulerError, }; @@ -148,7 +149,8 @@ impl Scheduler { #[cfg(test)] mod tests { - use crate::database::entities::{Task, TaskStatus}; + use crate::database::entities::task::TaskStatus; + use crate::database::entities::Task; use super::Scheduler; @@ -162,7 +164,7 @@ mod tests { Task::builder() .with_display_name("Test 2") .with_status(TaskStatus::RUNNING) - .build() + .build(), ]; Scheduler::new(tasks, &[]).unwrap(); @@ -180,7 +182,7 @@ mod tests { .with_display_name("Test 2") .with_status(TaskStatus::RUNNING) .with_read_lock("foo 1") - .build() + .build(), ]; Scheduler::new(tasks, &[]).unwrap(); @@ -198,7 +200,7 @@ mod tests { .with_display_name("Test 2") .with_status(TaskStatus::RUNNING) .with_write_lock("foo2") - .build() + .build(), ]; Scheduler::new(tasks, &[]).unwrap();