Skip to content

Commit ad42a6c

Browse files
committed
feat: add way to run data migrations from main binary
1 parent a6f20d8 commit ad42a6c

File tree

10 files changed

+194
-100
lines changed

10 files changed

+194
-100
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/db/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod embedded;
22

33
use anyhow::ensure;
44
use migration::Migrator;
5+
use migration::data::Runner;
56
use sea_orm::{ConnectionTrait, Statement};
67
use sea_orm_migration::prelude::MigratorTrait;
78
use tracing::instrument;
@@ -35,7 +36,7 @@ impl<'a> Database<'a> {
3536
"Unable to bootstrap database with '--db-url'"
3637
);
3738

38-
let url = crate::config::Database {
39+
let url = config::Database {
3940
name: "postgres".into(),
4041
..database.clone()
4142
}
@@ -64,4 +65,8 @@ impl<'a> Database<'a> {
6465

6566
Ok(db)
6667
}
68+
69+
pub async fn data_migrate(&self, runner: Runner) -> Result<(), anyhow::Error> {
70+
runner.run::<Migrator>().await
71+
}
6772
}

migration/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ trustify-module-storage = { workspace = true }
1717
anyhow = { workspace = true }
1818
bytes = { workspace = true }
1919
clap = { workspace = true, features = ["derive", "env"] }
20+
futures = { workspace = true }
2021
futures-util = { workspace = true }
2122
sea-orm = { workspace = true }
2223
sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres", "with-uuid"] }

migration/src/bin/data.rs

Lines changed: 18 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use anyhow::bail;
21
use clap::Parser;
3-
use migration::{Migrator, Options, SchemaDataManager};
4-
use sea_orm::{ConnectOptions, Database};
5-
use sea_orm_migration::{IntoSchemaManagerConnection, SchemaManager};
6-
use std::collections::HashMap;
2+
use migration::{
3+
Migrator,
4+
data::{Direction, MigratorWithData, Options, Runner},
5+
};
76
use trustify_module_storage::config::StorageConfig;
87

98
#[derive(clap::Parser, Debug, Clone)]
@@ -13,6 +12,7 @@ struct Cli {
1312
}
1413

1514
#[derive(clap::Subcommand, Debug, Clone)]
15+
#[allow(clippy::large_enum_variant)]
1616
enum Command {
1717
/// List all data migrations
1818
List,
@@ -67,13 +67,6 @@ struct Run {
6767
storage: StorageConfig,
6868
}
6969

70-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, clap::ValueEnum)]
71-
pub enum Direction {
72-
#[default]
73-
Up,
74-
Down,
75-
}
76-
7770
impl Run {
7871
fn direction(&self) -> Direction {
7972
if self.down {
@@ -83,47 +76,23 @@ impl Run {
8376
}
8477
}
8578

79+
#[allow(clippy::expect_used)]
8680
pub async fn run(self) -> anyhow::Result<()> {
8781
let direction = self.direction();
88-
89-
let migrations = Migrator::data_migrations()
90-
.into_iter()
91-
.map(|migration| (migration.name().to_string(), migration))
92-
.collect::<HashMap<_, _>>();
93-
94-
let mut running = vec![];
95-
96-
for migration in self.migrations {
97-
let Some(migration) = migrations.get(&migration) else {
98-
bail!("Migration {migration} not found");
99-
};
100-
running.push(migration);
101-
}
102-
10382
let storage = self.storage.into_storage(false).await?;
10483

105-
let url = self
106-
.database_url
107-
.expect("Environment variable 'DATABASE_URL' not set");
108-
let schema = self.database_schema.unwrap_or_else(|| "public".to_owned());
109-
110-
let connect_options = ConnectOptions::new(url)
111-
.set_schema_search_path(schema)
112-
.to_owned();
113-
114-
let db = Database::connect(connect_options).await?;
115-
116-
let manager = SchemaManager::new(db.into_schema_manager_connection());
117-
let manager = SchemaDataManager::new(&manager, &storage, &self.options);
118-
119-
for run in running {
120-
tracing::info!("Running data migration: {}", run.name());
121-
122-
match direction {
123-
Direction::Up => run.up(&manager).await?,
124-
Direction::Down => run.down(&manager).await?,
125-
}
84+
Runner {
85+
direction,
86+
storage,
87+
migrations: self.migrations,
88+
database_url: self
89+
.database_url
90+
.expect("Environment variable 'DATABASE_URL' not set"),
91+
database_schema: self.database_schema,
92+
options: self.options,
12693
}
94+
.run::<Migrator>()
95+
.await?;
12796

12897
Ok(())
12998
}
@@ -143,6 +112,7 @@ impl Command {
143112
}
144113
}
145114

115+
#[allow(clippy::unwrap_used)]
146116
#[tokio::main]
147117
async fn main() {
148118
let cli = Cli::parse();

migration/src/data/migration.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::{
33
data::{Document, DocumentProcessor, Handler, Options},
44
};
55
use clap::Parser;
6+
use futures::executor::block_on;
67
use sea_orm::DbErr;
78
use sea_orm_migration::{MigrationName, MigrationTrait, SchemaManager};
89
use std::{ffi::OsString, sync::LazyLock};
@@ -22,14 +23,7 @@ fn init_storage() -> DispatchBackend {
2223
// create from env-vars only
2324
let config = StorageConfig::parse_from::<_, OsString>(vec![]);
2425

25-
tokio::task::block_in_place(|| {
26-
tokio::runtime::Handle::current().block_on(async {
27-
config
28-
.into_storage(false)
29-
.await
30-
.expect("Failed to create storage")
31-
})
32-
})
26+
block_on(config.into_storage(false)).expect("task panicked")
3327
}
3428

3529
fn init_options() -> Options {

migration/src/data/mod.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
mod migration;
2+
mod run;
3+
24
pub use migration::*;
5+
pub use run::*;
36

47
use anyhow::{anyhow, bail};
58
use bytes::BytesMut;
@@ -10,7 +13,7 @@ use futures_util::{
1013
use sea_orm::{
1114
ConnectionTrait, DatabaseTransaction, DbErr, EntityTrait, ModelTrait, TransactionTrait,
1215
};
13-
use sea_orm_migration::SchemaManager;
16+
use sea_orm_migration::{MigrationTrait, SchemaManager};
1417
use std::num::NonZeroUsize;
1518
use trustify_common::id::Id;
1619
use trustify_entity::{sbom, source_document};
@@ -105,12 +108,12 @@ impl Default for Options {
105108
}
106109

107110
pub trait DocumentProcessor {
108-
async fn process<D>(
111+
fn process<D>(
109112
&self,
110113
storage: &DispatchBackend,
111114
options: &Options,
112115
f: impl Handler<D>,
113-
) -> anyhow::Result<(), DbErr>
116+
) -> impl Future<Output = anyhow::Result<(), DbErr>>
114117
where
115118
D: Document;
116119
}
@@ -180,3 +183,42 @@ macro_rules! sbom {
180183
$crate::handler!(async |$doc: $crate::data::Sbom, $model, $tx| $body)
181184
};
182185
}
186+
187+
pub trait MigratorWithData {
188+
fn data_migrations() -> Vec<Box<dyn MigrationTraitWithData>>;
189+
}
190+
191+
#[derive(Default)]
192+
pub struct Migrations {
193+
all: Vec<Migration>,
194+
}
195+
196+
impl IntoIterator for Migrations {
197+
type Item = Migration;
198+
type IntoIter = std::vec::IntoIter<Self::Item>;
199+
200+
fn into_iter(self) -> Self::IntoIter {
201+
self.all.into_iter()
202+
}
203+
}
204+
205+
pub enum Migration {
206+
Normal(Box<dyn MigrationTrait>),
207+
Data(Box<dyn MigrationTraitWithData>),
208+
}
209+
210+
impl Migrations {
211+
pub fn new() -> Self {
212+
Self::default()
213+
}
214+
215+
pub fn normal(mut self, migration: impl MigrationTrait + 'static) -> Self {
216+
self.all.push(Migration::Normal(Box::new(migration)));
217+
self
218+
}
219+
220+
pub fn data(mut self, migration: impl MigrationTraitWithData + 'static) -> Self {
221+
self.all.push(Migration::Data(Box::new(migration)));
222+
self
223+
}
224+
}

migration/src/data/run.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use crate::data::{MigratorWithData, Options, SchemaDataManager};
2+
use anyhow::bail;
3+
use sea_orm::{ConnectOptions, Database};
4+
use sea_orm_migration::{IntoSchemaManagerConnection, SchemaManager};
5+
use std::collections::HashMap;
6+
use trustify_module_storage::service::dispatch::DispatchBackend;
7+
8+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, clap::ValueEnum)]
9+
pub enum Direction {
10+
#[default]
11+
Up,
12+
Down,
13+
}
14+
15+
pub struct Runner {
16+
pub database_url: String,
17+
pub database_schema: Option<String>,
18+
pub storage: DispatchBackend,
19+
pub direction: Direction,
20+
pub migrations: Vec<String>,
21+
pub options: Options,
22+
}
23+
24+
impl Runner {
25+
pub async fn run<M: MigratorWithData>(self) -> anyhow::Result<()> {
26+
let migrations = M::data_migrations()
27+
.into_iter()
28+
.map(|migration| (migration.name().to_string(), migration))
29+
.collect::<HashMap<_, _>>();
30+
31+
let mut running = vec![];
32+
33+
for migration in self.migrations {
34+
let Some(migration) = migrations.get(&migration) else {
35+
bail!("Migration '{migration}' not found");
36+
};
37+
running.push(migration);
38+
}
39+
40+
let schema = self.database_schema.unwrap_or_else(|| "public".to_owned());
41+
42+
let connect_options = ConnectOptions::new(self.database_url)
43+
.set_schema_search_path(schema)
44+
.to_owned();
45+
46+
let db = Database::connect(connect_options).await?;
47+
48+
let manager = SchemaManager::new(db.into_schema_manager_connection());
49+
let manager = SchemaDataManager::new(&manager, &self.storage, &self.options);
50+
51+
for run in running {
52+
tracing::info!(name = run.name(), "Running data migration");
53+
54+
match self.direction {
55+
Direction::Up => run.up(&manager).await?,
56+
Direction::Down => run.down(&manager).await?,
57+
}
58+
}
59+
60+
Ok(())
61+
}
62+
}

migration/src/lib.rs

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use crate::data::{
2+
Migration, MigrationTraitWithData, MigrationWithData, Migrations, MigratorWithData,
3+
};
14
pub use sea_orm_migration::prelude::*;
25

3-
mod data;
4-
pub use crate::data::{MigrationTraitWithData, MigrationWithData, Options, SchemaDataManager};
6+
pub mod data;
57

68
mod m0000010_init;
79
mod m0000020_add_sbom_group;
@@ -30,41 +32,6 @@ mod m0001120_sbom_external_node_indexes;
3032
mod m0001130_gover_cmp;
3133
mod m0001140_example_data_migration;
3234

33-
#[derive(Default)]
34-
pub struct Migrations {
35-
all: Vec<Migration>,
36-
}
37-
38-
impl IntoIterator for Migrations {
39-
type Item = Migration;
40-
type IntoIter = std::vec::IntoIter<Self::Item>;
41-
42-
fn into_iter(self) -> Self::IntoIter {
43-
self.all.into_iter()
44-
}
45-
}
46-
47-
pub enum Migration {
48-
Normal(Box<dyn MigrationTrait>),
49-
Data(Box<dyn MigrationTraitWithData>),
50-
}
51-
52-
impl Migrations {
53-
pub fn new() -> Self {
54-
Self::default()
55-
}
56-
57-
pub fn normal(mut self, migration: impl MigrationTrait + 'static) -> Self {
58-
self.all.push(Migration::Normal(Box::new(migration)));
59-
self
60-
}
61-
62-
pub fn data(mut self, migration: impl MigrationTraitWithData + 'static) -> Self {
63-
self.all.push(Migration::Data(Box::new(migration)));
64-
self
65-
}
66-
}
67-
6835
pub struct Migrator;
6936

7037
impl Migrator {
@@ -97,8 +64,10 @@ impl Migrator {
9764
.normal(m0001130_gover_cmp::Migration)
9865
.data(m0001140_example_data_migration::Migration)
9966
}
67+
}
10068

101-
pub fn data_migrations() -> Vec<Box<dyn MigrationTraitWithData>> {
69+
impl MigratorWithData for Migrator {
70+
fn data_migrations() -> Vec<Box<dyn MigrationTraitWithData>> {
10271
Self::migrations()
10372
.into_iter()
10473
.filter_map(|migration| match migration {
@@ -112,6 +81,7 @@ impl Migrator {
11281
#[async_trait::async_trait]
11382
impl MigratorTrait for Migrator {
11483
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
84+
// Get all migrations, wrap data migrations. This will initialize the storage config.
11585
Self::migrations()
11686
.into_iter()
11787
.map(|migration| match migration {

0 commit comments

Comments
 (0)