Skip to content

Commit a6f20d8

Browse files
committed
feat: add a way to run data migrations individually
1 parent d949397 commit a6f20d8

File tree

6 files changed

+293
-71
lines changed

6 files changed

+293
-71
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

migration/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ serde-cyclonedx = { workspace = true }
2424
serde_json = { workspace = true }
2525
spdx-rs = { workspace = true }
2626
tokio = { workspace = true, features = ["full"] }
27+
tracing = { workspace = true }
28+
tracing-subscriber = { workspace = true }
2729
uuid = { workspace = true, features = ["v5"] }
2830

2931
[dev-dependencies]

migration/src/bin/data.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use anyhow::bail;
2+
use clap::Parser;
3+
use migration::{Migrator, Options, SchemaDataManager};
4+
use sea_orm::{ConnectOptions, Database};
5+
use sea_orm_migration::{IntoSchemaManagerConnection, SchemaManager};
6+
use std::collections::HashMap;
7+
use trustify_module_storage::config::StorageConfig;
8+
9+
#[derive(clap::Parser, Debug, Clone)]
10+
struct Cli {
11+
#[command(subcommand)]
12+
command: Command,
13+
}
14+
15+
#[derive(clap::Subcommand, Debug, Clone)]
16+
enum Command {
17+
/// List all data migrations
18+
List,
19+
/// Run a list of migrations
20+
Run(Run),
21+
}
22+
23+
#[derive(clap::Args, Debug, Clone)]
24+
struct Run {
25+
/// Migration direction to run
26+
#[arg(
27+
long,
28+
value_enum,
29+
default_value_t = Direction::Up,
30+
overrides_with = "down"
31+
)]
32+
direction: Direction,
33+
34+
/// Shortcut for `--direction down`
35+
#[arg(long, action = clap::ArgAction::SetTrue, overrides_with = "direction")]
36+
down: bool,
37+
38+
// from sea_orm
39+
#[arg(
40+
global = true,
41+
short = 's',
42+
long,
43+
env = "DATABASE_SCHEMA",
44+
long_help = "Database schema\n \
45+
- For MySQL and SQLite, this argument is ignored.\n \
46+
- For PostgreSQL, this argument is optional with default value 'public'.\n"
47+
)]
48+
database_schema: Option<String>,
49+
50+
// from sea_orm
51+
#[arg(
52+
global = true,
53+
short = 'u',
54+
long,
55+
env = "DATABASE_URL",
56+
help = "Database URL"
57+
)]
58+
database_url: Option<String>,
59+
60+
#[arg()]
61+
migrations: Vec<String>,
62+
63+
#[command(flatten)]
64+
options: Options,
65+
66+
#[command(flatten)]
67+
storage: StorageConfig,
68+
}
69+
70+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, clap::ValueEnum)]
71+
pub enum Direction {
72+
#[default]
73+
Up,
74+
Down,
75+
}
76+
77+
impl Run {
78+
fn direction(&self) -> Direction {
79+
if self.down {
80+
Direction::Down
81+
} else {
82+
self.direction
83+
}
84+
}
85+
86+
pub async fn run(self) -> anyhow::Result<()> {
87+
let direction = self.direction();
88+
89+
let migrations = Migrator::data_migrations()
90+
.into_iter()
91+
.map(|migration| (migration.name().to_string(), migration))
92+
.collect::<HashMap<_, _>>();
93+
94+
let mut running = vec![];
95+
96+
for migration in self.migrations {
97+
let Some(migration) = migrations.get(&migration) else {
98+
bail!("Migration {migration} not found");
99+
};
100+
running.push(migration);
101+
}
102+
103+
let storage = self.storage.into_storage(false).await?;
104+
105+
let url = self
106+
.database_url
107+
.expect("Environment variable 'DATABASE_URL' not set");
108+
let schema = self.database_schema.unwrap_or_else(|| "public".to_owned());
109+
110+
let connect_options = ConnectOptions::new(url)
111+
.set_schema_search_path(schema)
112+
.to_owned();
113+
114+
let db = Database::connect(connect_options).await?;
115+
116+
let manager = SchemaManager::new(db.into_schema_manager_connection());
117+
let manager = SchemaDataManager::new(&manager, &storage, &self.options);
118+
119+
for run in running {
120+
tracing::info!("Running data migration: {}", run.name());
121+
122+
match direction {
123+
Direction::Up => run.up(&manager).await?,
124+
Direction::Down => run.down(&manager).await?,
125+
}
126+
}
127+
128+
Ok(())
129+
}
130+
}
131+
132+
impl Command {
133+
pub async fn run(self) -> anyhow::Result<()> {
134+
match self {
135+
Command::Run(run) => run.run().await,
136+
Command::List => {
137+
for m in Migrator::data_migrations() {
138+
println!("{}", m.name());
139+
}
140+
Ok(())
141+
}
142+
}
143+
}
144+
}
145+
146+
#[tokio::main]
147+
async fn main() {
148+
let cli = Cli::parse();
149+
150+
tracing_subscriber::fmt::init();
151+
152+
cli.command.run().await.unwrap();
153+
}

migration/src/data/migration.rs

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
use crate::{
22
async_trait,
3-
data::{Document, DocumentProcessor, Handler},
3+
data::{Document, DocumentProcessor, Handler, Options},
44
};
55
use clap::Parser;
66
use sea_orm::DbErr;
77
use sea_orm_migration::{MigrationName, MigrationTrait, SchemaManager};
88
use std::{ffi::OsString, sync::LazyLock};
99
use trustify_module_storage::{config::StorageConfig, service::dispatch::DispatchBackend};
1010

11-
pub struct MigrationWithData<M>
12-
where
13-
M: MigrationTraitWithData,
14-
{
11+
pub struct MigrationWithData {
1512
pub storage: DispatchBackend,
16-
pub migration: M,
13+
pub options: Options,
14+
pub migration: Box<dyn MigrationTraitWithData>,
1715
}
1816

1917
static STORAGE: LazyLock<DispatchBackend> = LazyLock::new(init_storage);
18+
static OPTIONS: LazyLock<Options> = LazyLock::new(init_options);
2019

2120
#[allow(clippy::expect_used)]
2221
fn init_storage() -> DispatchBackend {
@@ -33,76 +32,84 @@ fn init_storage() -> DispatchBackend {
3332
})
3433
}
3534

36-
impl<M> MigrationWithData<M>
37-
where
38-
M: MigrationTraitWithData,
39-
{
35+
fn init_options() -> Options {
36+
// create from env-vars only
37+
Options::parse_from::<_, OsString>(vec![])
38+
}
39+
40+
impl MigrationWithData {
4041
#[allow(clippy::expect_used)]
41-
pub fn new(migration: M) -> Self {
42+
pub fn new(migration: Box<dyn MigrationTraitWithData>) -> Self {
4243
Self {
4344
storage: STORAGE.clone(),
45+
options: OPTIONS.clone(),
4446
migration,
4547
}
4648
}
4749
}
4850

49-
impl<M> From<M> for MigrationWithData<M>
51+
impl<M> From<M> for MigrationWithData
5052
where
51-
M: MigrationTraitWithData,
53+
M: MigrationTraitWithData + 'static,
5254
{
5355
fn from(value: M) -> Self {
54-
MigrationWithData::new(value)
56+
MigrationWithData::new(Box::new(value))
5557
}
5658
}
5759

5860
pub struct SchemaDataManager<'c> {
5961
pub manager: &'c SchemaManager<'c>,
6062
storage: &'c DispatchBackend,
63+
options: &'c Options,
6164
}
6265

6366
impl<'c> SchemaDataManager<'c> {
64-
pub fn new(manager: &'c SchemaManager<'c>, storage: &'c DispatchBackend) -> Self {
65-
Self { manager, storage }
67+
pub fn new(
68+
manager: &'c SchemaManager<'c>,
69+
storage: &'c DispatchBackend,
70+
options: &'c Options,
71+
) -> Self {
72+
Self {
73+
manager,
74+
storage,
75+
options,
76+
}
6677
}
6778

6879
pub async fn process<D>(&self, f: impl Handler<D>) -> Result<(), DbErr>
6980
where
7081
D: Document,
7182
{
72-
self.manager
73-
.process(self.storage, Default::default(), f)
74-
.await
83+
self.manager.process(self.storage, self.options, f).await
7584
}
7685
}
7786

7887
#[async_trait::async_trait]
79-
pub trait MigrationTraitWithData {
88+
pub trait MigrationTraitWithData: MigrationName + Send + Sync {
8089
async fn up(&self, manager: &SchemaDataManager) -> Result<(), DbErr>;
8190
async fn down(&self, manager: &SchemaDataManager) -> Result<(), DbErr>;
8291
}
8392

8493
#[async_trait::async_trait]
85-
impl<M> MigrationTrait for MigrationWithData<M>
86-
where
87-
M: MigrationTraitWithData + MigrationName + Send + Sync,
88-
{
94+
impl MigrationTrait for MigrationWithData {
8995
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
90-
self.migration
91-
.up(&SchemaDataManager::new(manager, &self.storage))
92-
.await
96+
MigrationTraitWithData::up(
97+
&*self.migration,
98+
&SchemaDataManager::new(manager, &self.storage, &self.options),
99+
)
100+
.await
93101
}
94102

95103
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
96-
self.migration
97-
.down(&SchemaDataManager::new(manager, &self.storage))
98-
.await
104+
MigrationTraitWithData::down(
105+
&*self.migration,
106+
&SchemaDataManager::new(manager, &self.storage, &self.options),
107+
)
108+
.await
99109
}
100110
}
101111

102-
impl<M> MigrationName for MigrationWithData<M>
103-
where
104-
M: MigrationTraitWithData + MigrationName + Send + Sync,
105-
{
112+
impl MigrationName for MigrationWithData {
106113
fn name(&self) -> &str {
107114
self.migration.name()
108115
}

migration/src/data/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ where
9090
) -> anyhow::Result<()>;
9191
}
9292

93-
#[derive(Clone, Debug, PartialEq, Eq)]
93+
#[derive(Clone, Debug, PartialEq, Eq, clap::Parser)]
9494
pub struct Options {
95+
#[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "5")]
9596
pub concurrent: NonZeroUsize,
9697
}
9798

@@ -107,7 +108,7 @@ pub trait DocumentProcessor {
107108
async fn process<D>(
108109
&self,
109110
storage: &DispatchBackend,
110-
options: Options,
111+
options: &Options,
111112
f: impl Handler<D>,
112113
) -> anyhow::Result<(), DbErr>
113114
where
@@ -118,34 +119,37 @@ impl<'c> DocumentProcessor for SchemaManager<'c> {
118119
async fn process<D>(
119120
&self,
120121
storage: &DispatchBackend,
121-
options: Options,
122+
options: &Options,
122123
f: impl Handler<D>,
123124
) -> Result<(), DbErr>
124125
where
125126
D: Document,
126127
{
127128
let db = self.get_connection();
128-
let tx = db.begin().await?;
129129

130+
let tx = db.begin().await?;
130131
let all = D::all(&tx).await?;
132+
drop(tx);
131133

132134
stream::iter(all)
133135
.map(async |model| {
136+
let tx = db.begin().await?;
137+
134138
let doc = D::source(&model, storage, &tx).await.map_err(|err| {
135139
DbErr::Migration(format!("Failed to load source document: {err}"))
136140
})?;
137141
f.call(doc, model, &tx).await.map_err(|err| {
138142
DbErr::Migration(format!("Failed to process document: {err}"))
139143
})?;
140144

145+
tx.commit().await?;
146+
141147
Ok::<_, DbErr>(())
142148
})
143149
.buffer_unordered(options.concurrent.into())
144150
.try_collect::<Vec<_>>()
145151
.await?;
146152

147-
tx.commit().await?;
148-
149153
Ok(())
150154
}
151155
}

0 commit comments

Comments
 (0)