Skip to content

Commit

Permalink
Merge pull request #2 from civitaspo/fix-migration-error
Browse files Browse the repository at this point in the history
Set baselineOnMigrate=true
  • Loading branch information
civitaspo authored Oct 7, 2019
2 parents a800c49 + 22eca28 commit 0fdc487
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@ package pro.civitaspo.digdag.plugin.pg_lock.pg

import javax.sql.DataSource
import org.flywaydb.core.Flyway
import pro.civitaspo.digdag.plugin.pg_lock.pg.migration.{V0_0_1_1__CreateTable_digdag_pg_locks, V0_0_1_2__CreateIndex_digdag_pg_locks_idx_expires_on, V0_0_1_3__CreateIndex_digdag_pg_locks_idx_named_lock}
import pro.civitaspo.digdag.plugin.pg_lock.pg.migration.{V0_0_0__Baseline_do_nothing, V0_0_1_1__CreateTable_digdag_pg_locks, V0_0_1_2__CreateIndex_digdag_pg_locks_idx_expires_on, V0_0_1_3__CreateIndex_digdag_pg_locks_idx_named_lock}


case class PgLockPgDatabaseMigrator(config: PgLockPgConfig,
ds: DataSource)
{
private val v0_0_0__Baseline = new V0_0_0__Baseline_do_nothing()
private val v0_0_1_1__CreateTable_digdag_pg_locks = new V0_0_1_1__CreateTable_digdag_pg_locks()
private val v0_0_1_2__CreateIndex_digdag_pg_locks_idx_expires_on = new V0_0_1_2__CreateIndex_digdag_pg_locks_idx_expires_on()
private val v0_0_1_3__CreateIndex_digdag_pg_locks_idx_named_lock = new V0_0_1_3__CreateIndex_digdag_pg_locks_idx_named_lock()

def migrate(): Unit =
{
Flyway.configure()
.baselineOnMigrate(true)
.dataSource(ds)
.table(config.schemaMigrationHistoryTable)
.baselineVersion(v0_0_0__Baseline.getVersion)
.javaMigrations(
new V0_0_1_1__CreateTable_digdag_pg_locks(),
new V0_0_1_2__CreateIndex_digdag_pg_locks_idx_expires_on(),
new V0_0_1_3__CreateIndex_digdag_pg_locks_idx_named_lock()
v0_0_0__Baseline,
v0_0_1_1__CreateTable_digdag_pg_locks,
v0_0_1_2__CreateIndex_digdag_pg_locks_idx_expires_on,
v0_0_1_3__CreateIndex_digdag_pg_locks_idx_named_lock
)
.load()
.migrate()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pro.civitaspo.digdag.plugin.pg_lock.pg.migration


import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}

import scala.util.Using


class V0_0_0__Baseline_do_nothing
extends BaseJavaMigration
{
override def migrate(context: Context): Unit =
{
Using(context.getConnection.prepareStatement("SELECT 1")) { stmt =>
stmt.execute()
}

}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package pro.civitaspo.digdag.plugin.pg_lock.pg.migration


import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}

import scala.util.Using


class V0_0_1_1__CreateTable_digdag_pg_locks extends BaseJavaMigration
class V0_0_1_1__CreateTable_digdag_pg_locks
extends BaseJavaMigration
{
override def migrate(context: Context): Unit =
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pro.civitaspo.digdag.plugin.pg_lock.pg.migration


import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}

import scala.util.Using
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pro.civitaspo.digdag.plugin.pg_lock.pg.migration


import org.flywaydb.core.api.migration.{BaseJavaMigration, Context}

import scala.util.Using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ class PgLockPluginTest
Using.resource(getJdbcPostgresConnection) { conn =>
// recreate database
Using.resource(conn.createStatement()) { stmt =>
stmt.execute( // Need to kill all processes to force dropping database
s"""
|SELECT pg_terminate_backend(pg_stat_activity.pid)
| FROM pg_stat_activity
| WHERE pg_stat_activity.datname = '${p("database")}'
| AND pid <> pg_backend_pid()
stmt.execute( // Need to kill all processes to force dropping database
s"""
|SELECT pg_terminate_backend(pg_stat_activity.pid)
| FROM pg_stat_activity
| WHERE pg_stat_activity.datname = '${p("database")}'
| AND pid <> pg_backend_pid()
""".stripMargin)
stmt.executeUpdate(s"DROP DATABASE IF EXISTS ${p("database")}")
stmt.executeUpdate(s"CREATE DATABASE ${p("database")}")
Expand Down Expand Up @@ -200,6 +200,25 @@ class PgLockPluginTest
}
}

Using.resource(getJdbcPgLockConnection) { conn =>
Using.resource(conn.createStatement()) { stmt =>
val rs: ResultSet = stmt.executeQuery(
s"""
|SELECT description
| FROM pg_lock_schema_migrations
| WHERE version = '0.0.0'
| LIMIT 1
""".stripMargin)

val description = Iterator.continually(rs)
.takeWhile(_.next())
.map(_.getString("description"))
.toSeq
.head
assert(description === "Baseline do nothing")
}
}

Using.resource(getJdbcPgLockConnection) { conn =>
Using.resource(conn.createStatement()) { stmt =>
val rs: ResultSet = stmt.executeQuery(
Expand Down Expand Up @@ -249,7 +268,112 @@ class PgLockPluginTest

}

behavior of "the pg_lock> operator"
it should "migrate tables when digdag tables already exists" in {
Using.resource(getJdbcPgLockConnection) { conn =>
Using.resource(conn.createStatement()) { stmt =>
stmt.executeUpdate("CREATE TABLE digdag_table (id integer)")
}
}
val digString = readResource("/simple.dig")

val status: CommandStatus = digdagRun(
projectPath = tmpDir.toPath,
configString = defaultSystemConfig,
digString = digString
)

assert(status.code === 0)

Using.resource(getJdbcPgLockConnection) { conn =>
Using.resource(conn.createStatement()) { stmt =>
val rs: ResultSet = stmt.executeQuery(
s"""
|SELECT table_name
| FROM information_schema.tables
| WHERE table_catalog = '${p("database")}'
| AND table_name IN ('digdag_pg_locks', 'pg_lock_schema_migrations')
| ORDER
| BY 1
""".stripMargin)
val tableNames: Seq[String] = Iterator.continually(rs)
.takeWhile(_.next())
.map(_.getString("table_name"))
.toSeq

assert(tableNames.size === 2)
assert(tableNames(0) === "digdag_pg_locks")
assert(tableNames(1) === "pg_lock_schema_migrations")
}
}

Using.resource(getJdbcPgLockConnection) { conn =>
Using.resource(conn.createStatement()) { stmt =>
val rs: ResultSet = stmt.executeQuery(
s"""
|SELECT description
| FROM pg_lock_schema_migrations
| WHERE version = '0.0.0'
| LIMIT 1
""".stripMargin)

val description = Iterator.continually(rs)
.takeWhile(_.next())
.map(_.getString("description"))
.toSeq
.head
assert(description === "<< Flyway Baseline >>")
}
}

Using.resource(getJdbcPgLockConnection) { conn =>
Using.resource(conn.createStatement()) { stmt =>
val rs: ResultSet = stmt.executeQuery(
s"""
| SELECT db.datname AS database_name
| , t.relname AS table_name
| , i.relname AS index_name
| , a.attname AS column_name
| , a.attnum AS column_pos
| FROM pg_class t
| , pg_class i
| , pg_index ix
| , pg_attribute a
| , pg_database db
| WHERE t.oid = ix.indrelid
| AND i.oid = ix.indexrelid
| AND t.relowner = db.datdba
| AND a.attrelid = t.oid
| AND a.attnum = ANY(ix.indkey)
| AND t.relkind = 'r'
| AND t.relname = 'digdag_pg_locks'
| AND db.datname = 'digdag'
| ORDER BY t.relname
| , i.relname
| , a.attnum
""".stripMargin)

val idxCols: Map[String, Seq[String]] = Iterator.continually(rs)
.takeWhile(_.next())
.foldLeft(Map[String, Seq[String]]()) { (result,
rs) =>
val idxName: String = rs.getString("index_name")
val cols: Seq[String] = result.getOrElse(idxName, Seq()) :+ rs.getString("column_name")
result.updated(idxName, cols)
}

assert(idxCols.contains("digdag_pg_locks_pkey"))
assert(idxCols.getOrElse("digdag_pg_locks_pkey", Seq()) === Seq("id"))

assert(idxCols.contains("digdag_pg_locks_idx_named_locks"))
assert(idxCols.getOrElse("digdag_pg_locks_idx_named_locks", Seq()) === Seq("namespace_type", "namespace_value", "name"))

assert(idxCols.contains("digdag_pg_locks_idx_expires_on"))
assert(idxCols.getOrElse("digdag_pg_locks_idx_expires_on", Seq()) === Seq("expires_on"))
}
}
}

behavior of "the pg_lock> operator"
it should "fail with wait_timeout: 0s if another task locks" in {
val digString = readResource("/wait-timeout.dig")

Expand Down

0 comments on commit 0fdc487

Please sign in to comment.