Skip to content

Commit d949397

Browse files
committed
chore: make concurrency configurable
1 parent 516fdc8 commit d949397

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

migration/src/data/migration.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ impl<'c> SchemaDataManager<'c> {
6969
where
7070
D: Document,
7171
{
72-
self.manager.process(self.storage, f).await
72+
self.manager
73+
.process(self.storage, Default::default(), f)
74+
.await
7375
}
7476
}
7577

migration/src/data/mod.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
mod migration;
2-
32
pub use migration::*;
43

54
use anyhow::{anyhow, bail};
@@ -12,6 +11,7 @@ use sea_orm::{
1211
ConnectionTrait, DatabaseTransaction, DbErr, EntityTrait, ModelTrait, TransactionTrait,
1312
};
1413
use sea_orm_migration::SchemaManager;
14+
use std::num::NonZeroUsize;
1515
use trustify_common::id::Id;
1616
use trustify_entity::{sbom, source_document};
1717
use trustify_module_storage::service::{StorageBackend, StorageKey, dispatch::DispatchBackend};
@@ -90,28 +90,43 @@ where
9090
) -> anyhow::Result<()>;
9191
}
9292

93+
#[derive(Clone, Debug, PartialEq, Eq)]
94+
pub struct Options {
95+
pub concurrent: NonZeroUsize,
96+
}
97+
98+
impl Default for Options {
99+
fn default() -> Self {
100+
Self {
101+
concurrent: unsafe { NonZeroUsize::new_unchecked(5) },
102+
}
103+
}
104+
}
105+
93106
pub trait DocumentProcessor {
94107
async fn process<D>(
95108
&self,
96109
storage: &DispatchBackend,
110+
options: Options,
97111
f: impl Handler<D>,
98112
) -> anyhow::Result<(), DbErr>
99113
where
100114
D: Document;
101115
}
102116

103117
impl<'c> DocumentProcessor for SchemaManager<'c> {
104-
async fn process<D>(&self, storage: &DispatchBackend, f: impl Handler<D>) -> Result<(), DbErr>
118+
async fn process<D>(
119+
&self,
120+
storage: &DispatchBackend,
121+
options: Options,
122+
f: impl Handler<D>,
123+
) -> Result<(), DbErr>
105124
where
106125
D: Document,
107126
{
108127
let db = self.get_connection();
109128
let tx = db.begin().await?;
110129

111-
// TODO: soft-lock database
112-
// In order to prevent new documents with an old version to be created in the meantime, we
113-
// should soft-lock the database.
114-
115130
let all = D::all(&tx).await?;
116131

117132
stream::iter(all)
@@ -125,11 +140,11 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
125140

126141
Ok::<_, DbErr>(())
127142
})
128-
.buffer_unordered(10) // TODO: make this configurable
143+
.buffer_unordered(options.concurrent.into())
129144
.try_collect::<Vec<_>>()
130145
.await?;
131146

132-
// TODO: soft-unlock database
147+
tx.commit().await?;
133148

134149
Ok(())
135150
}

0 commit comments

Comments
 (0)