diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..2ac0bebaa --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +.idea +.DS_Store +/data +.trustify +/target +/.dockerignore +/Containerfile diff --git a/Cargo.lock b/Cargo.lock index ff40c704f..1909fa6bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2079,11 +2079,10 @@ dependencies = [ [[package]] name = "csaf" version = "0.5.0" -source = "git+https://github.com/trustification/csaf-rs#17620a225744b4a18845d4f7bf63354e01109b91" +source = "git+https://github.com/trustification/csaf-rs?branch=cvss#d6df319076cdc685a2cded0cb6797ad7ce92b9c9" dependencies = [ "chrono", "cpe", - "cvss", "packageurl", "serde", "serde_json", @@ -2190,13 +2189,20 @@ dependencies = [ [[package]] name = "cvss" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f643e062e9a8e26edea270945e05011c441ca6a56e9d9d4464c6b0be1352bd" +version = "0.1.0" +source = "git+https://github.com/dejanb/cvss#aadb35f033d124da099bf74fb1383fc30a62c163" dependencies = [ "serde", + "serde_json", + "strum 0.26.3", ] +[[package]] +name = "cvss" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f643e062e9a8e26edea270945e05011c441ca6a56e9d9d4464c6b0be1352bd" + [[package]] name = "darling" version = "0.10.2" @@ -3872,7 +3878,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70a646d946d06bedbbc4cac4c218acf4bbf2d87757a784857025f4d447e4e1cd" dependencies = [ "console", + "futures-core", "portable-atomic", + "tokio", "unicode-width", "unit-prefix", "web-time", @@ -8346,6 +8354,7 @@ dependencies = [ "sea-orm-migration", "sea-query", "serde", + "serde-cyclonedx", "serde_json", "spdx-expression", "spdx-rs", @@ -8397,6 +8406,8 @@ dependencies = [ "anyhow", "async-graphql", "cpe", + "cvss 0.1.0", + "cvss 2.1.1", "deepsize", "log", "rstest", @@ -8457,14 +8468,32 @@ name = "trustify-migration" version = "0.4.0-beta.1" dependencies = [ "anyhow", + "bytes", + "clap", + "csaf", + "cve", + "futures", + "futures-util", + "humantime", + "indicatif", + "osv", + "sea-orm", "sea-orm-migration", + "serde-cyclonedx", + "serde_json", + "spdx-rs", + "strum 0.27.2", "test-context", "test-log", "tokio", "tokio-util", + "tracing", + "tracing-subscriber", "trustify-common", "trustify-db", "trustify-entity", + "trustify-module-ingestor", + "trustify-module-storage", "trustify-test-context", "uuid", ] @@ -8687,6 +8716,7 @@ dependencies = [ "cpe", "csaf", "cve", + "cvss 0.1.0", "hex", "humantime", "jsn", @@ -8920,6 +8950,8 @@ dependencies = [ "trustify-common", "trustify-db", "trustify-infrastructure", + "trustify-migration", + "trustify-module-storage", "trustify-server", ] diff --git a/Cargo.toml b/Cargo.toml index 4eecdfab0..ab67995ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,8 @@ csaf = { version = "0.5.0", default-features = false } csaf-walker = { version = "0.14.1", default-features = false } csv = "1.3.0" cve = "0.4.0" +cvss = { git = "https://github.com/dejanb/cvss" } +cvss-old = { package = "cvss", version = "2" } deepsize = "0.2.0" fixedbitset = "0.5.7" flate2 = "1.0.35" @@ -77,6 +79,7 @@ http = "1" human-date-parser = "0.3" humantime = "2" humantime-serde = "1" +indicatif = "0.18.0" itertools = "0.14" jsn = "0.14" json-merge-patch = "0.0.1" @@ -203,7 +206,7 @@ postgresql_commands = { version = "0.20.0", default-features = false, features = # required due to https://github.com/KenDJohnson/cpe-rs/pull/15 #cpe = { git = "https://github.com/ctron/cpe-rs", 
rev = "c3c05e637f6eff7dd4933c2f56d070ee2ddfb44b" } # required due to https://github.com/voteblake/csaf-rs/pull/29 -csaf = { git = "https://github.com/trustification/csaf-rs" } +csaf = { git = "https://github.com/trustification/csaf-rs", branch = "cvss" } # required due to https://github.com/gcmurphy/osv/pull/58 #osv = { git = "https://github.com/ctron/osv", branch = "feature/drop_deps_1" } diff --git a/Containerfile b/Containerfile new file mode 100644 index 000000000..3445ea13d --- /dev/null +++ b/Containerfile @@ -0,0 +1,22 @@ +FROM registry.access.redhat.com/ubi9/ubi:latest AS builder + +RUN dnf install --setop install_weak_deps=false --nodocs -y git python gcc g++ cmake ninja-build openssl-devel xz + +RUN curl https://sh.rustup.rs -sSf | bash -s -- -y +ENV PATH="/root/.cargo/bin:${PATH}" + +RUN mkdir /build + +COPY . /build + +WORKDIR /build + +RUN ls + +RUN rm rust-toolchain.toml + +RUN cargo build --release + +FROM registry.access.redhat.com/ubi9/ubi-minimal:latest + +COPY --from=builder /build/target/release/trustd /usr/local/bin/ diff --git a/TODO.md b/TODO.md new file mode 100644 index 000000000..94ecc2b5d --- /dev/null +++ b/TODO.md @@ -0,0 +1,4 @@ +# ToDo + +* [ ] Allowing skipping data part of the migration +* [x] Allow concurrent instances (x of y) diff --git a/common/Cargo.toml b/common/Cargo.toml index e295d30aa..2cb41bffd 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -17,6 +17,7 @@ deepsize = { workspace = true } hex = { workspace = true } hide = { workspace = true } human-date-parser = { workspace = true } +humantime = { workspace = true } itertools = { workspace = true } lenient_semver = { workspace = true } log = { workspace = true } @@ -32,6 +33,7 @@ sea-orm = { workspace = true, features = ["sea-query-binder", "sqlx-postgres", " sea-orm-migration = { workspace = true } sea-query = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde-cyclonedx = { workspace = true } serde_json = { workspace = true } spdx-expression = { workspace = true } spdx-rs = { workspace = true } @@ -45,7 +47,6 @@ urlencoding = { workspace = true } utoipa = { workspace = true, features = ["url"] } uuid = { workspace = true, features = ["v5", "serde"] } walker-common = { workspace = true, features = ["bzip2", "liblzma", "flate2"] } -humantime = { workspace = true } [dev-dependencies] chrono = { workspace = true } diff --git a/common/db/src/lib.rs b/common/db/src/lib.rs index c74ddb8ea..4e245e2cf 100644 --- a/common/db/src/lib.rs +++ b/common/db/src/lib.rs @@ -2,6 +2,7 @@ pub mod embedded; use anyhow::{Context, anyhow, ensure}; use migration::Migrator; +use migration::data::Runner; use postgresql_commands::{CommandBuilder, psql::PsqlBuilder}; use sea_orm::{ConnectionTrait, Statement}; use sea_orm_migration::prelude::MigratorTrait; @@ -121,4 +122,8 @@ impl<'a> Database<'a> { Ok(db) } + + pub async fn data_migrate(&self, runner: Runner) -> Result<(), anyhow::Error> { + runner.run::().await + } } diff --git a/common/src/advisory/cyclonedx.rs b/common/src/advisory/cyclonedx.rs new file mode 100644 index 000000000..b05b627af --- /dev/null +++ b/common/src/advisory/cyclonedx.rs @@ -0,0 +1,27 @@ +use serde_cyclonedx::cyclonedx::v_1_6::CycloneDx; +use std::collections::HashMap; + +/// extract CycloneDX SBOM general purpose properties +pub fn extract_properties(sbom: &CycloneDx) -> HashMap> { + sbom.properties + .iter() + .flatten() + .map(|e| (e.name.clone(), e.value.clone())) + .collect() +} + +/// extract CycloneDX SBOM general purpose properties, convert into 
[`serde_json::Value`] +pub fn extract_properties_json(sbom: &CycloneDx) -> serde_json::Value { + serde_json::Value::Object( + extract_properties(sbom) + .into_iter() + .map(|(k, v)| { + ( + k, + v.map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null), + ) + }) + .collect(), + ) +} diff --git a/common/src/advisory/mod.rs b/common/src/advisory/mod.rs index 6ae311f95..b5e0a5f6a 100644 --- a/common/src/advisory/mod.rs +++ b/common/src/advisory/mod.rs @@ -1,3 +1,5 @@ +pub mod cyclonedx; + use serde::{Deserialize, Serialize}; use std::collections::HashMap; use utoipa::ToSchema; diff --git a/common/src/db/create.rs b/common/src/db/create.rs new file mode 100644 index 000000000..bd72677c2 --- /dev/null +++ b/common/src/db/create.rs @@ -0,0 +1,37 @@ +use sea_orm::{ConnectionTrait, DbErr}; +use sea_orm_migration::SchemaManager; +use sea_query::{IntoIden, extension::postgres::Type}; + +/// create a type, if it not already exists +/// +/// This is required as Postgres doesn't support `CREATE TYPE IF NOT EXISTS` +pub async fn create_enum_if_not_exists( + manager: &SchemaManager<'_>, + name: impl IntoIden + Clone, + values: I, +) -> Result<(), DbErr> +where + T: IntoIden, + I: IntoIterator, +{ + let builder = manager.get_connection().get_database_backend(); + let r#type = name.clone().into_iden(); + let stmt = builder.build(Type::create().as_enum(name).values(values)); + let stmt = format!( + r#" +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_type WHERE typname = '{name}' + ) THEN + {stmt}; + END IF; +END$$; +"#, + name = r#type.to_string() + ); + + manager.get_connection().execute_unprepared(&stmt).await?; + + Ok(()) +} diff --git a/common/src/db/mod.rs b/common/src/db/mod.rs index c68f8dece..9b0ba09a0 100644 --- a/common/src/db/mod.rs +++ b/common/src/db/mod.rs @@ -3,7 +3,10 @@ pub mod limiter; pub mod multi_model; pub mod query; +mod create; mod func; + +pub use create::*; pub use func::*; use anyhow::Context; @@ -103,6 +106,10 @@ impl Database { pub fn name(&self) -> &str { &self.name } + + pub fn into_connection(self) -> DatabaseConnection { + self.db + } } impl Deref for Database { diff --git a/data-migration.yaml b/data-migration.yaml new file mode 100644 index 000000000..bfd6f3615 --- /dev/null +++ b/data-migration.yaml @@ -0,0 +1,80 @@ +kind: Job +apiVersion: batch/v1 +metadata: + name: data-migration-test +spec: + completions: 4 + completionMode: Indexed + parallelism: 4 # same as completions + template: + spec: + restartPolicy: OnFailure + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "kubernetes.io/arch" + operator: In + values: ["amd64"] + containers: + - name: run + image: quay.io/ctrontesting/trustd:latest + imagePullPolicy: Always + command: + - /usr/local/bin/trustd + - db + - data + - m0002010_add_advisory_scores # name of the migration + env: + - name: MIGRATION_DATA_CONCURRENT + value: "5" # in-process parallelism + - name: MIGRATION_DATA_TOTAL_RUNNER + value: "4" # same as completions + - name: MIGRATION_DATA_CURRENT_RUNNER + valueFrom: + fieldRef: + fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index'] + + - name: TRUSTD_STORAGE_STRATEGY + value: s3 + - name: TRUSTD_S3_ACCESS_KEY + valueFrom: + secretKeyRef: + name: storage-credentials + key: aws_access_key_id + - name: TRUSTD_S3_SECRET_KEY + valueFrom: + secretKeyRef: + name: storage-credentials + key: aws_secret_access_key + - name: TRUSTD_S3_REGION + valueFrom: + configMapKeyRef: + name: aws-storage + key: 
region + - name: TRUSTD_S3_BUCKET + value: trustify-default + + - name: TRUSTD_DB_NAME + value: trustify_default + - name: TRUSTD_DB_USER + valueFrom: + secretKeyRef: + name: postgresql + key: username + - name: TRUSTD_DB_PASSWORD + valueFrom: + secretKeyRef: + name: postgresql + key: password + - name: TRUSTD_DB_HOST + valueFrom: + secretKeyRef: + name: postgresql + key: host + - name: TRUSTD_DB_PORT + value: "5432" + + - name: RUST_LOG + value: info diff --git a/docs/adrs/00011-re-process-documents.md b/docs/adrs/00011-re-process-documents.md new file mode 100644 index 000000000..d77a102e3 --- /dev/null +++ b/docs/adrs/00011-re-process-documents.md @@ -0,0 +1,132 @@ +# 00011. Re-process documents + +Date: 2025-08-08 + +## Status + +DRAFT + +## Context + +During the process of ingestion, we extract certain information of the uploaded documents and store that information +in the database. We also store the original source document "as-is". + +When making changes to the database structure, we also have a migration process, which takes care of upgrading the +database structures during an upgrade. + +However, in some cases, changing the database structure actually means extracting more information from documents than +is currently stored in the database. Or information is extracted in a different way. This requires a re-processing of +all documents affected by this change. + +### Example + +We do ignore all CVSS v2 scores at the moment. Adding new fields for storing v2 scores, we wouldn't have +any stored values in the database. It therefore is necessary to re-process documents and extracting this information. + +### Assumptions + +This ADR makes the following assumptions: + +* All documents are stored in the storage +* It is expected that the step of upgrading has to be performed by someone, it is not magically happening +* Running such migrations is expected to take a long time +* The management of infrastructure (PostgreSQL) is not in the scope of Trustify + +Question? Do we want to support downgrades? + +## Decision + +During the migration of database structures (sea orm), we also re-process all documents (if required). + +For Helm deployments, this would be running during the migration job of the Helm chart and would have an impact on +updates as the rollout of newer version pods would be delayed until the migration (of data) has been finished. + +This would also require to prevent users from creating new documents during that time. Otherwise, we would need to +re-process documents ingested during the migration time. A way of doing this could be to leverage PostgreSQL's ability +to switch into read-only mode. Having mutable operations fail with a 503 (Service Unavailable) error. This would also +allow for easy A/B (green/blue) database setups. Switching the main one to read-only, having the other one run the +migration. + +We could provide an endpoint to the UI, reporting the fact that the system is in read-only mode during a migration. + +* 👍 Can fully migrate database (create mandatory field as optional -> re-process -> make mandatory) +* 👍 Might allow for an out-of-band migration of data, before running the upgrade (even on a staging env) +* 👍 Would allow to continue serving data while the process is running +* 👎 Might be tricky to create a combined re-processing of multiple ones at the same time +* 👎 Might block an upgrade if re-processing fails + +We do want to support different approaches of this migration. 
The choice depends on the needs of the user, the size of the
+data store, and the infrastructure used.
+
+### Approach 1
+
+The "lazy" approach: the user just runs the migration (or the new version of the application with migrations
+enabled). The process migrates schema and data. This might block the startup for a while, but it is fast and
+simple for small systems.
+
+### Approach 2
+
+The user uses a green/blue deployment: the application is switched to use green while migrations run against blue.
+Once the migrations are complete, the application is switched back to blue. Green is read-only, and mutable API calls
+fail with a 503 error.
+
+An alternative to this could be to first put the system into "read-only mode" by using a default transaction mode of
+read-only.
+
+## Consequences
+
+Migrations which re-process data have to be written in a way that they can be run and re-run without failing
+during the migration of the schema (e.g. add "if not exists"); a sketch follows at the end of this document. In this
+case, the data migration job can be run "out of band" (beforehand) and the data processed. Then, the actual upgrade
+and schema migration can run, keeping the SeaORM process.
+
+* The migration will block the upgrade process until it is finished
+* Ansible and the operator will need to handle this as well
+* The system will become read-only during a migration
+* The UI should let the user know the system is in read-only mode. This is a feature which has to be rolled out before
+  the data migration can be used.
+
+## Open items
+
+* [ ] Do we want to support downgrades?
+  * I'd say no. Downgrades could also be handled by keeping a snapshot of the original database.
+* [ ] How to handle unparsable or failing documents during migration?
+  * Pass them in as "unsupported"
+* [ ] Add a version number to the document, tracking upgrades
+  * This adds some complexity, but might allow tracking the progress and identifying upgraded documents. This could
+    also ensure the correct order of applying data migrations out of band.
+
+## Alternative approaches
+
+### Option 2
+
+We create a module similar to the importer, running migrations after an upgrade and accepting that, in the meantime,
+we might serve inaccurate data.
+
+* 👎 Might serve inaccurate data for a longer time
+* 👎 Can't fully migrate database (new mandatory field won't work)
+* 👍 Upgrade process itself is faster and less complex
+* 👎 Requires some coordination between instances (only one processor at a time, maybe one after the other)
+
+### Option 3
+
+We change ingestion so that it is possible to just re-ingest every document, meaning we re-ingest from the
+original sources.
+
+* 👎 Might serve inaccurate data for a longer time
+* 👎 Can't fully migrate database (new mandatory field won't work)
+* 👍 Upgrade process is faster and less complex
+* 👎 Original sources might no longer have the documents
+* 👎 Won't work for manual (API) uploads
+* 👎 Would require removing optimizations for existing documents
+
+### Option 4
+
+Have the operator orchestrate the process of switching the database into read-only mode and running the migrations.
+
+* 👍 Very user friendly
+* 👎 Rather complex
+* 👎 Requires access to the user's DB infrastructure
+
+This adds a lot of user-friendliness. However, it is also rather complex, so we should, as a first step, keep this
+as a manual step.
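+
+## Appendix: sketch of a re-runnable schema step
+
+The following is a minimal, hypothetical sketch of what such a re-runnable schema step can look like, mirroring the
+`add_column_if_not_exists` pattern used by `m0002000_add_sbom_properties`. The `Alias` identifiers here stand in for
+the `DeriveIden` enums used in the real migrations.
+
+```rust
+use sea_orm_migration::prelude::*;
+
+/// Add the column only if it does not already exist, so the schema part of the
+/// migration can safely be re-run after an out-of-band data migration already applied it.
+pub async fn add_properties_column(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
+    manager
+        .alter_table(
+            Table::alter()
+                .table(Alias::new("sbom"))
+                .add_column_if_not_exists(ColumnDef::new(Alias::new("properties")).json().to_owned())
+                .to_owned(),
+        )
+        .await
+}
+```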
diff --git a/docs/book/modules/migration/pages/data.adoc b/docs/book/modules/migration/pages/data.adoc
new file mode 100644
index 000000000..64716ce60
--- /dev/null
+++ b/docs/book/modules/migration/pages/data.adoc
@@ -0,0 +1,54 @@
+= Data migration guide
+
+In some cases, it is necessary to also migrate data during an upgrade. This may require the re-ingestion of all stored
+documents.
+
+This is the case when, for example, a new field is added which requires the extraction of information from the
+original document.
+
+== Strategy
+
+The overall strategy for this is:
+
+* Prevent access to the in-migration database
+* Create new database features (columns, …) in a compatible way (e.g. "allow nulls")
+* Process re-ingestion of documents
+* Modify database features to the target definition (e.g. "remove nullable")
+* Switch to new software version
+* Grant access to the new database structures
+
+This can be achieved with a read-only replica:
+
+* Prevent access to the in-migration database
+** Create a read-only replica of the database
+** Reconfigure trustify to use the read-only replica
+** All mutable operations will fail, reporting `503 Service Unavailable`
+* Create new database features
+* Process re-ingestion of documents
+* Modify database features to the target definition
+* Switch to new software version
+* Grant access to the new database structures
+** Switch to new database version
+** Drop old, read-only replica
+
+== Running the re-ingestion
+
+The re-ingestion can be run in two ways:
+
+* During the SeaORM migration
+* Before the SeaORM migration
+
+The main difference is that when running during the SeaORM migration, you have less control over the process. It will
+be driven by the SeaORM migration, and you have to wait until everything is finished.
+
+Running before the SeaORM migration, you can, for example, run multiple instances of the re-ingestion (see the sketch
+at the end of this guide). You can also run re-ingestion on a database copy, and then switch over to replace the
+database with the migrated version.
+
+Afterwards, you can run the SeaORM migrations, which will then skip those DB modifications (as they are already
+applied) and also skip the re-ingestion.
+
+== The lazy way
+
+The lazy way, which is the default, will simply perform those steps during the SeaORM migration. However, there are a
+bunch of downsides, which is why it is not recommended for production setups. It may still work just fine for small
+test setups, making the process a lot easier.
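+
+== Example: running a data migration out of band
+
+The following is a minimal, hypothetical sketch of driving a single data migration through the `Runner` API from the
+`migration` crate's `data` module, the same `Runner` used by the `trustd db data` Job shown in `data-migration.yaml`.
+Storage and partitioning options are assumed to come from the environment (`TRUSTD_*`, `MIGRATION_DATA_*`), the
+database connection from `DATABASE_URL`; error handling is simplified.
+
+[source,rust]
+----
+use clap::Parser;
+use migration::{Migrator, data::{Database, Direction, Options, Runner}};
+use std::ffi::OsString;
+use trustify_module_storage::config::StorageConfig;
+
+async fn reingest_scores() -> anyhow::Result<()> {
+    // storage backend and partitioning options, taken from environment variables only
+    let storage = StorageConfig::parse_from::<_, OsString>(vec![])
+        .into_storage(false)
+        .await?;
+    let options = Options::parse_from::<_, OsString>(vec![]);
+
+    Runner {
+        direction: Direction::Up,
+        storage,
+        migrations: vec!["m0002010_add_advisory_scores".into()],
+        database: Database::Config {
+            url: std::env::var("DATABASE_URL")?,
+            schema: None,
+        },
+        options,
+    }
+    .run::<Migrator>()
+    .await
+}
+----
+
+Running several such processes with `MIGRATION_DATA_TOTAL_RUNNER` and `MIGRATION_DATA_CURRENT_RUNNER` set accordingly
+partitions the documents across the instances, which is what the Kubernetes Job in `data-migration.yaml` does with its
+completion index.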
diff --git a/docs/book/package-lock.json b/docs/book/package-lock.json index 7851be6b6..3ba9bfa57 100644 --- a/docs/book/package-lock.json +++ b/docs/book/package-lock.json @@ -1,5 +1,5 @@ { - "name": "relock-npm-lock-v2-wmxVvW", + "name": "book", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/entity/Cargo.toml b/entity/Cargo.toml index 36c2ce44c..4bbbefbb4 100644 --- a/entity/Cargo.toml +++ b/entity/Cargo.toml @@ -13,6 +13,8 @@ trustify-common = { workspace = true } trustify-cvss = { workspace = true } cpe = { workspace = true } +cvss = { workspace = true } +cvss-old = { workspace = true } deepsize = { workspace = true } schemars = { workspace = true } sea-orm = { workspace = true, features = ["sqlx-postgres", "runtime-tokio-rustls", "macros", "with-json", "postgres-array"] } diff --git a/entity/src/advisory_vulnerability.rs b/entity/src/advisory_vulnerability.rs index d21d80284..fd2687e02 100644 --- a/entity/src/advisory_vulnerability.rs +++ b/entity/src/advisory_vulnerability.rs @@ -37,6 +37,9 @@ pub enum Relation { #[sea_orm(has_many = "super::purl_status::Entity")] PurlStatus, + + #[sea_orm(has_many = "super::advisory_vulnerability_score::Entity")] + Score, } impl Related for Entity { diff --git a/entity/src/advisory_vulnerability_score.rs b/entity/src/advisory_vulnerability_score.rs new file mode 100644 index 000000000..1cb1b53df --- /dev/null +++ b/entity/src/advisory_vulnerability_score.rs @@ -0,0 +1,176 @@ +use crate::{advisory, advisory_vulnerability, cvss3, vulnerability}; +use cvss::{v3, v4_0}; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] +#[sea_orm(table_name = "advisory_vulnerability_score")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: Uuid, + + pub advisory_id: Uuid, + pub vulnerability_id: String, + + pub r#type: ScoreType, + pub vector: String, + pub score: f64, + pub severity: Severity, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::advisory_vulnerability::Entity", + from = "(super::advisory_vulnerability_score::Column::AdvisoryId, super::advisory_vulnerability_score::Column::VulnerabilityId)" + to = "(super::advisory_vulnerability::Column::AdvisoryId, super::advisory_vulnerability::Column::VulnerabilityId)" + )] + AdvisoryVulnerability, + #[sea_orm( + belongs_to = "super::advisory::Entity", + from = "super::advisory_vulnerability_score::Column::AdvisoryId" + to = "super::advisory::Column::Id" + )] + Advisory, + #[sea_orm( + belongs_to = "super::vulnerability::Entity", + from = "super::advisory_vulnerability_score::Column::VulnerabilityId" + to = "super::vulnerability::Column::Id" + )] + Vulnerability, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Advisory.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Vulnerability.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::AdvisoryVulnerability.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + +// score type + +#[derive(Debug, Copy, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "score_type")] +pub enum ScoreType { + #[sea_orm(string_value = "2.0")] + V2_0, + #[sea_orm(string_value = "3.0")] + V3_0, + #[sea_orm(string_value = "3.1")] + V3_1, + #[sea_orm(string_value = "4.0")] + V4_0, +} + +// severity + +#[derive( + Debug, + Copy, + Clone, + PartialEq, + Eq, + EnumIter, + DeriveActiveEnum, + strum::EnumString, + 
strum::Display, + strum::VariantNames, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "severity")] +#[strum(serialize_all = "lowercase")] +pub enum Severity { + #[sea_orm(string_value = "none")] + None, + #[sea_orm(string_value = "low")] + Low, + #[sea_orm(string_value = "medium")] + Medium, + #[sea_orm(string_value = "high")] + High, + #[sea_orm(string_value = "critical")] + Critical, +} + +impl From for Severity { + fn from(value: cvss3::Severity) -> Self { + match value { + cvss3::Severity::None => Self::None, + cvss3::Severity::Low => Self::Low, + cvss3::Severity::Medium => Self::Medium, + cvss3::Severity::High => Self::High, + cvss3::Severity::Critical => Self::Critical, + } + } +} + +impl From for cvss3::Severity { + fn from(value: Severity) -> Self { + match value { + Severity::None => Self::None, + Severity::Low => Self::Low, + Severity::Medium => Self::Medium, + Severity::High => Self::High, + Severity::Critical => Self::Critical, + } + } +} + +impl From for Severity { + fn from(value: cvss::Severity) -> Self { + match value { + cvss::Severity::None => Self::None, + cvss::Severity::Low => Self::Low, + cvss::Severity::Medium => Self::Medium, + cvss::Severity::High => Self::High, + cvss::Severity::Critical => Self::Critical, + } + } +} + +impl From for Severity { + fn from(value: cvss_old::Severity) -> Self { + match value { + cvss_old::Severity::None => Self::None, + cvss_old::Severity::Low => Self::Low, + cvss_old::Severity::Medium => Self::Medium, + cvss_old::Severity::High => Self::High, + cvss_old::Severity::Critical => Self::Critical, + } + } +} + +impl From for Severity { + fn from(value: v3::Severity) -> Self { + match value { + v3::Severity::None => Self::None, + v3::Severity::Low => Self::Low, + v3::Severity::Medium => Self::Medium, + v3::Severity::High => Self::High, + v3::Severity::Critical => Self::Critical, + } + } +} + +impl From for Severity { + fn from(value: v4_0::Severity) -> Self { + match value { + v4_0::Severity::None => Self::None, + v4_0::Severity::Low => Self::Low, + v4_0::Severity::Medium => Self::Medium, + v4_0::Severity::High => Self::High, + v4_0::Severity::Critical => Self::Critical, + } + } +} diff --git a/entity/src/lib.rs b/entity/src/lib.rs index d012b29b1..61fd84c4e 100644 --- a/entity/src/lib.rs +++ b/entity/src/lib.rs @@ -1,5 +1,6 @@ pub mod advisory; pub mod advisory_vulnerability; +pub mod advisory_vulnerability_score; pub mod base_purl; pub mod conversation; pub mod cpe; diff --git a/entity/src/sbom.rs b/entity/src/sbom.rs index d8632bc4d..9063f733b 100644 --- a/entity/src/sbom.rs +++ b/entity/src/sbom.rs @@ -26,6 +26,9 @@ pub struct Model { graphql(derived(owned, into = "HashMap", with = "Labels::from")) )] pub labels: Labels, + + /// properties from the SBOM document + pub properties: serde_json::Value, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 9a38e3a98..c5af6efc5 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -10,13 +10,35 @@ name = "migration" path = "src/lib.rs" [dependencies] +trustify-common = { workspace = true } +trustify-entity = { workspace = true } +trustify-module-ingestor = { workspace = true } +trustify-module-storage = { workspace = true } + +anyhow = { workspace = true } +bytes = { workspace = true } +clap = { workspace = true, features = ["derive", "env"] } +csaf = { workspace = true } +cve = { workspace = true } +futures = { workspace = true } +futures-util = { workspace = true } +humantime = { workspace = true 
} +indicatif = { workspace = true, features = ["tokio", "futures"] } +osv = { workspace = true, features = ["schema"] } +sea-orm = { workspace = true } sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres", "with-uuid"] } +serde-cyclonedx = { workspace = true } +serde_json = { workspace = true } +spdx-rs = { workspace = true } +strum = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } uuid = { workspace = true, features = ["v5"] } [dev-dependencies] trustify-common = { workspace = true } -trustify-db = { workspace = true } +trustify-db = { workspace = true } trustify-entity = { workspace = true } trustify-test-context = { workspace = true } diff --git a/migration/src/bin/data.rs b/migration/src/bin/data.rs new file mode 100644 index 000000000..f67a6b8c6 --- /dev/null +++ b/migration/src/bin/data.rs @@ -0,0 +1,126 @@ +use clap::Parser; +use migration::{ + Migrator, + data::{Database, Direction, MigratorWithData, Options, Runner}, +}; +use trustify_module_storage::config::StorageConfig; + +#[derive(clap::Parser, Debug, Clone)] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[allow(clippy::large_enum_variant)] +#[derive(clap::Subcommand, Debug, Clone)] +#[allow(clippy::large_enum_variant)] +enum Command { + /// List all data migrations + List, + /// Run a list of migrations + Run(Run), +} + +#[derive(clap::Args, Debug, Clone)] +struct Run { + /// Migration direction to run + #[arg( + long, + value_enum, + default_value_t = Direction::Up, + overrides_with = "down" + )] + direction: Direction, + + /// Shortcut for `--direction down` + #[arg(long, action = clap::ArgAction::SetTrue, overrides_with = "direction")] + down: bool, + + // from sea_orm + #[arg( + global = true, + short = 's', + long, + env = "DATABASE_SCHEMA", + long_help = "Database schema\n \ + - For MySQL and SQLite, this argument is ignored.\n \ + - For PostgreSQL, this argument is optional with default value 'public'.\n" + )] + database_schema: Option, + + // from sea_orm + #[arg( + global = true, + short = 'u', + long, + env = "DATABASE_URL", + help = "Database URL" + )] + database_url: Option, + + #[arg()] + migrations: Vec, + + #[command(flatten)] + options: Options, + + #[command(flatten)] + storage: StorageConfig, +} + +impl Run { + fn direction(&self) -> Direction { + if self.down { + Direction::Down + } else { + self.direction + } + } + + #[allow(clippy::expect_used)] + pub async fn run(self) -> anyhow::Result<()> { + let direction = self.direction(); + let storage = self.storage.into_storage(false).await?; + + Runner { + direction, + storage, + migrations: self.migrations, + database: Database::Config { + url: self + .database_url + .expect("Environment variable 'DATABASE_URL' not set"), + schema: self.database_schema, + }, + options: self.options, + } + .run::() + .await?; + + Ok(()) + } +} + +impl Command { + pub async fn run(self) -> anyhow::Result<()> { + match self { + Command::Run(run) => run.run().await, + Command::List => { + for m in Migrator::data_migrations() { + println!("{}", m.name()); + } + Ok(()) + } + } + } +} + +#[allow(clippy::unwrap_used)] +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + + tracing_subscriber::fmt::init(); + + cli.command.run().await.unwrap(); +} diff --git a/migration/src/data/document/advisory.rs b/migration/src/data/document/advisory.rs new file mode 100644 index 000000000..c5fc3bf2d --- /dev/null 
+++ b/migration/src/data/document/advisory.rs @@ -0,0 +1,39 @@ +use super::Document; +use bytes::Bytes; +use sea_orm::prelude::*; +use trustify_entity::advisory; +use trustify_module_storage::service::StorageBackend; + +#[allow(clippy::large_enum_variant)] +pub enum Advisory { + Cve(cve::Cve), + Csaf(csaf::Csaf), + Osv(osv::schema::Vulnerability), + Other(Bytes), +} + +impl From for Advisory { + fn from(value: Bytes) -> Self { + serde_json::from_slice(&value) + .map(Advisory::Cve) + .or_else(|_| serde_json::from_slice(&value).map(Advisory::Csaf)) + .or_else(|_| serde_json::from_slice(&value).map(Advisory::Osv)) + .unwrap_or_else(|_err| Advisory::Other(value)) + } +} + +impl Document for Advisory { + type Model = advisory::Model; + + async fn all(tx: &C) -> Result, DbErr> { + advisory::Entity::find().all(tx).await + } + + async fn source(model: &Self::Model, storage: &S, tx: &C) -> Result + where + S: StorageBackend + Send + Sync, + C: ConnectionTrait, + { + super::load(model.source_document_id, storage, tx).await + } +} diff --git a/migration/src/data/document/mod.rs b/migration/src/data/document/mod.rs new file mode 100644 index 000000000..440548d78 --- /dev/null +++ b/migration/src/data/document/mod.rs @@ -0,0 +1,61 @@ +mod advisory; + +pub use advisory::*; +use anyhow::{anyhow, bail}; +use bytes::{Bytes, BytesMut}; +use futures_util::TryStreamExt; +mod sbom; +pub use sbom::*; + +use crate::data::Partitionable; +use sea_orm::{ConnectionTrait, DbErr, EntityTrait}; +use trustify_common::id::Id; +use trustify_entity::source_document; +use trustify_module_storage::service::{StorageBackend, StorageKey}; +use uuid::Uuid; + +/// A document eligible for re-processing. +#[allow(async_fn_in_trait)] +pub trait Document: Sized + Send + Sync { + type Model: Partitionable + Send; + + async fn all(tx: &C) -> Result, DbErr> + where + C: ConnectionTrait; + + async fn source(model: &Self::Model, storage: &S, tx: &C) -> Result + where + S: StorageBackend + Send + Sync, + C: ConnectionTrait; +} + +pub(crate) async fn load( + id: Uuid, + storage: &(impl StorageBackend + Send + Sync), + tx: &impl ConnectionTrait, +) -> anyhow::Result +where + D: Document + From, +{ + let source = source_document::Entity::find_by_id(id).one(tx).await?; + + let Some(source) = source else { + bail!("Missing source document entry for: {id}"); + }; + + let stream = storage + .retrieve( + StorageKey::try_from(Id::Sha256(source.sha256)) + .map_err(|err| anyhow!("Invalid ID: {err}"))?, + ) + .await + .map_err(|err| anyhow!("Failed to retrieve document: {err}"))? 
+ .ok_or_else(|| anyhow!("Missing source document for: {id}"))?; + + stream + .try_collect::() + .await + .map_err(|err| anyhow!("Failed to collect bytes: {err}")) + .map(|bytes| bytes.freeze()) + .map(|bytes| bytes.into()) +} diff --git a/migration/src/data/document/sbom.rs b/migration/src/data/document/sbom.rs new file mode 100644 index 000000000..cf964938f --- /dev/null +++ b/migration/src/data/document/sbom.rs @@ -0,0 +1,37 @@ +use super::Document; +use bytes::Bytes; +use sea_orm::prelude::*; +use trustify_entity::sbom; +use trustify_module_storage::service::StorageBackend; + +#[allow(clippy::large_enum_variant)] +pub enum Sbom { + CycloneDx(serde_cyclonedx::cyclonedx::v_1_6::CycloneDx), + Spdx(spdx_rs::models::SPDX), + Other(Bytes), +} + +impl From for Sbom { + fn from(value: Bytes) -> Self { + serde_json::from_slice(&value) + .map(Sbom::Spdx) + .or_else(|_| serde_json::from_slice(&value).map(Sbom::CycloneDx)) + .unwrap_or_else(|_err| Sbom::Other(value)) + } +} + +impl Document for Sbom { + type Model = sbom::Model; + + async fn all(tx: &C) -> Result, DbErr> { + sbom::Entity::find().all(tx).await + } + + async fn source(model: &Self::Model, storage: &S, tx: &C) -> Result + where + S: StorageBackend + Send + Sync, + C: ConnectionTrait, + { + super::load(model.source_document_id, storage, tx).await + } +} diff --git a/migration/src/data/migration.rs b/migration/src/data/migration.rs new file mode 100644 index 000000000..ccc1bb6bb --- /dev/null +++ b/migration/src/data/migration.rs @@ -0,0 +1,163 @@ +use crate::{ + async_trait, + data::{Document, DocumentProcessor, Handler, Options}, +}; +use clap::Parser; +use futures::executor::block_on; +use sea_orm::DbErr; +use sea_orm_migration::{MigrationName, MigrationTrait, SchemaManager}; +use std::{ffi::OsString, ops::Deref, sync::LazyLock}; +use tokio::task_local; +use trustify_module_storage::{config::StorageConfig, service::dispatch::DispatchBackend}; + +/// A migration which also processes data. +pub struct MigrationWithData { + pub storage: DispatchBackend, + pub options: Options, + pub migration: Box, +} + +static STORAGE: LazyLock = LazyLock::new(init_storage); +static OPTIONS: LazyLock = LazyLock::new(init_options); + +task_local! { + static TEST_STORAGE: DispatchBackend; + static TEST_OPTIONS: Options; +} + +#[allow(clippy::expect_used)] +fn init_storage() -> DispatchBackend { + // create from env-vars only + let config = StorageConfig::parse_from::<_, OsString>(vec![]); + + block_on(config.into_storage(false)).expect("task panicked") +} + +fn init_options() -> Options { + // create from env-vars only + Options::parse_from::<_, OsString>(vec![]) +} + +impl MigrationWithData { + /// Wrap a data migration, turning it into a combined schema/data migration. + /// + /// **NOTE:** This may panic if the storage configuration is missing. + pub fn new(migration: Box) -> Self { + // if we have a test storage set, use this instead. + let storage = TEST_STORAGE + .try_with(|s| s.clone()) + .unwrap_or_else(|_| STORAGE.clone()); + + let options = TEST_OPTIONS + .try_with(|o| o.clone()) + .unwrap_or_else(|_| OPTIONS.clone()); + + Self { + storage, + options, + migration, + } + } + + /// Set a storage backend to be used for running tests. + /// + /// This will, for the duration of the call, initialize the migrator with the provided storage + /// backend. 
+ pub async fn run_with_test( + storage: impl Into, + options: impl Into, + f: F, + ) -> F::Output + where + F: Future, + { + TEST_STORAGE + .scope(storage.into(), async { + TEST_OPTIONS.scope(options.into(), f).await + }) + .await + } +} + +impl From for MigrationWithData +where + M: MigrationTraitWithData + 'static, +{ + fn from(value: M) -> Self { + MigrationWithData::new(Box::new(value)) + } +} + +/// A [`SchemaManager`], extended with data migration features. +pub struct SchemaDataManager<'c> { + pub manager: &'c SchemaManager<'c>, + storage: &'c DispatchBackend, + options: &'c Options, +} + +impl<'a> Deref for SchemaDataManager<'a> { + type Target = SchemaManager<'a>; + + fn deref(&self) -> &Self::Target { + self.manager + } +} + +impl<'c> SchemaDataManager<'c> { + pub fn new( + manager: &'c SchemaManager<'c>, + storage: &'c DispatchBackend, + options: &'c Options, + ) -> Self { + Self { + manager, + storage, + options, + } + } + + /// Run a data migration + pub async fn process(&self, name: &N, f: impl Handler) -> Result<(), DbErr> + where + D: Document, + N: MigrationName + Send + Sync, + { + if self.options.should_skip(name.name()) { + return Ok(()); + } + + self.manager.process(self.storage, self.options, f).await + } +} + +#[async_trait::async_trait] +pub trait MigrationTraitWithData: MigrationName + Send + Sync { + async fn up(&self, manager: &SchemaDataManager) -> Result<(), DbErr>; + async fn down(&self, manager: &SchemaDataManager) -> Result<(), DbErr>; +} + +#[async_trait::async_trait] +impl MigrationTrait for MigrationWithData { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + MigrationTraitWithData::up( + &*self.migration, + &SchemaDataManager::new(manager, &self.storage, &self.options), + ) + .await + .inspect_err(|err| tracing::warn!("Migration failed: {err}")) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + MigrationTraitWithData::down( + &*self.migration, + &SchemaDataManager::new(manager, &self.storage, &self.options), + ) + .await + } +} + +impl MigrationName for MigrationWithData { + fn name(&self) -> &str { + self.migration.name() + } +} diff --git a/migration/src/data/mod.rs b/migration/src/data/mod.rs new file mode 100644 index 000000000..8e8ad784a --- /dev/null +++ b/migration/src/data/mod.rs @@ -0,0 +1,302 @@ +mod document; +mod migration; +mod partition; +mod run; + +pub use document::*; +pub use migration::*; +pub use partition::*; +pub use run::*; + +use futures_util::{ + StreamExt, + stream::{self, TryStreamExt}, +}; +use indicatif::{ProgressBar, ProgressStyle}; +use sea_orm::{DatabaseTransaction, DbErr, TransactionTrait}; +use sea_orm_migration::{MigrationTrait, SchemaManager}; +use std::{ + num::{NonZeroU64, NonZeroUsize}, + sync::Arc, +}; +use trustify_module_storage::service::dispatch::DispatchBackend; + +/// A handler for processing a [`Document`] data migration. 
+#[allow(async_fn_in_trait)] +pub trait Handler: Send +where + D: Document, +{ + async fn call( + &self, + document: D, + model: D::Model, + tx: &DatabaseTransaction, + ) -> anyhow::Result<()>; +} + +impl Handler for F +where + D: Document, + for<'x> F: AsyncFn(D, D::Model, &'x DatabaseTransaction) -> anyhow::Result<()> + Send, +{ + async fn call( + &self, + document: D, + model: D::Model, + tx: &DatabaseTransaction, + ) -> anyhow::Result<()> { + (self)(document, model, tx).await + } +} + +#[derive(Clone, Debug, PartialEq, Eq, clap::Parser)] +pub struct Options { + /// Number of concurrent documents being processes + #[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "5")] + pub concurrent: NonZeroUsize, + + /// The instance number of the current runner (zero based) + #[arg(long, env = "MIGRATION_DATA_CURRENT_RUNNER", default_value = "0")] + pub current: u64, + /// The total number of runners + #[arg(long, env = "MIGRATION_DATA_TOTAL_RUNNER", default_value = "1")] + pub total: NonZeroU64, + + /// Skip running all data migrations + #[arg( + long, + env = "MIGRATION_DATA_SKIP_ALL", + default_value_t, + conflicts_with = "skip" + )] + pub skip_all: bool, + + /// Skip the provided list of data migrations + #[arg(long, env = "MIGRATION_DATA_SKIP", conflicts_with = "skip_all")] + pub skip: Vec, +} + +impl Default for Options { + fn default() -> Self { + Self { + concurrent: unsafe { NonZeroUsize::new_unchecked(5) }, + current: 0, + total: unsafe { NonZeroU64::new_unchecked(1) }, + skip_all: false, + skip: vec![], + } + } +} + +impl From<()> for Options { + fn from(_: ()) -> Self { + Self::default() + } +} + +impl Options { + /// Check if we should skip a data migration. Returns `true` if it should be skipped. + /// + /// Skipping means that the "data" part of the migration should not be processes. The schema + /// part still will be processes. + pub fn should_skip(&self, name: &str) -> bool { + if self.skip_all { + // we skip all migration + return true; + } + + if self.skip.iter().any(|s| s == name) { + // we skip a list of migrations, and it's on the list + return true; + } + + false + } +} + +impl From<&Options> for Partition { + fn from(value: &Options) -> Self { + Self { + current: value.current, + total: value.total, + } + } +} + +/// A trait for processing documents using a [`Handler`]. +pub trait DocumentProcessor { + fn process( + &self, + storage: &DispatchBackend, + options: &Options, + f: impl Handler, + ) -> impl Future> + where + D: Document; +} + +impl<'c> DocumentProcessor for SchemaManager<'c> { + /// Process documents for a schema *data* migration. + /// + /// ## Pre-requisites + /// + /// The database should be maintenance mode. Meaning that the actual application should be + /// running from a read-only clone for the time of processing. + /// + /// ## Partitioning + /// + /// This will partition documents and only process documents selected for *this* partition. + /// The partition configuration normally comes from outside, as configuration through env-vars. + /// + /// This means that there may be other instances of this processor running in a different + /// process instance. However, not touching documents of our partition. + /// + /// ## Transaction strategy + /// + /// The processor will identify all documents, filtering out all which are not part of this + /// partition. This is done in a dedicated transaction. 
As the database is supposed to be in + /// read-only mode for the running instance, this is ok as no additional documents will be + /// created during the time of processing. + /// + /// Next, it is processing all found documents, in a concurrent way. Meaning, this single + /// process instance, will process multiple documents in parallel. + /// + /// Each document is loaded and processed within a dedicated transaction. Commiting the + /// transaction at the end each step and before moving on the next document. + /// + /// As handlers are intended to be idempotent, there's no harm in re-running them, in case + /// things go wrong. + /// + /// ## Caveats + /// + /// However, this may lead to a situation where only a part of the documents is processed. + /// But, this is ok, as the migration is supposed to run on a clone of the database and so the + /// actual system is still running from the read-only clone of the original data. + async fn process( + &self, + storage: &DispatchBackend, + options: &Options, + f: impl Handler, + ) -> Result<(), DbErr> + where + D: Document, + { + let partition: Partition = options.into(); + let db = self.get_connection(); + + let tx = db.begin().await?; + let all: Vec<_> = D::all(&tx) + .await? + .into_iter() + .filter(|model| partition.is_selected::(model)) + .collect(); + drop(tx); + + let count = all.len(); + let pb = Arc::new(ProgressBar::new(count as u64)); + pb.set_style( + ProgressStyle::with_template( + "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})", + ) + .map_err(|err| DbErr::Migration(err.to_string()))? + .progress_chars("##-"), + ); + + let pb = Some(pb); + + stream::iter(all) + .map(async |model| { + let tx = db.begin().await?; + + let doc = D::source(&model, storage, &tx) + .await + .inspect_err(|err| tracing::info!("Failed to load source document: {err}")) + .map_err(|err| { + DbErr::Migration(format!("Failed to load source document: {err}")) + })?; + f.call(doc, model, &tx) + .await + .inspect_err(|err| tracing::info!("Failed to process document: {err}")) + .map_err(|err| { + DbErr::Migration(format!("Failed to process document: {err}")) + })?; + + tx.commit().await?; + + if let Some(pb) = &pb { + pb.inc(1); + } + + Ok::<_, DbErr>(()) + }) + .buffer_unordered(options.concurrent.into()) + .try_collect::>() + .await?; + + if let Some(pb) = &pb { + pb.finish_with_message("Done"); + } + + tracing::info!("Processed {count} documents"); + + Ok(()) + } +} + +pub trait MigratorWithData { + fn data_migrations() -> Vec>; +} + +#[derive(Default)] +pub struct Migrations { + all: Vec, +} + +impl Migrations { + /// Return only [`Migration::Data`] migrations. 
+ pub fn only_data(self) -> Vec> { + self.into_iter() + .filter_map(|migration| match migration { + Migration::Normal(_) => None, + Migration::Data(migration) => Some(migration), + }) + .collect() + } +} + +impl Extend for Migrations { + fn extend>(&mut self, iter: T) { + self.all.extend(iter) + } +} + +impl IntoIterator for Migrations { + type Item = Migration; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.all.into_iter() + } +} + +pub enum Migration { + Normal(Box), + Data(Box), +} + +impl Migrations { + pub fn new() -> Self { + Self::default() + } + + pub fn normal(mut self, migration: impl MigrationTrait + 'static) -> Self { + self.all.push(Migration::Normal(Box::new(migration))); + self + } + + pub fn data(mut self, migration: impl MigrationTraitWithData + 'static) -> Self { + self.all.push(Migration::Data(Box::new(migration))); + self + } +} diff --git a/migration/src/data/partition.rs b/migration/src/data/partition.rs new file mode 100644 index 000000000..37d599798 --- /dev/null +++ b/migration/src/data/partition.rs @@ -0,0 +1,64 @@ +use crate::data::Document; +use std::{ + hash::{DefaultHasher, Hash, Hasher}, + num::NonZeroU64, +}; +use trustify_entity::{advisory, sbom}; + +/// Information required for partitioning data +#[derive(Debug, Copy, Clone)] +pub struct Partition { + pub current: u64, + pub total: NonZeroU64, +} + +/// A thing which can be distributed over different partitions via a hashed id. +/// +/// The idea is that the thing returns a hash ID, which can then be distributed over partitions +/// by using a "X of Y" approach. Where the thing is processed when "ID modulo Y == X". +pub trait Partitionable { + /// Get the hashed ID for the thing. + fn hashed_id(&self) -> u64; +} + +impl Partitionable for sbom::Model { + fn hashed_id(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.sbom_id.hash(&mut hasher); + hasher.finish() + } +} + +impl Partitionable for advisory::Model { + fn hashed_id(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.id.hash(&mut hasher); + hasher.finish() + } +} + +impl Default for Partition { + fn default() -> Self { + Self::new_one() + } +} + +impl Partition { + /// Create a new partition of one. + /// + /// This will be one processor processing everything. 
+ pub const fn new_one() -> Self { + Self { + current: 0, + total: unsafe { NonZeroU64::new_unchecked(1) }, + } + } + + pub fn is_selected(&self, document: &D::Model) -> bool + where + D: Document, + D::Model: Partitionable, + { + document.hashed_id() % self.total == self.current + } +} diff --git a/migration/src/data/run.rs b/migration/src/data/run.rs new file mode 100644 index 000000000..b576870fa --- /dev/null +++ b/migration/src/data/run.rs @@ -0,0 +1,81 @@ +use crate::data::{MigratorWithData, Options, SchemaDataManager}; +use anyhow::bail; +use sea_orm::ConnectOptions; +use sea_orm_migration::{IntoSchemaManagerConnection, SchemaManager}; +use std::{collections::HashMap, time::SystemTime}; +use trustify_module_storage::service::dispatch::DispatchBackend; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, clap::ValueEnum)] +pub enum Direction { + #[default] + Up, + Down, +} + +pub struct Runner { + pub database: Database, + pub storage: DispatchBackend, + pub direction: Direction, + pub migrations: Vec, + pub options: Options, +} + +pub enum Database { + Config { url: String, schema: Option }, + Provided(sea_orm::DatabaseConnection), +} + +impl Runner { + pub async fn run(self) -> anyhow::Result<()> { + let migrations = M::data_migrations() + .into_iter() + .map(|migration| (migration.name().to_string(), migration)) + .collect::>(); + + let mut running = vec![]; + + for migration in self.migrations { + let Some(migration) = migrations.get(&migration) else { + bail!("Migration '{migration}' not found"); + }; + running.push(migration); + } + + let database = match self.database { + Database::Config { url, schema } => { + let schema = schema.unwrap_or_else(|| "public".to_owned()); + + let connect_options = ConnectOptions::new(url) + .set_schema_search_path(schema) + .to_owned(); + + sea_orm::Database::connect(connect_options).await? + } + Database::Provided(database) => database, + }; + + let manager = SchemaManager::new(database.into_schema_manager_connection()); + let manager = SchemaDataManager::new(&manager, &self.storage, &self.options); + + for run in running { + tracing::info!(name = run.name(), "Running data migration"); + + let start = SystemTime::now(); + + match self.direction { + Direction::Up => run.up(&manager).await?, + Direction::Down => run.down(&manager).await?, + } + + if let Ok(duration) = start.elapsed() { + tracing::info!( + name = run.name(), + "Took {}", + humantime::Duration::from(duration) + ) + } + } + + Ok(()) + } +} diff --git a/migration/src/lib.rs b/migration/src/lib.rs index b015392ac..fc8cd35ad 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -1,5 +1,10 @@ +use crate::data::{ + Migration, MigrationTraitWithData, MigrationWithData, Migrations, MigratorWithData, +}; pub use sea_orm_migration::prelude::*; +pub mod data; + mod m0000010_init; mod m0000020_add_sbom_group; mod m0000030_perf_adv_vuln; @@ -31,45 +36,75 @@ mod m0001160_improve_expand_spdx_licenses_function; mod m0001170_non_null_source_document_id; mod m0001180_expand_spdx_licenses_with_mappings_function; mod m0001190_optimize_product_advisory_query; +mod m0002000_add_sbom_properties; +mod m0002010_add_advisory_scores; + +pub trait MigratorExt: Send { + fn build_migrations() -> Migrations; + + fn into_migrations() -> Vec> { + // Get all migrations, wrap data migrations. This will initialize the storage config. 
+ Self::build_migrations() + .into_iter() + .map(|migration| match migration { + Migration::Normal(migration) => migration, + Migration::Data(migration) => Box::new(MigrationWithData::new(migration)), + }) + .collect() + } +} pub struct Migrator; +impl MigratorExt for Migrator { + fn build_migrations() -> Migrations { + Migrations::new() + .normal(m0000010_init::Migration) + .normal(m0000020_add_sbom_group::Migration) + .normal(m0000030_perf_adv_vuln::Migration) + .normal(m0000040_create_license_export::Migration) + .normal(m0000050_perf_adv_vuln2::Migration) + .normal(m0000060_perf_adv_vuln3::Migration) + .normal(m0000070_perf_adv_vuln4::Migration) + .normal(m0000080_get_purl_refactor::Migration) + .normal(m0000090_release_perf::Migration) + .normal(m0000100_perf_adv_vuln5::Migration) + .normal(m0000970_alter_importer_add_heartbeat::Migration) + .normal(m0000980_get_purl_fix::Migration) + .normal(m0000990_sbom_add_suppliers::Migration) + .normal(m0001000_sbom_non_null_suppliers::Migration) + .normal(m0001010_alter_mavenver_cmp::Migration) + .normal(m0001020_alter_pythonver_cmp::Migration) + .normal(m0001030_perf_adv_gin_index::Migration) + .normal(m0001040_alter_pythonver_cmp::Migration) + .normal(m0001050_foreign_key_cascade::Migration) + .normal(m0001060_advisory_vulnerability_indexes::Migration) + .normal(m0001070_vulnerability_scores::Migration) + .normal(m0001100_remove_get_purl::Migration) + .normal(m0001110_sbom_node_checksum_indexes::Migration) + .normal(m0001120_sbom_external_node_indexes::Migration) + .normal(m0001130_gover_cmp::Migration) + .normal(m0001140_expand_spdx_licenses_function::Migration) + .normal(m0001150_case_license_text_sbom_id_function::Migration) + .normal(m0001160_improve_expand_spdx_licenses_function::Migration) + .normal(m0001170_non_null_source_document_id::Migration) + .normal(m0001180_expand_spdx_licenses_with_mappings_function::Migration) + .normal(m0001190_optimize_product_advisory_query::Migration) + .data(m0002000_add_sbom_properties::Migration) + .data(m0002010_add_advisory_scores::Migration) + } +} + +impl MigratorWithData for M { + fn data_migrations() -> Vec> { + Self::build_migrations().only_data() + } +} + #[async_trait::async_trait] impl MigratorTrait for Migrator { fn migrations() -> Vec> { - vec![ - Box::new(m0000010_init::Migration), - Box::new(m0000020_add_sbom_group::Migration), - Box::new(m0000030_perf_adv_vuln::Migration), - Box::new(m0000040_create_license_export::Migration), - Box::new(m0000050_perf_adv_vuln2::Migration), - Box::new(m0000060_perf_adv_vuln3::Migration), - Box::new(m0000070_perf_adv_vuln4::Migration), - Box::new(m0000080_get_purl_refactor::Migration), - Box::new(m0000090_release_perf::Migration), - Box::new(m0000100_perf_adv_vuln5::Migration), - Box::new(m0000970_alter_importer_add_heartbeat::Migration), - Box::new(m0000980_get_purl_fix::Migration), - Box::new(m0000990_sbom_add_suppliers::Migration), - Box::new(m0001000_sbom_non_null_suppliers::Migration), - Box::new(m0001010_alter_mavenver_cmp::Migration), - Box::new(m0001020_alter_pythonver_cmp::Migration), - Box::new(m0001030_perf_adv_gin_index::Migration), - Box::new(m0001040_alter_pythonver_cmp::Migration), - Box::new(m0001050_foreign_key_cascade::Migration), - Box::new(m0001060_advisory_vulnerability_indexes::Migration), - Box::new(m0001070_vulnerability_scores::Migration), - Box::new(m0001100_remove_get_purl::Migration), - Box::new(m0001110_sbom_node_checksum_indexes::Migration), - Box::new(m0001120_sbom_external_node_indexes::Migration), - 
Box::new(m0001130_gover_cmp::Migration), - Box::new(m0001140_expand_spdx_licenses_function::Migration), - Box::new(m0001150_case_license_text_sbom_id_function::Migration), - Box::new(m0001160_improve_expand_spdx_licenses_function::Migration), - Box::new(m0001170_non_null_source_document_id::Migration), - Box::new(m0001180_expand_spdx_licenses_with_mappings_function::Migration), - Box::new(m0001190_optimize_product_advisory_query::Migration), - ] + Self::into_migrations() } } diff --git a/migration/src/m0002000_add_sbom_properties.rs b/migration/src/m0002000_add_sbom_properties.rs new file mode 100644 index 000000000..31dc7c696 --- /dev/null +++ b/migration/src/m0002000_add_sbom_properties.rs @@ -0,0 +1,81 @@ +use crate::data::{MigrationTraitWithData, Sbom as SbomDoc, SchemaDataManager}; +use sea_orm::{ActiveModelTrait, DatabaseTransaction, IntoActiveModel, Set}; +use sea_orm_migration::prelude::*; +use trustify_common::advisory::cyclonedx::extract_properties_json; +use trustify_entity::sbom; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTraitWithData for Migration { + async fn up(&self, manager: &SchemaDataManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Sbom::Table) + .add_column_if_not_exists( + ColumnDef::new(Sbom::Properties) + .json() + .default(serde_json::Value::Null) + .to_owned(), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(Sbom::Table) + .modify_column(ColumnDef::new(Sbom::Properties).not_null().to_owned()) + .to_owned(), + ) + .await?; + + manager + .process( + self, + async |sbom: SbomDoc, model: sbom::Model, tx: &DatabaseTransaction| { + let mut model = model.into_active_model(); + match sbom { + SbomDoc::CycloneDx(sbom) => { + model.properties = Set(extract_properties_json(&sbom)); + } + SbomDoc::Spdx(_sbom) => { + model.properties = Set(serde_json::Value::Object(Default::default())); + } + SbomDoc::Other(_) => { + // we ignore others + } + } + + model.save(tx).await?; + + Ok(()) + }, + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaDataManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Sbom::Table) + .drop_column(Sbom::Properties) + .to_owned(), + ) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Sbom { + Table, + Properties, +} diff --git a/migration/src/m0002010_add_advisory_scores.rs b/migration/src/m0002010_add_advisory_scores.rs new file mode 100644 index 000000000..1b9e54f27 --- /dev/null +++ b/migration/src/m0002010_add_advisory_scores.rs @@ -0,0 +1,192 @@ +use crate::data::{Advisory, MigrationTraitWithData, SchemaDataManager}; +use sea_orm::{DatabaseTransaction, sea_query::extension::postgres::*}; +use sea_orm_migration::prelude::*; +use strum::VariantNames; +use trustify_common::db::create_enum_if_not_exists; +use trustify_entity::advisory; +use trustify_module_ingestor::{ + graph::cvss::ScoreCreator, + service::advisory::{csaf, cve, osv}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTraitWithData for Migration { + async fn up(&self, manager: &SchemaDataManager) -> Result<(), DbErr> { + create_enum_if_not_exists( + manager, + Severity::Table, + Severity::VARIANTS.iter().skip(1).copied(), + ) + .await?; + + create_enum_if_not_exists( + manager, + ScoreType::Table, + ScoreType::VARIANTS.iter().skip(1).copied(), + ) + .await?; + + manager + .create_table( + Table::create() + 
.table(AdvisoryVulnerabilityScore::Table) + .if_not_exists() + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::Id) + .uuid() + .not_null() + .primary_key() + .to_owned(), + ) + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::AdvisoryId) + .uuid() + .not_null() + .to_owned(), + ) + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::VulnerabilityId) + .string() + .not_null() + .to_owned(), + ) + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::Type) + .custom(ScoreType::Table) + .not_null() + .to_owned(), + ) + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::Vector) + .string() + .not_null() + .to_owned(), + ) + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::Score) + .float() + .not_null() + .to_owned(), + ) + .col( + ColumnDef::new(AdvisoryVulnerabilityScore::Severity) + .custom(Severity::Table) + .not_null() + .to_owned(), + ) + .foreign_key( + ForeignKey::create() + .from_col(AdvisoryVulnerabilityScore::AdvisoryId) + .from_col(AdvisoryVulnerabilityScore::VulnerabilityId) + .to( + AdvisoryVulnerability::Table, + ( + AdvisoryVulnerability::AdvisoryId, + AdvisoryVulnerability::VulnerabilityId, + ), + ) + .on_delete(ForeignKeyAction::Cascade), + ) + .to_owned(), + ) + .await?; + + manager + .process( + self, + async |advisory: Advisory, model: advisory::Model, tx: &DatabaseTransaction| { + let mut creator = ScoreCreator::new(model.id); + match advisory { + Advisory::Cve(advisory) => { + cve::extract_scores(&advisory, &mut creator); + } + Advisory::Csaf(advisory) => { + csaf::extract_scores(&advisory, &mut creator); + } + Advisory::Osv(advisory) => { + osv::extract_scores(&advisory, &mut creator); + } + _ => { + // we ignore others + } + } + + creator.create(tx).await?; + + Ok(()) + }, + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaDataManager) -> Result<(), DbErr> { + manager + .drop_table( + Table::drop() + .table(AdvisoryVulnerabilityScore::Table) + .if_exists() + .to_owned(), + ) + .await?; + + manager + .drop_type(Type::drop().if_exists().name(Severity::Table).to_owned()) + .await?; + + manager + .drop_type(Type::drop().if_exists().name(ScoreType::Table).to_owned()) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum AdvisoryVulnerability { + Table, + AdvisoryId, + VulnerabilityId, +} + +#[derive(DeriveIden)] +enum AdvisoryVulnerabilityScore { + Table, + Id, + AdvisoryId, + VulnerabilityId, + Type, + Vector, + Score, + Severity, +} + +#[derive(DeriveIden, strum::VariantNames, strum::Display, Clone)] +#[allow(unused)] +enum ScoreType { + Table, + #[strum(to_string = "2.0")] + V2_0, + #[strum(to_string = "3.0")] + V3_0, + #[strum(to_string = "3.1")] + V3_1, + #[strum(to_string = "4.0")] + V4_0, +} + +#[derive(DeriveIden, strum::VariantNames, strum::Display, Clone)] +#[strum(serialize_all = "lowercase")] +#[allow(unused)] +enum Severity { + Table, + None, + Low, + Medium, + High, + Critical, +} diff --git a/migration/tests/data/m0002010.rs b/migration/tests/data/m0002010.rs new file mode 100644 index 000000000..4fdefb4f2 --- /dev/null +++ b/migration/tests/data/m0002010.rs @@ -0,0 +1,44 @@ +use crate::MigratorTest; +use migration::Migrator; +use migration::data::{Database, Direction, MigrationWithData, Options, Runner}; +use sea_orm_migration::MigratorTrait; +use test_context::test_context; +use test_log::test; +use trustify_test_context::{TrustifyMigrationContext, commit, ctx::DumpId}; + +commit!(Commit("8c6ad23172e66a6c923dcc8f702e6125a8d48723")); + +#[test_context(TrustifyMigrationContext)] +#[test(tokio::test)] +async fn examples( + ctx: 
&TrustifyMigrationContext, /* commit previous to this PR */ +) -> Result<(), anyhow::Error> { + let migrations = vec!["m0002010_add_advisory_scores".into()]; + + // first run the data migration + Runner { + direction: Direction::Up, + storage: ctx.storage.clone().into(), + migrations: migrations.clone(), + database: Database::Provided(ctx.db.clone().into_connection()), + options: Default::default(), + } + .run::() + .await?; + + // now run the migrations, but skip the already run migration + + MigrationWithData::run_with_test( + ctx.storage.clone(), + Options { + skip: migrations, + ..Default::default() + }, + async { MigratorTest::up(&ctx.db, None).await }, + ) + .await?; + + // done + + Ok(()) +} diff --git a/migration/tests/data/main.rs b/migration/tests/data/main.rs new file mode 100644 index 000000000..0e950e1bd --- /dev/null +++ b/migration/tests/data/main.rs @@ -0,0 +1,152 @@ +mod m0002010; + +use migration::{ + Migrator, MigratorExt, + data::{MigrationWithData, Migrations}, +}; +use sea_orm::{ConnectionTrait, Statement}; +use sea_orm_migration::{MigrationTrait, MigratorTrait}; +use std::collections::BTreeSet; +use test_context::test_context; +use test_log::test; +use trustify_test_context::TrustifyMigrationContext; + +struct MigratorTest; + +mod sbom { + use migration::{ + ColumnDef, DeriveIden, DeriveMigrationName, Table, async_trait, + data::{MigrationTraitWithData, Sbom as SbomDoc, SchemaDataManager}, + }; + use sea_orm::{ConnectionTrait, DatabaseTransaction, DbErr, Statement}; + use trustify_entity::sbom; + + #[derive(DeriveMigrationName)] + pub struct Migration; + + #[async_trait::async_trait] + impl MigrationTraitWithData for Migration { + async fn up(&self, manager: &SchemaDataManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Sbom::Table) + .add_column_if_not_exists( + ColumnDef::new(Sbom::Foo).string().default("").to_owned(), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(Sbom::Table) + .modify_column(ColumnDef::new(Sbom::Foo).not_null().to_owned()) + .to_owned(), + ) + .await?; + + manager + .process( + self, + async |sbom: SbomDoc, model: sbom::Model, tx: &DatabaseTransaction| { + // we just pick a random value + let value = match sbom { + SbomDoc::CycloneDx(sbom) => sbom.serial_number, + SbomDoc::Spdx(sbom) => { + Some(sbom.document_creation_information.spdx_document_namespace) + } + SbomDoc::Other(_) => None, + }; + + if let Some(value) = value { + let stmt = Statement::from_sql_and_values( + tx.get_database_backend(), + r#"UPDATE SBOM SET FOO = $1 WHERE SBOM_ID = $2"#, + [value.into(), model.sbom_id.into()], + ); + tx.execute(stmt).await?; + } + + Ok(()) + }, + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaDataManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Sbom::Table) + .drop_column(Sbom::Foo) + .to_owned(), + ) + .await?; + + Ok(()) + } + } + + #[derive(DeriveIden)] + enum Sbom { + Table, + Foo, + } +} + +impl MigratorExt for MigratorTest { + fn build_migrations() -> Migrations { + Migrator::build_migrations().data(sbom::Migration) + } +} + +impl MigratorTrait for MigratorTest { + fn migrations() -> Vec> { + Self::into_migrations() + } +} + +/// test an example migration base on an existing database dump from the previous commit. +/// +/// The idea is to add a new field and populate it with data. +/// +/// As we don't actually change the entities, this has to work with plain SQL. 
+#[test_context(TrustifyMigrationContext)] +#[test(tokio::test)] +async fn examples(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Error> { + MigrationWithData::run_with_test(ctx.storage.clone(), (), async { + MigratorTest::up(&ctx.db, None).await + }) + .await?; + + let result = ctx + .db + .query_all(Statement::from_string( + ctx.db.get_database_backend(), + r#"SELECT FOO FROM SBOM"#, + )) + .await?; + + let foos = result + .into_iter() + .map(|row| row.try_get_by(0)) + .collect::, _>>()?; + + assert_eq!( + [ + "", + "", + "", + "https://access.redhat.com/security/data/sbom/beta/spdx/ubi8-micro-container-0ca57f3b-b0e7-4251-b32b-d2929a52f05c", + "https://access.redhat.com/security/data/sbom/beta/spdx/ubi9-container-f8098ef8-eee0-4ee6-b5d1-b00d992adef5", + "https://access.redhat.com/security/data/sbom/beta/spdx/ubi9-minimal-container-9b954617-943f-43ab-bd5b-3df62a706ed6" + ].into_iter().map(|s| s.to_owned()).collect::>(), + foos + ); + + Ok(()) +} diff --git a/migration/tests/previous.rs b/migration/tests/previous.rs index 543ae4aa4..7035707d0 100644 --- a/migration/tests/previous.rs +++ b/migration/tests/previous.rs @@ -1,3 +1,4 @@ +use migration::data::MigrationWithData; use test_context::test_context; use test_log::test; use trustify_db::Database; @@ -10,7 +11,10 @@ async fn from_previous(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Err // We automatically start with a database imported from the previous commit. // But we haven't migrated to the most recent schema so far. That's done by the next step. - Database(&ctx.db).migrate().await?; + MigrationWithData::run_with_test(ctx.storage.clone(), (), async { + Database(&ctx.db).migrate().await + }) + .await?; Ok(()) } diff --git a/modules/fundamental/src/vulnerability/model/details/vulnerability_advisory.rs b/modules/fundamental/src/vulnerability/model/details/vulnerability_advisory.rs index 046ee07fd..8c92fbb59 100644 --- a/modules/fundamental/src/vulnerability/model/details/vulnerability_advisory.rs +++ b/modules/fundamental/src/vulnerability/model/details/vulnerability_advisory.rs @@ -215,6 +215,7 @@ impl VulnerabilityAdvisorySummary { "sbom"."data_licenses" AS "sbom$data_licenses", "sbom"."source_document_id" AS "sbom$source_document_id", "sbom"."labels" AS "sbom$labels", + "sbom"."properties" AS "sbom$properties", "sbom_package"."sbom_id" AS "sbom_package$sbom_id", "sbom_package"."node_id" AS "sbom_package$node_id", "sbom_package"."version" AS "sbom_package$version", diff --git a/modules/graphql/src/sbom.rs b/modules/graphql/src/sbom.rs index 39f590448..f931aecb3 100644 --- a/modules/graphql/src/sbom.rs +++ b/modules/graphql/src/sbom.rs @@ -26,6 +26,7 @@ impl SbomQuery { suppliers: sbom_context.sbom.suppliers, source_document_id: sbom_context.sbom.source_document_id, data_licenses: sbom_context.sbom.data_licenses, + properties: sbom_context.sbom.properties, }), Ok(None) => Err(FieldError::new("SBOM not found")), Err(err) => Err(FieldError::from(err)), @@ -68,6 +69,7 @@ impl SbomQuery { suppliers: sbom.sbom.suppliers, source_document_id: sbom.sbom.source_document_id, data_licenses: sbom.sbom.data_licenses, + properties: sbom.sbom.properties, }) }) .collect() diff --git a/modules/ingestor/Cargo.toml b/modules/ingestor/Cargo.toml index c26d10b9d..4fe24e7f3 100644 --- a/modules/ingestor/Cargo.toml +++ b/modules/ingestor/Cargo.toml @@ -19,6 +19,7 @@ bytes = { workspace = true } cpe = { workspace = true } csaf = { workspace = true } cve = { workspace = true } +cvss = { workspace = true } hex = { workspace = true } humantime 
= { workspace = true } jsn = { workspace = true } diff --git a/modules/ingestor/src/graph/cvss.rs b/modules/ingestor/src/graph/cvss.rs new file mode 100644 index 000000000..cdd537051 --- /dev/null +++ b/modules/ingestor/src/graph/cvss.rs @@ -0,0 +1,197 @@ +use cvss::version::{VersionV2, VersionV3, VersionV4}; +use cvss::{Cvss, v2_0, v3, v4_0}; +use sea_orm::{ColumnTrait, ConnectionTrait, DbErr, EntityTrait, QueryFilter, Set}; +use trustify_entity::advisory_vulnerability_score::{self, ScoreType, Severity}; +use uuid::Uuid; + +#[derive(Debug)] +pub struct ScoreCreator { + advisory_id: Uuid, + scores: Vec, +} + +/// Information required to create a new +#[derive(Clone, Debug)] +pub struct ScoreInformation { + pub vulnerability_id: String, + pub r#type: ScoreType, + pub vector: String, + pub score: f64, + pub severity: Severity, +} + +impl From for advisory_vulnerability_score::ActiveModel { + fn from(value: ScoreInformation) -> Self { + let ScoreInformation { + vulnerability_id, + r#type, + vector, + score, + severity, + } = value; + + Self { + vulnerability_id: Set(vulnerability_id), + r#type: Set(r#type), + vector: Set(vector), + score: Set(score), + severity: Set(severity), + ..Default::default() + } + } +} + +impl From<(String, v2_0::CvssV2)> for ScoreInformation { + fn from((vulnerability_id, score): (String, v2_0::CvssV2)) -> Self { + let v2_0::CvssV2 { + version, + vector_string, + severity, + base_score, + .. + } = score; + + Self { + vulnerability_id, + r#type: match version { + VersionV2::V2_0 => ScoreType::V2_0, + }, + vector: vector_string, + score: base_score, + severity: match severity { + None => Severity::None, + Some(v2_0::Severity::Low) => Severity::Low, + Some(v2_0::Severity::Medium) => Severity::Medium, + Some(v2_0::Severity::High) => Severity::High, + }, + } + } +} + +impl From<(String, v3::CvssV3)> for ScoreInformation { + fn from((vulnerability_id, score): (String, v3::CvssV3)) -> Self { + let v3::CvssV3 { + version, + vector_string, + base_severity, + base_score, + .. + } = score; + + Self { + vulnerability_id, + r#type: match version { + VersionV3::V3_0 => ScoreType::V3_0, + VersionV3::V3_1 => ScoreType::V3_1, + }, + vector: vector_string, + score: base_score, + severity: base_severity.into(), + } + } +} + +impl From<(String, v4_0::CvssV4)> for ScoreInformation { + fn from((vulnerability_id, score): (String, v4_0::CvssV4)) -> Self { + let v4_0::CvssV4 { + version, + vector_string, + base_severity, + base_score, + .. 
+ } = score; + + Self { + vulnerability_id, + r#type: match version { + VersionV4::V4_0 => ScoreType::V4_0, + }, + vector: vector_string, + score: base_score, + severity: base_severity.into(), + } + } +} + +impl From<(String, Cvss)> for ScoreInformation { + fn from((vulnerability_id, score): (String, Cvss)) -> Self { + match score { + Cvss::V2(score) => (vulnerability_id, score).into(), + Cvss::V3_0(score) => (vulnerability_id, score).into(), + Cvss::V3_1(score) => (vulnerability_id, score).into(), + Cvss::V4(score) => (vulnerability_id, score).into(), + } + } +} + +impl ScoreCreator { + pub fn new(advisory_id: Uuid) -> Self { + Self { + advisory_id, + scores: Vec::new(), + } + } + + pub fn add(&mut self, model: impl Into) { + self.scores.push(model.into()); + } + + pub fn extend(&mut self, items: impl IntoIterator>) { + self.scores.extend(items.into_iter().map(Into::into)); + } + + pub async fn create(self, db: &C) -> Result<(), DbErr> + where + C: ConnectionTrait, + { + let Self { + advisory_id, + scores, + } = self; + + // delete existing entries + + advisory_vulnerability_score::Entity::delete_many() + .filter(advisory_vulnerability_score::Column::AdvisoryId.eq(advisory_id)) + .exec(db) + .await?; + + // if we have none, return now + + if scores.is_empty() { + return Ok(()); + } + + // transform and set advisory + + let scores = scores.into_iter().map(|score| { + let ScoreInformation { + vulnerability_id, + r#type, + vector, + score, + severity, + } = score; + + advisory_vulnerability_score::ActiveModel { + id: Set(Uuid::now_v7()), + advisory_id: Set(advisory_id), + vulnerability_id: Set(vulnerability_id), + r#type: Set(r#type), + vector: Set(vector), + score: Set(score), + severity: Set(severity), + } + }); + + // insert chunked + + advisory_vulnerability_score::Entity::insert_many(scores) + .exec(db) + .await?; + + // done + + Ok(()) + } +} diff --git a/modules/ingestor/src/graph/mod.rs b/modules/ingestor/src/graph/mod.rs index 3379e8fca..88fdc20ac 100644 --- a/modules/ingestor/src/graph/mod.rs +++ b/modules/ingestor/src/graph/mod.rs @@ -1,5 +1,6 @@ pub mod advisory; pub mod cpe; +pub mod cvss; pub mod db_context; pub mod error; pub mod organization; diff --git a/modules/ingestor/src/graph/sbom/clearly_defined.rs b/modules/ingestor/src/graph/sbom/clearly_defined.rs index 6864c5f79..306326162 100644 --- a/modules/ingestor/src/graph/sbom/clearly_defined.rs +++ b/modules/ingestor/src/graph/sbom/clearly_defined.rs @@ -88,6 +88,7 @@ impl Into for &Curation { authors: vec!["ClearlyDefined: Community-Curated".to_string()], suppliers: vec![], data_licenses: vec![], + properties: Default::default(), } } } diff --git a/modules/ingestor/src/graph/sbom/cyclonedx.rs b/modules/ingestor/src/graph/sbom/cyclonedx.rs index 32332f354..c030514c8 100644 --- a/modules/ingestor/src/graph/sbom/cyclonedx.rs +++ b/modules/ingestor/src/graph/sbom/cyclonedx.rs @@ -27,7 +27,7 @@ use serde_cyclonedx::cyclonedx::v_1_6::{ use std::{borrow::Cow, str::FromStr}; use time::{OffsetDateTime, format_description::well_known::Iso8601}; use tracing::instrument; -use trustify_common::{cpe::Cpe, purl::Purl}; +use trustify_common::{advisory::cyclonedx::extract_properties_json, cpe::Cpe, purl::Purl}; use trustify_entity::relationship::Relationship; use uuid::Uuid; @@ -128,6 +128,7 @@ impl<'a> From> for SbomInformation { authors, suppliers, data_licenses, + properties: extract_properties_json(sbom), } } } diff --git a/modules/ingestor/src/graph/sbom/mod.rs b/modules/ingestor/src/graph/sbom/mod.rs index dcc3fca1e..be2c4f174 100644 
--- a/modules/ingestor/src/graph/sbom/mod.rs +++ b/modules/ingestor/src/graph/sbom/mod.rs @@ -6,7 +6,6 @@ pub mod processor; pub mod spdx; mod common; - pub use common::*; use super::error::Error; @@ -50,6 +49,8 @@ pub struct SbomInformation { pub suppliers: Vec, /// The licenses of the data itself, if known. pub data_licenses: Vec, + /// general purpose properties from the SBOM + pub properties: serde_json::Value, } impl From<()> for SbomInformation { @@ -110,6 +111,7 @@ impl Graph { authors, suppliers, data_licenses, + properties, } = info.into(); let new_id = match self @@ -137,6 +139,8 @@ impl Graph { source_document_id: Set(new_id), labels: Set(labels.into().validate()?), data_licenses: Set(data_licenses), + + properties: Set(properties), }; let node_model = sbom_node::ActiveModel { diff --git a/modules/ingestor/src/graph/sbom/spdx.rs b/modules/ingestor/src/graph/sbom/spdx.rs index 106e84251..0f0129209 100644 --- a/modules/ingestor/src/graph/sbom/spdx.rs +++ b/modules/ingestor/src/graph/sbom/spdx.rs @@ -96,6 +96,7 @@ impl<'a> From> for SbomInformation { .clone(), suppliers: suppliers(sbom), data_licenses: vec![value.0.document_creation_information.data_license.clone()], + properties: Default::default(), } } } diff --git a/modules/ingestor/src/service/advisory/csaf/loader.rs b/modules/ingestor/src/service/advisory/csaf/loader.rs index 06be3ed9b..3c94cb94c 100644 --- a/modules/ingestor/src/service/advisory/csaf/loader.rs +++ b/modules/ingestor/src/service/advisory/csaf/loader.rs @@ -1,3 +1,5 @@ +use crate::graph::cvss::ScoreCreator; +use crate::service::advisory::csaf::extract_scores; use crate::{ graph::{ Graph, @@ -16,6 +18,7 @@ use csaf::{ Csaf, vulnerability::{ProductStatus, Vulnerability}, }; +use cvss::v3::CvssV3; use hex::ToHex; use sbom_walker::report::ReportSink; use sea_orm::{ConnectionTrait, TransactionTrait}; @@ -116,6 +119,10 @@ impl<'g> CsafLoader<'g> { .await?; } + let mut creator = ScoreCreator::new(advisory.advisory.id); + extract_scores(&csaf, &mut creator); + creator.create(&tx).await?; + tx.commit().await?; Ok(IngestResult { @@ -173,16 +180,23 @@ impl<'g> CsafLoader<'g> { } for score in vulnerability.scores.iter().flatten() { - if let Some(v3) = &score.cvss_v3 { - match Cvss3Base::from_str(&v3.to_string()) { - Ok(cvss3) => { - log::debug!("{cvss3:?}"); - advisory_vulnerability - .ingest_cvss3_score(cvss3, connection) - .await?; - } + if let Some(cvss_v3) = &score.cvss_v3 { + match serde_json::from_value::(cvss_v3.clone()) { + Ok(cvss) => match Cvss3Base::from_str(&cvss.vector_string) { + Ok(cvss3) => { + log::debug!("{cvss3:?}"); + advisory_vulnerability + .ingest_cvss3_score(cvss3, connection) + .await?; + } + Err(err) => { + let msg = format!("Unable to parse CVSS3: {err:#?}"); + log::info!("{msg}"); + report.error(msg); + } + }, Err(err) => { - let msg = format!("Unable to parse CVSS3: {err:#?}"); + let msg = format!("Unable to deserialize CVSS3 JSON: {err:#?}"); log::info!("{msg}"); report.error(msg); } diff --git a/modules/ingestor/src/service/advisory/csaf/mod.rs b/modules/ingestor/src/service/advisory/csaf/mod.rs index 94f090702..96501d50c 100644 --- a/modules/ingestor/src/service/advisory/csaf/mod.rs +++ b/modules/ingestor/src/service/advisory/csaf/mod.rs @@ -4,3 +4,31 @@ mod util; mod creator; pub use creator::*; + +use crate::graph::cvss::ScoreCreator; +use csaf::Csaf; +use cvss::v3::CvssV3; + +/// Extract scores from a CSAF document +pub fn extract_scores(csaf: &Csaf, creator: &mut ScoreCreator) { + for vuln in csaf.vulnerabilities.iter().flatten() { + let 
Some(vulnerability_id) = &vuln.cve else { + // we only process CVEs + continue; + }; + + for score in vuln.scores.iter().flatten() { + if let Some(score) = &score.cvss_v2 + && let Ok(score) = serde_json::from_value::(score.clone()) + { + creator.add((vulnerability_id.clone(), score)) + } + + if let Some(cvss_v3) = &score.cvss_v3 + && let Ok(cvss) = serde_json::from_value::(cvss_v3.clone()) + { + creator.add((vulnerability_id.clone(), cvss)) + } + } + } +} diff --git a/modules/ingestor/src/service/advisory/cve/loader.rs b/modules/ingestor/src/service/advisory/cve/loader.rs index 2c6206270..ccd18ed40 100644 --- a/modules/ingestor/src/service/advisory/cve/loader.rs +++ b/modules/ingestor/src/service/advisory/cve/loader.rs @@ -5,10 +5,14 @@ use crate::{ AdvisoryInformation, AdvisoryVulnerabilityInformation, version::{Version, VersionInfo, VersionSpec}, }, + cvss::ScoreCreator, vulnerability::VulnerabilityInformation, }, model::IngestResult, - service::{Error, Warnings, advisory::cve::divination::divine_purl}, + service::{ + Error, Warnings, + advisory::cve::{divination::divine_purl, extract_scores}, + }, }; use cve::{ Cve, Timestamp, @@ -16,8 +20,7 @@ use cve::{ }; use sea_orm::TransactionTrait; use serde_json::Value; -use std::fmt::Debug; -use std::str::FromStr; +use std::{fmt::Debug, str::FromStr}; use time::OffsetDateTime; use tracing::instrument; use trustify_common::{hashing::Digests, id::Id}; @@ -114,6 +117,10 @@ impl<'g> CveLoader<'g> { } } + let mut score_creator = ScoreCreator::new(advisory.advisory.id); + extract_scores(&cve, &mut score_creator); + score_creator.create(&tx).await?; + if let Some(affected) = affected { for product in affected { if let Some(purl) = divine_purl(product) { diff --git a/modules/ingestor/src/service/advisory/cve/mod.rs b/modules/ingestor/src/service/advisory/cve/mod.rs index 9ec27d9bd..b6383df77 100644 --- a/modules/ingestor/src/service/advisory/cve/mod.rs +++ b/modules/ingestor/src/service/advisory/cve/mod.rs @@ -1,2 +1,71 @@ +use crate::graph::cvss::ScoreCreator; +use cve::Cve; +use cvss::{Cvss, v2_0::CvssV2, v3::CvssV3, v4_0::CvssV4}; + pub mod divination; pub mod loader; + +#[derive(Clone, Debug, PartialEq, Default)] +struct CvssMetric { + pub cvss_v2_0: Option, + pub cvss_v3_0: Option, + pub cvss_v3_1: Option, + pub cvss_v4_0: Option, +} + +impl From<&cve::published::Metric> for CvssMetric { + fn from(metric: &cve::published::Metric) -> Self { + Self { + cvss_v2_0: metric + .cvss_v2_0 + .as_ref() + .and_then(|v| serde_json::from_value(v.clone()).ok()), + cvss_v3_0: metric + .cvss_v3_0 + .as_ref() + .and_then(|v| serde_json::from_value(v.clone()).ok()), + cvss_v3_1: metric + .cvss_v3_1 + .as_ref() + .and_then(|v| serde_json::from_value(v.clone()).ok()), + cvss_v4_0: metric + .cvss_v4_0 + .as_ref() + .and_then(|v| serde_json::from_value(v.clone()).ok()), + } + } +} + +pub fn extract_scores(cve: &Cve, creator: &mut ScoreCreator) { + let Cve::Published(published) = cve else { + return; + }; + + let vulnerability_id = &published.metadata.id; + + let all_metrics = published.containers.cna.metrics.iter().chain( + published + .containers + .adp + .iter() + .flat_map(|adp| adp.metrics.iter()), + ); + + for cve_metric in all_metrics { + let metric = CvssMetric::from(cve_metric); + + let cvss_objects: Vec = vec![ + metric.cvss_v3_1.map(Cvss::V3_1), + metric.cvss_v3_0.map(Cvss::V3_0), + metric.cvss_v2_0.map(Cvss::V2), + metric.cvss_v4_0.map(Cvss::V4), + ] + .into_iter() + .flatten() + .collect(); + + for score in cvss_objects { + 
creator.add((vulnerability_id.clone(), score)); + } + } +} diff --git a/modules/ingestor/src/service/advisory/osv/loader.rs b/modules/ingestor/src/service/advisory/osv/loader.rs index 206ba4dac..82a65feca 100644 --- a/modules/ingestor/src/service/advisory/osv/loader.rs +++ b/modules/ingestor/src/service/advisory/osv/loader.rs @@ -1,3 +1,4 @@ +use crate::service::advisory::osv::extract_vulnerability_ids; use crate::{ graph::{ Graph, @@ -6,12 +7,13 @@ use crate::{ advisory_vulnerability::AdvisoryVulnerabilityContext, version::{Version, VersionInfo, VersionSpec}, }, + cvss::ScoreCreator, purl::creator::PurlCreator, }, model::IngestResult, service::{ Error, Warnings, - advisory::osv::{prefix::get_well_known_prefixes, translate}, + advisory::osv::{extract_scores, prefix::get_well_known_prefixes, translate}, }, }; use osv::schema::{Ecosystem, Event, Range, RangeType, ReferenceType, SeverityType, Vulnerability}; @@ -48,14 +50,6 @@ impl<'g> OsvLoader<'g> { let tx = self.graph.db.begin().await?; - let cve_ids = osv.aliases.iter().flat_map(|aliases| { - aliases - .iter() - .filter(|e| e.starts_with("CVE-")) - .cloned() - .collect::>() - }); - let information = AdvisoryInformation { id: osv.id.clone(), title: osv.summary.clone(), @@ -78,13 +72,16 @@ impl<'g> OsvLoader<'g> { } let mut purl_creator = PurlCreator::new(); + let mut score_creator = ScoreCreator::new(advisory.advisory.id); + + extract_scores(&osv, &mut score_creator); - for cve_id in cve_ids { - self.graph.ingest_vulnerability(&cve_id, (), &tx).await?; + for cve_id in extract_vulnerability_ids(&osv) { + self.graph.ingest_vulnerability(cve_id, (), &tx).await?; let advisory_vuln = advisory .link_to_vulnerability( - &cve_id, + cve_id, Some(AdvisoryVulnerabilityInformation { title: osv.summary.clone(), summary: osv.summary.clone(), @@ -281,6 +278,7 @@ impl<'g> OsvLoader<'g> { } purl_creator.create(&tx).await?; + score_creator.create(&tx).await?; tx.commit().await?; diff --git a/modules/ingestor/src/service/advisory/osv/mod.rs b/modules/ingestor/src/service/advisory/osv/mod.rs index 90cfb623d..51e9a602f 100644 --- a/modules/ingestor/src/service/advisory/osv/mod.rs +++ b/modules/ingestor/src/service/advisory/osv/mod.rs @@ -3,8 +3,12 @@ mod prefix; pub mod loader; pub mod translate; -use crate::service::Error; -use osv::schema::Vulnerability; +use crate::{ + graph::cvss::{ScoreCreator, ScoreInformation}, + service::Error, +}; +use osv::schema::{SeverityType, Vulnerability}; +use trustify_entity::advisory_vulnerability_score::{ScoreType, Severity}; /// Load a [`Vulnerability`] from YAML, using the "classic" enum representation. 
pub fn from_yaml(data: &[u8]) -> Result { @@ -34,3 +38,89 @@ pub fn parse(buffer: &[u8]) -> Result { Ok(osv) } + +/// extract vulnerability IDs +pub fn extract_vulnerability_ids(osv: &Vulnerability) -> impl IntoIterator { + osv.aliases + .iter() + .flat_map(|aliases| aliases.iter().filter(|e| e.starts_with("CVE-"))) + .map(|s| s.as_str()) +} + +/// extract scores from OSV +pub fn extract_scores(osv: &Vulnerability, creator: &mut ScoreCreator) { + #[derive(Clone)] + struct ScoreInfo { + pub r#type: ScoreType, + pub vector: String, + pub score: f64, + pub severity: Severity, + } + + impl From<(String, ScoreInfo)> for ScoreInformation { + fn from( + ( + vulnerability_id, + ScoreInfo { + r#type, + vector, + score, + severity, + }, + ): (String, ScoreInfo), + ) -> Self { + Self { + vulnerability_id, + r#type, + vector, + score, + severity, + } + } + } + + // TODO: validate score type by prefix + let scores = osv + .severity + .iter() + .flatten() + .flat_map(|severity| match severity.severity_type { + SeverityType::CVSSv2 => Some(ScoreInfo { + r#type: ScoreType::V2_0, + vector: severity.score.clone(), + score: 10f64, // TODO: replace with actual evaluated score + severity: Severity::Critical, // TODO: replace with actual evaluated severity + }), + SeverityType::CVSSv3 => Some(ScoreInfo { + r#type: match severity.score.starts_with("CVSS:3.1/") { + true => ScoreType::V3_1, + false => ScoreType::V3_0, + }, + vector: severity.score.clone(), + score: 10f64, // TODO: replace with actual evaluated score + severity: Severity::Critical, // TODO: replace with actual evaluated severity + }), + SeverityType::CVSSv4 => Some(ScoreInfo { + r#type: ScoreType::V4_0, + vector: severity.score.clone(), + score: 10f64, // TODO: replace with actual evaluated score + severity: Severity::Critical, // TODO: replace with actual evaluated severity + }), + + _ => None, + }); + + // get all vulnerability IDs + + let ids = extract_vulnerability_ids(osv) + .into_iter() + .collect::>(); + + // create scores for each vulnerability (alias) + + creator.extend( + scores + .into_iter() + .flat_map(|score| ids.iter().map(move |id| (id.to_string(), score.clone()))), + ); +} diff --git a/modules/ingestor/src/service/sbom/clearly_defined.rs b/modules/ingestor/src/service/sbom/clearly_defined.rs index 1e66083cc..5acb55151 100644 --- a/modules/ingestor/src/service/sbom/clearly_defined.rs +++ b/modules/ingestor/src/service/sbom/clearly_defined.rs @@ -66,6 +66,7 @@ impl<'g> ClearlyDefinedLoader<'g> { authors: vec!["ClearlyDefined Definitions".to_string()], suppliers: vec![], data_licenses: vec![], + properties: Default::default(), }, &tx, ) diff --git a/modules/ingestor/tests/issues.rs b/modules/ingestor/tests/issues.rs index a52907aa3..af940e67c 100644 --- a/modules/ingestor/tests/issues.rs +++ b/modules/ingestor/tests/issues.rs @@ -6,7 +6,7 @@ use trustify_common::id::Id; use trustify_test_context::TrustifyContext; #[test_context(TrustifyContext)] -#[test(actix_web::test)] +#[test(tokio::test)] /// Ingested SBOM should not fail async fn issue_1492(ctx: &TrustifyContext) -> anyhow::Result<()> { let result = ctx @@ -19,17 +19,14 @@ async fn issue_1492(ctx: &TrustifyContext) -> anyhow::Result<()> { } #[test_context(TrustifyContext)] -#[test(actix_web::test)] +#[test(tokio::test)] /// Ingested SBOM should not fail async fn cvss_issue_1(ctx: &TrustifyContext) -> anyhow::Result<()> { let result = ctx .ingest_document("csaf/issues/cvss_1/ssa-054046.json") - .await; + .await?; - assert_eq!( - result.expect_err("must be an error").to_string(), - 
"unknown CVSS metric name: `E` at line 3001 column 11" - ); + assert!(matches!(result.id, Id::Uuid(_))); Ok(()) } diff --git a/modules/ingestor/tests/parallel.rs b/modules/ingestor/tests/parallel.rs index a42819082..9c748e5c1 100644 --- a/modules/ingestor/tests/parallel.rs +++ b/modules/ingestor/tests/parallel.rs @@ -8,14 +8,13 @@ use test_context::{futures, test_context}; use test_log::test; use tracing::instrument; use trustify_common::{cpe::Cpe, purl::Purl, sbom::spdx::parse_spdx}; -use trustify_module_ingestor::service::Cache; use trustify_module_ingestor::{ graph::{ cpe::CpeCreator, purl::creator::PurlCreator, sbom::{LicenseCreator, LicenseInfo}, }, - service::{Discard, Format}, + service::{Cache, Discard, Format}, }; use trustify_test_context::{TrustifyContext, document_bytes, spdx::fix_spdx_rels}; use uuid::Uuid; diff --git a/modules/storage/src/config.rs b/modules/storage/src/config.rs index 91fa1ff42..67419123c 100644 --- a/modules/storage/src/config.rs +++ b/modules/storage/src/config.rs @@ -1,6 +1,12 @@ -use crate::service::Compression; -use std::fmt::{Display, Formatter}; -use std::path::PathBuf; +use crate::service::{ + Compression, dispatch::DispatchBackend, fs::FileSystemBackend, s3::S3Backend, +}; +use anyhow::Context; +use std::{ + fmt::{Display, Formatter}, + fs::create_dir_all, + path::PathBuf, +}; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)] pub enum StorageStrategy { @@ -17,7 +23,7 @@ impl Display for StorageStrategy { } } -#[derive(clap::Args, Debug, Clone)] +#[derive(clap::Parser, Debug, Clone)] #[command(next_help_heading = "Storage")] pub struct StorageConfig { #[arg( @@ -51,6 +57,33 @@ pub struct StorageConfig { pub s3_config: S3Config, } +impl StorageConfig { + /// Create a storage backend from a storage config + pub async fn into_storage(self, devmode: bool) -> anyhow::Result { + Ok(match self.storage_strategy { + StorageStrategy::Fs => { + let storage = self + .fs_path + .as_ref() + .cloned() + .unwrap_or_else(|| PathBuf::from("./.trustify/storage")); + if devmode { + create_dir_all(&storage).context(format!( + "Failed to create filesystem storage directory: {:?}", + self.fs_path + ))?; + } + DispatchBackend::Filesystem( + FileSystemBackend::new(storage, self.compression).await?, + ) + } + StorageStrategy::S3 => { + DispatchBackend::S3(S3Backend::new(self.s3_config, self.compression).await?) 
+ } + }) + } +} + #[derive(Clone, Debug, Default, clap::Args)] #[command(next_help_heading = "S3")] #[group(id = "s3", requires = "storage-strategy")] diff --git a/modules/storage/src/service/dispatch.rs b/modules/storage/src/service/dispatch.rs index d59e8f193..039efcf03 100644 --- a/modules/storage/src/service/dispatch.rs +++ b/modules/storage/src/service/dispatch.rs @@ -22,7 +22,7 @@ impl StorageBackend for DispatchBackend { async fn store(&self, stream: S) -> Result> where - S: AsyncRead + Unpin, + S: AsyncRead + Unpin + Send, { match self { Self::Filesystem(backend) => backend.store(stream).await.map_err(Self::map_err), @@ -30,10 +30,10 @@ impl StorageBackend for DispatchBackend { } } - async fn retrieve<'a>( + async fn retrieve( &self, key: StorageKey, - ) -> Result> + 'a>, Self::Error> + ) -> Result> + use<>>, Self::Error> where Self: Sized, { diff --git a/modules/storage/src/service/fs.rs b/modules/storage/src/service/fs.rs index c84396c51..bf9820276 100644 --- a/modules/storage/src/service/fs.rs +++ b/modules/storage/src/service/fs.rs @@ -159,10 +159,10 @@ impl StorageBackend for FileSystemBackend { Ok(result) } - async fn retrieve<'a>( + async fn retrieve( &self, key: StorageKey, - ) -> Result> + 'a>, Self::Error> { + ) -> Result> + use<>>, Self::Error> { match self.locate(key).await? { Some((path, compression)) => File::open(&path) .await diff --git a/modules/storage/src/service/mod.rs b/modules/storage/src/service/mod.rs index 46ccc4fdb..460694cb0 100644 --- a/modules/storage/src/service/mod.rs +++ b/modules/storage/src/service/mod.rs @@ -85,23 +85,26 @@ impl StorageResult { } pub trait StorageBackend { - type Error: Debug; + type Error: Debug + Display; /// Store the content from a stream fn store( &self, stream: S, - ) -> impl Future>> + ) -> impl Future>> + Send where - S: AsyncRead + Unpin; + S: AsyncRead + Unpin + Send; /// Retrieve the content as an async reader - fn retrieve<'a>( + fn retrieve( &self, key: StorageKey, ) -> impl Future< - Output = Result> + 'a>, Self::Error>, - >; + Output = Result< + Option> + Send + use>, + Self::Error, + >, + > + Send; /// Delete the stored content. 
/// diff --git a/modules/storage/src/service/s3.rs b/modules/storage/src/service/s3.rs index 7758ab5b0..9eeb236b7 100644 --- a/modules/storage/src/service/s3.rs +++ b/modules/storage/src/service/s3.rs @@ -135,7 +135,7 @@ impl StorageBackend for S3Backend { #[instrument(skip(self, stream), err(Debug, level=tracing::Level::INFO))] async fn store(&self, stream: S) -> Result> where - S: AsyncRead + Unpin, + S: AsyncRead + Unpin + Send, { let file = TempFile::with_compression(stream, self.compression).await?; let result = file.to_result(); @@ -163,10 +163,10 @@ impl StorageBackend for S3Backend { Ok(result) } - async fn retrieve<'a>( + async fn retrieve( &self, StorageKey(key): StorageKey, - ) -> Result> + 'a>, Self::Error> { + ) -> Result> + use<>>, Self::Error> { let req = self.client.get_object().bucket(&self.bucket).key(&key); match req.send().await { diff --git a/server/src/profile/api.rs b/server/src/profile/api.rs index c0fb9af49..695bf8ca4 100644 --- a/server/src/profile/api.rs +++ b/server/src/profile/api.rs @@ -6,10 +6,9 @@ use actix_web::middleware; use crate::{endpoints, profile::spawn_db_check, sample_data}; use actix_web::web; -use anyhow::Context; use bytesize::ByteSize; use futures::FutureExt; -use std::{env, fs::create_dir_all, path::PathBuf, process::ExitCode, sync::Arc}; +use std::{env, process::ExitCode, sync::Arc}; use trustify_auth::{ auth::AuthConfigArguments, authenticator::Authenticator, @@ -29,10 +28,7 @@ use trustify_infrastructure::{ }; use trustify_module_analysis::{config::AnalysisConfig, service::AnalysisService}; use trustify_module_ingestor::graph::Graph; -use trustify_module_storage::{ - config::{StorageConfig, StorageStrategy}, - service::{dispatch::DispatchBackend, fs::FileSystemBackend, s3::S3Backend}, -}; +use trustify_module_storage::{config::StorageConfig, service::dispatch::DispatchBackend}; use trustify_module_ui::{UI, endpoints::UiResources}; use utoipa::openapi::{Info, License}; @@ -144,10 +140,12 @@ mod default { #[group(id = "ui")] pub struct UiConfig { /// Issuer URL used by the UI - #[arg(id = "ui-issuer-url", long, env = "UI_ISSUER_URL", default_value_t = ISSUER_URL.to_string())] + #[arg(id = "ui-issuer-url", long, env = "UI_ISSUER_URL", default_value_t = ISSUER_URL.to_string() + )] pub issuer_url: String, /// Client ID used by the UI - #[arg(id = "ui-client-id", long, env = "UI_CLIENT_ID", default_value_t = FRONTEND_CLIENT_ID.to_string())] + #[arg(id = "ui-client-id", long, env = "UI_CLIENT_ID", default_value_t = FRONTEND_CLIENT_ID.to_string() + )] pub client_id: String, /// Scopes to request #[arg(id = "ui-scope", long, env = "UI_SCOPE", default_value = "openid")] @@ -245,28 +243,7 @@ impl InitData { .register("database", spawn_db_check(db.clone())?) 
.await; - let storage = match run.storage.storage_strategy { - StorageStrategy::Fs => { - let storage = run - .storage - .fs_path - .as_ref() - .cloned() - .unwrap_or_else(|| PathBuf::from("./.trustify/storage")); - if run.devmode { - create_dir_all(&storage).context(format!( - "Failed to create filesystem storage directory: {:?}", - run.storage.fs_path - ))?; - } - DispatchBackend::Filesystem( - FileSystemBackend::new(storage, run.storage.compression).await?, - ) - } - StorageStrategy::S3 => DispatchBackend::S3( - S3Backend::new(run.storage.s3_config, run.storage.compression).await?, - ), - }; + let storage = run.storage.into_storage(run.devmode).await?; let ui = UI { version: env!("CARGO_PKG_VERSION").to_string(), @@ -499,12 +476,12 @@ mod test { let context = InitContext::default(); let run = Run::from_arg_matches(&Run::augment_args(Command::new("cmd")).get_matches_from( vec![ - "cmd", - "--db-name", - "test", - "--db-port", - &ctx.postgresql.as_ref().expect("database").settings().port.to_string(), - ], + "cmd", + "--db-name", + "test", + "--db-port", + &ctx.postgresql.as_ref().expect("database").settings().port.to_string(), + ], ))?; InitData::new(context, run).await.map(|_| ()) } diff --git a/server/src/profile/importer.rs b/server/src/profile/importer.rs index ad0f50f85..ed92506e0 100644 --- a/server/src/profile/importer.rs +++ b/server/src/profile/importer.rs @@ -4,10 +4,7 @@ use std::{path::PathBuf, process::ExitCode}; use trustify_common::{config::Database, db}; use trustify_infrastructure::{Infrastructure, InfrastructureConfig, InitContext}; use trustify_module_importer::server::importer; -use trustify_module_storage::{ - config::{StorageConfig, StorageStrategy}, - service::{dispatch::DispatchBackend, fs::FileSystemBackend, s3::S3Backend}, -}; +use trustify_module_storage::{config::StorageConfig, service::dispatch::DispatchBackend}; /// Run the API server #[derive(clap::Args, Debug)] @@ -73,22 +70,7 @@ impl InitData { .register("database", spawn_db_check(db.clone())?) .await; - let storage = match run.storage.storage_strategy { - StorageStrategy::Fs => { - let storage = run - .storage - .fs_path - .as_ref() - .cloned() - .unwrap_or_else(|| PathBuf::from("./.trustify/storage")); - DispatchBackend::Filesystem( - FileSystemBackend::new(storage, run.storage.compression).await?, - ) - } - StorageStrategy::S3 => DispatchBackend::S3( - S3Backend::new(run.storage.s3_config, run.storage.compression).await?, - ), - }; + let storage = run.storage.into_storage(false).await?; Ok(InitData { db, diff --git a/test-context/src/ctx/migration.rs b/test-context/src/ctx/migration.rs index 5372fb9b5..93e309715 100644 --- a/test-context/src/ctx/migration.rs +++ b/test-context/src/ctx/migration.rs @@ -1,15 +1,43 @@ use crate::{TrustifyTestContext, migration::Migration}; use anyhow::Context; +use std::borrow::Cow; +use std::marker::PhantomData; use std::ops::Deref; use tar::Archive; use test_context::AsyncTestContext; use trustify_db::embedded::{Options, Source, default_settings}; use trustify_module_storage::service::fs::FileSystemBackend; +#[macro_export] +macro_rules! commit { + ($t:ident($id:literal)) => { + pub struct $t; + + impl DumpId for $t { + fn dump_id() -> Option<&'static str> { + Some($id) + } + } + }; +} + +pub trait DumpId { + fn dump_id() -> Option<&'static str>; +} + +impl DumpId for () { + fn dump_id() -> Option<&'static str> { + None + } +} + /// Creates a database and imports the previous DB and storage dump. 
-pub struct TrustifyMigrationContext(pub(crate) TrustifyTestContext); +pub struct TrustifyMigrationContext( + pub(crate) TrustifyTestContext, + PhantomData, +); -impl Deref for TrustifyMigrationContext { +impl Deref for TrustifyMigrationContext { type Target = TrustifyTestContext; fn deref(&self) -> &Self::Target { @@ -17,10 +45,14 @@ impl Deref for TrustifyMigrationContext { } } -impl TrustifyMigrationContext { +impl TrustifyMigrationContext { pub async fn new() -> anyhow::Result { let migration = Migration::new().expect("failed to create migration manager"); - let base = migration.provide().await?; + let id: Cow<'static, str> = match ID::dump_id() { + Some(id) => format!("commit-{id}").into(), + None => "latest".into(), + }; + let base = migration.provide(&id).await?; // create storage @@ -50,13 +82,14 @@ impl TrustifyMigrationContext { Ok(Self( TrustifyTestContext::new(db, storage, tmp, postgresql).await, + Default::default(), )) } } -impl AsyncTestContext for TrustifyMigrationContext { +impl AsyncTestContext for TrustifyMigrationContext { async fn setup() -> Self { - TrustifyMigrationContext::new() + Self::new() .await .expect("failed to create migration context") } diff --git a/test-context/src/migration.rs b/test-context/src/migration.rs index 8c316e024..3a9463e19 100644 --- a/test-context/src/migration.rs +++ b/test-context/src/migration.rs @@ -68,8 +68,8 @@ impl Migration { /// Provide the base dump path, for this branch. /// /// This may include downloading content from S3. - pub async fn provide(&self) -> anyhow::Result { - let base = self.base.join(&self.branch); + pub async fn provide(&self, id: &str) -> anyhow::Result { + let base = self.base.join(&self.branch).join(id); log::info!("branch base path: '{}'", base.display()); @@ -101,6 +101,7 @@ impl Migration { &self.bucket, &self.region, &self.branch, + id, files, ) .await? 
@@ -297,6 +298,7 @@ async fn download_artifacts( bucket: &str, region: &str, branch: &str, + commit: &str, files: impl IntoIterator>, ) -> anyhow::Result<()> { let base = base.as_ref(); @@ -305,10 +307,7 @@ async fn download_artifacts( let file = file.as_ref(); vec![file.to_string(), format!("{file}.sha256")] }) { - let url = format!( - "https://{}.s3.{}.amazonaws.com/{}/latest/{}", - bucket, region, branch, file - ); + let url = format!("https://{bucket}.s3.{region}.amazonaws.com/{branch}/{commit}/{file}",); log::info!("downloading file: '{url}'"); diff --git a/trustd/Cargo.toml b/trustd/Cargo.toml index 44622fc06..da6319bc5 100644 --- a/trustd/Cargo.toml +++ b/trustd/Cargo.toml @@ -13,6 +13,8 @@ path = "src/main.rs" trustify-common = { workspace = true } trustify-db = { workspace = true } trustify-infrastructure = { workspace = true } +trustify-migration = { workspace = true } +trustify-module-storage = { workspace = true } trustify-server = { workspace = true } anyhow = { workspace = true } diff --git a/trustd/src/db.rs b/trustd/src/db.rs index af9831859..32ab6a2b1 100644 --- a/trustd/src/db.rs +++ b/trustd/src/db.rs @@ -1,12 +1,9 @@ +use migration::data::{self, Direction, Options, Runner}; use postgresql_embedded::{PostgreSQL, VersionReq}; -use std::collections::HashMap; -use std::env; -use std::fs::create_dir_all; -use std::process::ExitCode; -use std::time::Duration; -use trustify_common::config::Database; -use trustify_common::db; +use std::{collections::HashMap, env, fs::create_dir_all, process::ExitCode, time::Duration}; +use trustify_common::{config::Database, db}; use trustify_infrastructure::otel::{Tracing, init_tracing}; +use trustify_module_storage::config::StorageConfig; #[derive(clap::Args, Debug)] pub struct Run { @@ -16,11 +13,17 @@ pub struct Run { pub(crate) database: Database, } -#[derive(clap::Subcommand, Debug)] +#[derive(clap::Subcommand, Debug, Clone)] +#[allow(clippy::large_enum_variant)] pub enum Command { + /// Create database Create, + /// Run migrations (up) Migrate, + /// Remove all migrations and re-apply them (DANGER) Refresh, + /// Run specific data migrations + Data(Data), } impl Run { @@ -31,6 +34,7 @@ impl Run { Create => self.create().await, Migrate => self.migrate().await, Refresh => self.refresh().await, + Data(data) => data.run(Direction::Up, self.database).await, } } @@ -40,6 +44,7 @@ impl Run { Err(e) => Err(e), } } + async fn refresh(self) -> anyhow::Result { match db::Database::new(&self.database).await { Ok(db) => { @@ -49,6 +54,7 @@ impl Run { Err(e) => Err(e), } } + async fn migrate(self) -> anyhow::Result { match db::Database::new(&self.database).await { Ok(db) => { @@ -104,3 +110,43 @@ impl Run { Ok(postgresql) } } + +#[derive(clap::Args, Debug, Clone)] +pub struct Data { + /// Migrations to run + #[arg()] + name: Vec, + #[command(flatten)] + storage: StorageConfig, + #[command(flatten)] + options: Options, +} + +impl Data { + pub async fn run(self, direction: Direction, database: Database) -> anyhow::Result { + let Self { + name: migrations, + storage, + options, + } = self; + + match db::Database::new(&database).await { + Ok(db) => { + trustify_db::Database(&db) + .data_migrate(Runner { + database: data::Database::Config { + url: database.to_url(), + schema: None, + }, + storage: storage.into_storage(false).await?, + direction, + migrations, + options, + }) + .await?; + Ok(ExitCode::SUCCESS) + } + Err(e) => Err(e), + } + } +}
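Note on the new data-migration entry points: the `Data` subcommand above builds a `migration::data::Runner` and hands it to `trustify_db::Database::data_migrate`. Below is a minimal sketch of driving the same path programmatically, using only the field and type names shown in this diff (`Runner`, `Direction`, `Options`, `data::Database::Config`, `StorageConfig::into_storage`); the helper function name and the idea of passing a single migration name as a parameter are illustrative, not part of the change itself.

use migration::data::{self, Direction, Options, Runner};
use trustify_common::{config::Database as DatabaseConfig, db};
use trustify_module_storage::config::StorageConfig;

/// Illustrative helper: re-run a single data migration against an existing database.
async fn rerun_data_migration(
    database: DatabaseConfig,
    storage: StorageConfig,
    name: &str,
) -> anyhow::Result<()> {
    // open the database using the regular configuration
    let db = db::Database::new(&database).await?;

    trustify_db::Database(&db)
        .data_migrate(Runner {
            // let the runner create its own connection from the configured URL
            database: data::Database::Config {
                url: database.to_url(),
                schema: None,
            },
            // data migrations read the original documents back from storage
            storage: storage.into_storage(false).await?,
            direction: Direction::Up,
            migrations: vec![name.to_string()],
            options: Options::default(),
        })
        .await?;

    Ok(())
}

This mirrors what the `Data::run` implementation in trustd does, and what the migration tests do with `Database::Provided` when they already hold an open connection.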