Skip to content

Commit

Permalink
fix TestMigrate
Browse files Browse the repository at this point in the history
  • Loading branch information
meiji163 committed Oct 16, 2024
1 parent 3f47ebd commit e81aabf
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 68 deletions.
38 changes: 19 additions & 19 deletions go/binlog/gomysql_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,29 @@ func getMigrationContext() *base.MigrationContext {
Port: 3306,
},
User: "root",
Password: "",
Password: "root",
}
migrationContext.SetConnectionConfig("innodb")
migrationContext.ReplicaServerId = 99999
return migrationContext
}

func prepareDatabase(t *testing.T, db *sql.DB) {
_, err := db.Exec("DROP DATABASE testing")
_, err := db.Exec("DROP DATABASE test")
require.NoError(t, err)

_, err = db.Exec("CREATE DATABASE testing")
_, err = db.Exec("CREATE DATABASE test")
require.NoError(t, err)

_, err = db.Exec("CREATE TABLE testing.gh_ost_test (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB")
_, err = db.Exec("CREATE TABLE test.gh_ost_test (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB")
require.NoError(t, err)

_, err = db.Exec("CREATE TABLE testing.gh_ost_test2 (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB")
_, err = db.Exec("CREATE TABLE test.gh_ost_test2 (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB")
require.NoError(t, err)
}

func TestStreamTransactionSingleAutoCommitChange(t *testing.T) {
db, err := sql.Open("mysql", "root:@/")
db, err := sql.Open("mysql", "root:root@/")
require.NoError(t, err)
defer db.Close()

Expand All @@ -62,7 +62,7 @@ func TestStreamTransactionSingleAutoCommitChange(t *testing.T) {
binlogCoordinates := getCurrentBinlogCoordinates(t, db)

migrationContext := getMigrationContext()
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

Expand All @@ -79,7 +79,7 @@ func TestStreamTransactionSingleAutoCommitChange(t *testing.T) {
require.Equal(t, err, context.Canceled)
}()

_, err = db.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
_, err = db.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
require.NoError(t, err)

tx := <-transactionsChan
Expand All @@ -96,7 +96,7 @@ func TestStreamTransactionSingleAutoCommitChange(t *testing.T) {
}

func TestStreamTransactionSingleChangeInTransaction(t *testing.T) {
db, err := sql.Open("mysql", "root:@/")
db, err := sql.Open("mysql", "root:root@/")
require.NoError(t, err)
defer db.Close()

Expand All @@ -105,7 +105,7 @@ func TestStreamTransactionSingleChangeInTransaction(t *testing.T) {
binlogCoordinates := getCurrentBinlogCoordinates(t, db)

migrationContext := getMigrationContext()
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

Expand All @@ -125,7 +125,7 @@ func TestStreamTransactionSingleChangeInTransaction(t *testing.T) {
sqlTx, err := db.Begin()
require.NoError(t, err)

_, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
_, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
require.NoError(t, err)

err = sqlTx.Commit()
Expand All @@ -145,7 +145,7 @@ func TestStreamTransactionSingleChangeInTransaction(t *testing.T) {
}

func TestStreamTransactionMultipleChangesInTransaction(t *testing.T) {
db, err := sql.Open("mysql", "root:@/")
db, err := sql.Open("mysql", "root:root@/")
require.NoError(t, err)
defer db.Close()

Expand All @@ -154,7 +154,7 @@ func TestStreamTransactionMultipleChangesInTransaction(t *testing.T) {
binlogCoordinates := getCurrentBinlogCoordinates(t, db)

migrationContext := getMigrationContext()
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

Expand All @@ -173,13 +173,13 @@ func TestStreamTransactionMultipleChangesInTransaction(t *testing.T) {
sqlTx, err := db.Begin()
require.NoError(t, err)

_, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test1')")
_, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test1')")
require.NoError(t, err)

_, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test2')")
_, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test2')")
require.NoError(t, err)

_, err = sqlTx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test3')")
_, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test3')")
require.NoError(t, err)

err = sqlTx.Commit()
Expand All @@ -200,7 +200,7 @@ func TestStreamTransactionMultipleChangesInTransaction(t *testing.T) {
}

func TestStreamTransactionWithDDL(t *testing.T) {
db, err := sql.Open("mysql", "root:@/")
db, err := sql.Open("mysql", "root:root@/")
require.NoError(t, err)
defer db.Close()

Expand All @@ -209,7 +209,7 @@ func TestStreamTransactionWithDDL(t *testing.T) {
binlogCoordinates := getCurrentBinlogCoordinates(t, db)

migrationContext := getMigrationContext()
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

Expand All @@ -226,7 +226,7 @@ func TestStreamTransactionWithDDL(t *testing.T) {
require.Equal(t, err, context.Canceled)
}()

_, err = db.Exec("ALTER TABLE testing.gh_ost_test ADD COLUMN age INT")
_, err = db.Exec("ALTER TABLE test.gh_ost_test ADD COLUMN age INT")
require.NoError(t, err)

tx := <-transactionsChan
Expand Down
16 changes: 8 additions & 8 deletions go/logic/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func TestCoordinator(t *testing.T) {
db, err := gosql.Open("mysql", "root:@/")
db, err := gosql.Open("mysql", "root:root@/")
require.NoError(t, err)

t.Cleanup(func() {
Expand All @@ -27,12 +27,12 @@ func TestCoordinator(t *testing.T) {

prepareDatabase(t, db)

_, err = db.Exec("CREATE TABLE testing._gh_ost_test_gho (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255))")
_, err = db.Exec("CREATE TABLE test._gh_ost_test_gho (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255))")
require.NoError(t, err)

migrationContext := base.NewMigrationContext()
migrationContext.Hostname = "localhost"
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"
migrationContext.AllowedRunningOnMaster = true
Expand All @@ -48,7 +48,7 @@ func TestCoordinator(t *testing.T) {
Port: 3306,
},
User: "root",
Password: "",
Password: "root",
}

migrationContext.InspectorConnectionConfig = &mysql.ConnectionConfig{
Expand All @@ -57,7 +57,7 @@ func TestCoordinator(t *testing.T) {
Port: 3306,
},
User: "root",
Password: "",
Password: "root",
}

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
Expand Down Expand Up @@ -87,18 +87,18 @@ func TestCoordinator(t *testing.T) {
require.NoError(t, err)

for j := 0; j < 100; j++ {
_, err = tx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
_, err = tx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
require.NoError(t, err)
}

err = tx.Commit()
require.NoError(t, err)
}

_, err = db.Exec("UPDATE testing.gh_ost_test SET name = 'foobar' WHERE id = 1")
_, err = db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1")
require.NoError(t, err)

_, err = db.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
_, err = db.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
require.NoError(t, err)

_, err = applier.WriteChangelogState("completed")
Expand Down
74 changes: 35 additions & 39 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"sync"
)

func TestMigratorOnChangelogEvent(t *testing.T) {
Expand Down Expand Up @@ -267,18 +268,18 @@ func prepareDatabase(t *testing.T, db *gosql.DB) {
_, err = db.Exec("SET @@GLOBAL. binlog_transaction_dependency_tracking = WRITESET")
require.NoError(t, err)

_, err = db.Exec("DROP DATABASE IF EXISTS testing")
_, err = db.Exec("DROP DATABASE test")
require.NoError(t, err)

_, err = db.Exec("CREATE DATABASE testing")
_, err = db.Exec("CREATE DATABASE test")
require.NoError(t, err)

_, err = db.Exec("CREATE TABLE testing.gh_ost_test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255)) ENGINE=InnoDB")
_, err = db.Exec("CREATE TABLE test.gh_ost_test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255)) ENGINE=InnoDB")
require.NoError(t, err)
}

func TestMigrate(t *testing.T) {
db, err := gosql.Open("mysql", "root:@/")
db, err := gosql.Open("mysql", "root:root@/")
require.NoError(t, err)

t.Cleanup(func() {
Expand All @@ -291,7 +292,7 @@ func TestMigrate(t *testing.T) {

migrationContext := base.NewMigrationContext()
migrationContext.Hostname = "localhost"
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"
migrationContext.AllowedRunningOnMaster = true
Expand All @@ -306,7 +307,7 @@ func TestMigrate(t *testing.T) {
Port: 3306,
},
User: "root",
Password: "",
Password: "root",
}

migrationContext.SetConnectionConfig("innodb")
Expand All @@ -317,73 +318,68 @@ func TestMigrate(t *testing.T) {

rowsWritten := atomic.Int32{}

go func() {
ticker := time.NewTicker(time.Millisecond)
var wg sync.WaitGroup

go func() {
wg.Add(1)

Check failure on line 324 in go/logic/migrator_test.go

View workflow job for this annotation

GitHub Actions / lint

SA2000: should call wg.Add(1) before starting the goroutine to avoid a race (staticcheck)
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// fmt.Printf("Inserting row\n")
_, err := db.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
default:
_, err := db.ExecContext(ctx, "INSERT INTO test.gh_ost_test (name) VALUES ('test')")
if errors.Is(err, context.Canceled) {
return
}
require.NoError(t, err)

rowsWritten.Add(1)

time.Sleep(time.Millisecond)
}
}
}()

go func() {
ticker := time.NewTicker(time.Millisecond)

wg.Add(1)

Check failure on line 344 in go/logic/migrator_test.go

View workflow job for this annotation

GitHub Actions / lint

SA2000: should call wg.Add(1) before starting the goroutine to avoid a race (staticcheck)
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// fmt.Printf("Inserting row\n")
tx, err := db.Begin()
default:
tx, err := db.BeginTx(ctx, &gosql.TxOptions{})
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
fmt.Println(err.Error())
}
require.NoError(t, err)

for i := 0; i < 10; i++ {
_, err := tx.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
_, err = tx.ExecContext(ctx, "INSERT INTO test.gh_ost_test (name) VALUES ('test')")
if errors.Is(err, context.Canceled) {
tx.Rollback()
return
}
require.NoError(t, err)

rowsWritten.Add(1)
}
tx.Commit()

err = tx.Commit()
require.NoError(t, err)
time.Sleep(time.Millisecond)
}
}
}()

// go func() {
// ticker := time.NewTicker(time.Millisecond)

// for {
// select {
// case <-ctx.Done():
// return
// case <-ticker.C:
// // fmt.Printf("Inserting row\n")
// _, err := db.Exec("INSERT INTO testing.gh_ost_test (name) VALUES ('test')")
// require.NoError(t, err)

// rowsWritten.Add(1)
// }
// }
// }()

go func() {
time.Sleep(10 * time.Second)
cancel()
}()

err = migrator.Migrate()
require.NoError(t, err)
wg.Wait()

fmt.Printf("Rows written: %d\n", rowsWritten.Load())

}
4 changes: 2 additions & 2 deletions go/logic/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ func TestStreamEvents(t *testing.T) {
Port: 3306,
},
User: "root",
Password: "",
Password: "root",
}
migrationContext.SetConnectionConfig("innodb")
migrationContext.ReplicaServerId = 99999
migrationContext.DatabaseName = "testing"
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "gh_ost_test"
migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB"

Expand Down

0 comments on commit e81aabf

Please sign in to comment.