Skip to content

Commit 8542d3e

Browse files
committed
chore: initial PoC impl
1 parent bcb129b commit 8542d3e

File tree

9 files changed

+251
-15
lines changed

9 files changed

+251
-15
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

migration/Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,24 @@ name = "migration"
1010
path = "src/lib.rs"
1111

1212
[dependencies]
13+
trustify-common = { workspace = true }
14+
trustify-entity = { workspace = true }
15+
trustify-module-storage = { workspace = true }
16+
17+
bytes = { workspace = true }
18+
futures-util = { workspace = true }
19+
anyhow = { workspace = true }
20+
sea-orm = { workspace = true }
1321
sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres", "with-uuid"] }
22+
serde-cyclonedx = { workspace = true }
23+
serde_json = { workspace = true }
24+
spdx-rs = { workspace = true }
1425
tokio = { workspace = true, features = ["full"] }
1526
uuid = { workspace = true, features = ["v5"] }
1627

1728
[dev-dependencies]
1829
trustify-common = { workspace = true }
19-
trustify-db = { workspace = true }
30+
trustify-db = { workspace = true }
2031
trustify-entity = { workspace = true }
2132
trustify-test-context = { workspace = true }
2233

migration/src/data/mod.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use anyhow::{anyhow, bail};
2+
use bytes::BytesMut;
3+
use futures_util::stream::TryStreamExt;
4+
use futures_util::{StreamExt, stream};
5+
use sea_orm::{
6+
ConnectionTrait, DatabaseTransaction, DbErr, EntityTrait, ModelTrait, TransactionTrait,
7+
};
8+
use sea_orm_migration::SchemaManager;
9+
use trustify_common::id::Id;
10+
use trustify_entity::{sbom, source_document};
11+
use trustify_module_storage::service::{StorageBackend, StorageKey, dispatch::DispatchBackend};
12+
13+
#[allow(clippy::large_enum_variant)]
14+
pub enum Sbom {
15+
CycloneDx(serde_cyclonedx::cyclonedx::v_1_6::CycloneDx),
16+
Spdx(spdx_rs::models::SPDX),
17+
}
18+
19+
pub trait Document: Sized + Send + Sync {
20+
type Model: Send;
21+
22+
async fn all<C>(tx: &C) -> Result<Vec<Self::Model>, DbErr>
23+
where
24+
C: ConnectionTrait;
25+
26+
async fn source<S, C>(model: &Self::Model, storage: &S, tx: &C) -> Result<Self, anyhow::Error>
27+
where
28+
S: StorageBackend + Send + Sync,
29+
C: ConnectionTrait;
30+
}
31+
32+
impl Document for Sbom {
33+
type Model = sbom::Model;
34+
35+
async fn all<C: ConnectionTrait>(tx: &C) -> Result<Vec<Self::Model>, DbErr> {
36+
sbom::Entity::find().all(tx).await
37+
}
38+
39+
async fn source<S, C>(model: &Self::Model, storage: &S, tx: &C) -> Result<Self, anyhow::Error>
40+
where
41+
S: StorageBackend + Send + Sync,
42+
C: ConnectionTrait,
43+
{
44+
let source = model.find_related(source_document::Entity).one(tx).await?;
45+
46+
let Some(source) = source else {
47+
bail!("Missing source document ID for SBOM: {}", model.sbom_id);
48+
};
49+
50+
let stream = storage
51+
.retrieve(
52+
StorageKey::try_from(Id::Sha256(source.sha256))
53+
.map_err(|err| anyhow!("Invalid ID: {err}"))?,
54+
)
55+
.await
56+
.map_err(|err| anyhow!("Failed to retrieve document: {err}"))?
57+
.ok_or_else(|| anyhow!("Missing source document for SBOM: {}", model.sbom_id))?;
58+
59+
stream
60+
.try_collect::<BytesMut>()
61+
.await
62+
.map_err(|err| anyhow!("Failed to collect bytes: {err}"))
63+
.map(|bytes| bytes.freeze())
64+
.and_then(|bytes| {
65+
serde_json::from_slice(&bytes)
66+
.map(Sbom::Spdx)
67+
.or_else(|_| serde_json::from_slice(&bytes).map(Sbom::CycloneDx))
68+
.map_err(|err| anyhow!("Failed to parse document: {err}"))
69+
})
70+
}
71+
}
72+
73+
pub trait Handler<D>: Send
74+
where
75+
D: Document,
76+
{
77+
async fn call(
78+
&self,
79+
document: D,
80+
model: D::Model,
81+
tx: &DatabaseTransaction,
82+
) -> anyhow::Result<()>;
83+
}
84+
85+
pub trait DocumentProcessor {
86+
async fn process<D>(
87+
&self,
88+
storage: &DispatchBackend,
89+
f: impl Handler<D>,
90+
) -> anyhow::Result<(), DbErr>
91+
where
92+
D: Document;
93+
}
94+
95+
impl<'c> DocumentProcessor for SchemaManager<'c> {
96+
async fn process<D>(
97+
&self,
98+
storage: &DispatchBackend,
99+
f: impl Handler<D>,
100+
) -> anyhow::Result<(), DbErr>
101+
where
102+
D: Document,
103+
{
104+
let db = self.get_connection();
105+
let tx = db.begin().await?;
106+
107+
// TODO: soft-lock database
108+
// In order to prevent new documents with an old version to be created in the meantime, we
109+
// should soft-lock the database.
110+
111+
let all = D::all(&tx).await?;
112+
113+
stream::iter(all)
114+
.map(async |model| {
115+
let doc = D::source(&model, storage, &tx).await.map_err(|err| {
116+
DbErr::Migration(format!("Failed to load source document: {err}"))
117+
})?;
118+
f.call(doc, model, &tx).await.map_err(|err| {
119+
DbErr::Migration(format!("Failed to process document: {err}"))
120+
})?;
121+
122+
Ok::<_, DbErr>(())
123+
})
124+
.buffer_unordered(10) // TODO: make this configurable
125+
.try_collect::<Vec<_>>()
126+
.await?;
127+
128+
// TODO: soft-unlock database
129+
130+
Ok(())
131+
}
132+
}
133+
134+
#[macro_export]
135+
macro_rules! handler {
136+
(async | $doc:ident: $doc_ty:ty, $model:ident, $tx:ident | $body:block) => {{
137+
struct H;
138+
139+
impl $crate::data::Handler<$doc_ty> for H {
140+
async fn call(
141+
&self,
142+
$doc: $doc_ty,
143+
$model: <$doc_ty as $crate::data::Document>::Model,
144+
$tx: &sea_orm::DatabaseTransaction,
145+
) -> anyhow::Result<()> {
146+
$body
147+
}
148+
}
149+
150+
H
151+
}};
152+
}
153+
154+
#[macro_export]
155+
macro_rules! sbom {
156+
(async | $doc:ident, $model:ident, $tx:ident | $body:block) => {
157+
$crate::handler!(async |$doc: $crate::data::Sbom, $model, $tx| $body)
158+
};
159+
}

migration/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
pub use sea_orm_migration::prelude::*;
22

3+
mod data;
4+
35
mod m0000010_init;
46
mod m0000020_add_sbom_group;
57
mod m0000030_perf_adv_vuln;
@@ -25,6 +27,7 @@ mod m0001100_remove_get_purl;
2527
mod m0001110_sbom_node_checksum_indexes;
2628
mod m0001120_sbom_external_node_indexes;
2729
mod m0001130_gover_cmp;
30+
mod m0001140_example_data_migration;
2831

2932
pub struct Migrator;
3033

@@ -57,6 +60,7 @@ impl MigratorTrait for Migrator {
5760
Box::new(m0001110_sbom_node_checksum_indexes::Migration),
5861
Box::new(m0001120_sbom_external_node_indexes::Migration),
5962
Box::new(m0001130_gover_cmp::Migration),
63+
Box::new(m0001140_example_data_migration::Migration),
6064
]
6165
}
6266
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use crate::{
2+
data::{DocumentProcessor, Sbom},
3+
sbom,
4+
sea_orm::{ActiveModelTrait, IntoActiveModel, Set},
5+
};
6+
use sea_orm_migration::prelude::*;
7+
use trustify_module_storage::service::{dispatch::DispatchBackend, fs::FileSystemBackend};
8+
9+
#[derive(DeriveMigrationName)]
10+
pub struct Migration;
11+
12+
#[async_trait::async_trait]
13+
impl MigrationTrait for Migration {
14+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
15+
// TODO: make this configurable
16+
let (storage, _tmp) = FileSystemBackend::for_test()
17+
.await
18+
.map_err(|err| DbErr::Migration(format!("failed to create storage backend: {err}")))?;
19+
let storage = DispatchBackend::Filesystem(storage);
20+
21+
// process data
22+
23+
manager
24+
.process(
25+
&storage,
26+
sbom!(async |sbom, model, tx| {
27+
let mut model = model.into_active_model();
28+
match sbom {
29+
Sbom::CycloneDx(_sbom) => {
30+
// TODO: just an example
31+
model.authors = Set(vec![]);
32+
}
33+
Sbom::Spdx(_sbom) => {
34+
// TODO: just an example
35+
model.authors = Set(vec![]);
36+
}
37+
}
38+
39+
model.save(tx).await?;
40+
41+
Ok(())
42+
}),
43+
)
44+
.await?;
45+
46+
Ok(())
47+
}
48+
49+
async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
50+
Ok(())
51+
}
52+
}

modules/storage/src/service/dispatch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@ impl StorageBackend for DispatchBackend {
2222

2323
async fn store<S>(&self, stream: S) -> Result<StorageResult, StoreError<Self::Error>>
2424
where
25-
S: AsyncRead + Unpin,
25+
S: AsyncRead + Unpin + Send,
2626
{
2727
match self {
2828
Self::Filesystem(backend) => backend.store(stream).await.map_err(Self::map_err),
2929
Self::S3(backend) => backend.store(stream).await.map_err(Self::map_err),
3030
}
3131
}
3232

33-
async fn retrieve<'a>(
33+
async fn retrieve(
3434
&self,
3535
key: StorageKey,
36-
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + 'a>, Self::Error>
36+
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + use<>>, Self::Error>
3737
where
3838
Self: Sized,
3939
{

modules/storage/src/service/fs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,10 @@ impl StorageBackend for FileSystemBackend {
150150
Ok(result)
151151
}
152152

153-
async fn retrieve<'a>(
153+
async fn retrieve(
154154
&self,
155155
key: StorageKey,
156-
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + 'a>, Self::Error> {
156+
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + use<>>, Self::Error> {
157157
match self.locate(key).await? {
158158
Some((path, compression)) => File::open(&path)
159159
.await

modules/storage/src/service/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,23 +85,26 @@ impl StorageResult {
8585
}
8686

8787
pub trait StorageBackend {
88-
type Error: Debug;
88+
type Error: Debug + Display;
8989

9090
/// Store the content from a stream
9191
fn store<S>(
9292
&self,
9393
stream: S,
94-
) -> impl Future<Output = Result<StorageResult, StoreError<Self::Error>>>
94+
) -> impl Future<Output = Result<StorageResult, StoreError<Self::Error>>> + Send
9595
where
96-
S: AsyncRead + Unpin;
96+
S: AsyncRead + Unpin + Send;
9797

9898
/// Retrieve the content as an async reader
99-
fn retrieve<'a>(
99+
fn retrieve(
100100
&self,
101101
key: StorageKey,
102102
) -> impl Future<
103-
Output = Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + 'a>, Self::Error>,
104-
>;
103+
Output = Result<
104+
Option<impl Stream<Item = Result<Bytes, Self::Error>> + Send + use<Self>>,
105+
Self::Error,
106+
>,
107+
> + Send;
105108

106109
/// Delete the stored content.
107110
///

modules/storage/src/service/s3.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl StorageBackend for S3Backend {
135135
#[instrument(skip(self, stream), err(Debug, level=tracing::Level::INFO))]
136136
async fn store<S>(&self, stream: S) -> Result<StorageResult, StoreError<Self::Error>>
137137
where
138-
S: AsyncRead + Unpin,
138+
S: AsyncRead + Unpin + Send,
139139
{
140140
let file = TempFile::with_compression(stream, self.compression).await?;
141141
let result = file.to_result();
@@ -163,10 +163,10 @@ impl StorageBackend for S3Backend {
163163
Ok(result)
164164
}
165165

166-
async fn retrieve<'a>(
166+
async fn retrieve(
167167
&self,
168168
StorageKey(key): StorageKey,
169-
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + 'a>, Self::Error> {
169+
) -> Result<Option<impl Stream<Item = Result<Bytes, Self::Error>> + use<>>, Self::Error> {
170170
let req = self.client.get_object().bucket(&self.bucket).key(&key);
171171

172172
match req.send().await {

0 commit comments

Comments
 (0)