diff --git a/go/binlog/gomysql_reader_test.go b/go/binlog/gomysql_reader_test.go deleted file mode 100644 index d09379e30..000000000 --- a/go/binlog/gomysql_reader_test.go +++ /dev/null @@ -1,313 +0,0 @@ -package binlog - -import ( - "context" - "database/sql" - "testing" - - "github.com/github/gh-ost/go/base" - "github.com/github/gh-ost/go/mysql" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" -) - -func getCurrentBinlogCoordinates(t *testing.T, db *sql.DB) mysql.BinlogCoordinates { - var file string - var position int64 - var binlogDoDb string - var binlogIgnoreDb string - var executedGtidSet string - - //nolint:execinquery // SHOW MASTER STATUS returns a result set - err := db.QueryRow("SHOW MASTER STATUS").Scan(&file, &position, &binlogDoDb, &binlogIgnoreDb, &executedGtidSet) - require.NoError(t, err) - - return mysql.BinlogCoordinates{LogFile: file, LogPos: position} -} - -func getMigrationContext(host string) *base.MigrationContext { - migrationContext := base.NewMigrationContext() - migrationContext.InspectorConnectionConfig = &mysql.ConnectionConfig{ - Key: mysql.InstanceKey{ - Hostname: host, - Port: 3306, - }, - User: "root", - Password: "root", - } - migrationContext.SetConnectionConfig("innodb") - migrationContext.ReplicaServerId = 99999 - return migrationContext -} - -func prepareDatabase(t *testing.T, db *sql.DB) { - _, err := db.Exec("CREATE TABLE test.gh_ost_test (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB") - require.NoError(t, err) - - _, err = db.Exec("CREATE TABLE test.gh_ost_test2 (id int NOT NULL AUTO_INCREMENT, name varchar(255), PRIMARY KEY (id)) ENGINE=InnoDB") - require.NoError(t, err) -} - -type GoMySQLReaderTestSuite struct { - suite.Suite - - mysqlContainer testcontainers.Container -} - -func (suite *GoMySQLReaderTestSuite) SetupSuite() { - ctx := context.Background() - - req := testcontainers.ContainerRequest{ - Image: "mysql:8.0", - Env: map[string]string{"MYSQL_ROOT_PASSWORD": "root"}, - WaitingFor: wait.ForLog("port: 3306 MySQL Community Server - GPL"), - } - - mysqlContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - suite.Require().NoError(err) - suite.mysqlContainer = mysqlContainer -} - -func (suite *GoMySQLReaderTestSuite) TearDownSuite() { - ctx := context.Background() - suite.Require().NoError(suite.mysqlContainer.Terminate(ctx)) -} - -func (suite *GoMySQLReaderTestSuite) SetupTest() { - ctx := context.Background() - - rc, _, err := suite.mysqlContainer.Exec(ctx, []string{"mysql", "-proot", "-e", "CREATE DATABASE test"}) - suite.Require().NoError(err) - suite.Require().Equal(0, rc, "expected exit code 0") -} - -func (suite *GoMySQLReaderTestSuite) TearDownTest() { - ctx := context.Background() - - rc, _, err := suite.mysqlContainer.Exec(ctx, []string{"mysql", "-proot", "-e", "DROP DATABASE test"}) - suite.Require().NoError(err) - suite.Require().Equal(0, rc, "expected exit code 0") -} - -func (suite *GoMySQLReaderTestSuite) TestStreamTransactionSingleAutoCommitChange() { - t := suite.T() - - ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) - require.NoError(t, err) - - db, err := sql.Open("mysql", "root:root@tcp("+host+")/") - require.NoError(t, err) - defer db.Close() - - prepareDatabase(t, db) - - binlogCoordinates := getCurrentBinlogCoordinates(t, db) - - migrationContext := getMigrationContext(host) - migrationContext.DatabaseName = "test" - migrationContext.OriginalTableName = "gh_ost_test" - migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" - - transactionsChan := make(chan *Transaction) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - reader := NewGoMySQLReader(migrationContext) - reader.ConnectBinlogStreamer(binlogCoordinates) - - err := reader.StreamTransactions(ctx, transactionsChan) - require.Equal(t, err, context.Canceled) - }() - - _, err = db.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')") - require.NoError(t, err) - - tx := <-transactionsChan - - changes := make([]*BinlogEntry, 0) - for change := range tx.Changes { - changes = append(changes, change) - } - require.Len(t, changes, 1) - - cancel() - close(transactionsChan) - require.Len(t, transactionsChan, 0) -} - -func (suite *GoMySQLReaderTestSuite) TestStreamTransactionSingleChangeInTransaction() { - t := suite.T() - - ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) - require.NoError(t, err) - - db, err := sql.Open("mysql", "root:root@tcp("+host+")/") - require.NoError(t, err) - defer db.Close() - - prepareDatabase(t, db) - - binlogCoordinates := getCurrentBinlogCoordinates(t, db) - - migrationContext := getMigrationContext(host) - migrationContext.DatabaseName = "test" - migrationContext.OriginalTableName = "gh_ost_test" - migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" - - transactionsChan := make(chan *Transaction) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - reader := NewGoMySQLReader(migrationContext) - reader.ConnectBinlogStreamer(binlogCoordinates) - - err := reader.StreamTransactions(ctx, transactionsChan) - require.Equal(t, err, context.Canceled) - }() - - sqlTx, err := db.Begin() - require.NoError(t, err) - - _, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')") - require.NoError(t, err) - - err = sqlTx.Commit() - require.NoError(t, err) - - tx := <-transactionsChan - - changes := make([]*BinlogEntry, 0) - for change := range tx.Changes { - changes = append(changes, change) - } - require.Len(t, changes, 1) - - cancel() - close(transactionsChan) - require.Len(t, transactionsChan, 0) -} - -func (suite *GoMySQLReaderTestSuite) TestStreamTransactionMultipleChangesInTransaction() { - t := suite.T() - - ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) - require.NoError(t, err) - - db, err := sql.Open("mysql", "root:root@tcp("+host+")/") - require.NoError(t, err) - defer db.Close() - - prepareDatabase(t, db) - - binlogCoordinates := getCurrentBinlogCoordinates(t, db) - - migrationContext := getMigrationContext(host) - migrationContext.DatabaseName = "test" - migrationContext.OriginalTableName = "gh_ost_test" - migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" - - transactionsChan := make(chan *Transaction) - - ctx, cancel := context.WithCancel(context.Background()) - - go func() { - reader := NewGoMySQLReader(migrationContext) - reader.ConnectBinlogStreamer(binlogCoordinates) - - err := reader.StreamTransactions(ctx, transactionsChan) - require.Equal(t, err, context.Canceled) - }() - - sqlTx, err := db.Begin() - require.NoError(t, err) - - _, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test1')") - require.NoError(t, err) - - _, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test2')") - require.NoError(t, err) - - _, err = sqlTx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test3')") - require.NoError(t, err) - - err = sqlTx.Commit() - require.NoError(t, err) - - tx := <-transactionsChan - require.NotNil(t, tx) - - changes := make([]*BinlogEntry, 0) - for change := range tx.Changes { - changes = append(changes, change) - } - require.Len(t, changes, 3) - - cancel() - close(transactionsChan) - require.Len(t, transactionsChan, 0) -} - -func (suite *GoMySQLReaderTestSuite) TestStreamTransactionWithDDL() { - t := suite.T() - - ctx := context.Background() - host, err := suite.mysqlContainer.ContainerIP(ctx) - require.NoError(t, err) - - db, err := sql.Open("mysql", "root:root@tcp("+host+")/") - require.NoError(t, err) - defer db.Close() - - prepareDatabase(t, db) - - binlogCoordinates := getCurrentBinlogCoordinates(t, db) - - migrationContext := getMigrationContext(host) - migrationContext.DatabaseName = "test" - migrationContext.OriginalTableName = "gh_ost_test" - migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" - - transactionsChan := make(chan *Transaction) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - reader := NewGoMySQLReader(migrationContext) - reader.ConnectBinlogStreamer(binlogCoordinates) - - err := reader.StreamTransactions(ctx, transactionsChan) - require.Equal(t, err, context.Canceled) - }() - - _, err = db.Exec("ALTER TABLE test.gh_ost_test ADD COLUMN age INT") - require.NoError(t, err) - - tx := <-transactionsChan - - changes := make([]*BinlogEntry, 0) - for change := range tx.Changes { - changes = append(changes, change) - } - require.Len(t, changes, 0) - - cancel() - close(transactionsChan) - require.Len(t, transactionsChan, 0) -} - -func TestGoMySQLReader(t *testing.T) { - suite.Run(t, new(GoMySQLReaderTestSuite)) -} diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index 2bfd54e2f..2c904c08a 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -260,8 +260,7 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, o } } -func (c *Coordinator) StartStreaming(canStopStreaming func() bool) error { - ctx := context.TODO() +func (c *Coordinator) StartStreaming(ctx context.Context, canStopStreaming func() bool) error { streamer, err := c.binlogSyncer.StartSync(gomysql.Position{ Name: c.currentCoordinates.LogFile, Pos: uint32(c.currentCoordinates.LogPos), @@ -269,13 +268,20 @@ func (c *Coordinator) StartStreaming(canStopStreaming func() bool) error { if err != nil { return err } + defer c.binlogSyncer.Close() var retries int64 for { if canStopStreaming() { return nil } + if err := ctx.Err(); err != nil { + return err + } ev, err := streamer.GetEvent(ctx) + if err := ctx.Err(); err != nil { + return err + } if err != nil { coords := c.GetCurrentBinlogCoordinates() if retries >= c.migrationContext.MaxRetries() { diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index 400f60d91..86da4b4e5 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -20,9 +20,10 @@ import ( func TestCoordinator(t *testing.T) { ctx := context.Background() req := testcontainers.ContainerRequest{ - Image: "mysql:8.0", - Env: map[string]string{"MYSQL_ROOT_PASSWORD": "root"}, - WaitingFor: wait.ForLog("port: 3306 MySQL Community Server - GPL"), + Image: "mysql:8.0", + Env: map[string]string{"MYSQL_ROOT_PASSWORD": "root"}, + WaitingFor: wait.ForLog("port: 3306 MySQL Community Server - GPL"), + ExposedPorts: []string{"3306"}, } mysqlContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ @@ -35,10 +36,12 @@ func TestCoordinator(t *testing.T) { require.NoError(t, mysqlContainer.Terminate(ctx)) }) - host, err := mysqlContainer.ContainerIP(ctx) + host, err := mysqlContainer.Host(ctx) require.NoError(t, err) - db, err := gosql.Open("mysql", "root:root@tcp("+host+")/") + mappedPort, err := mysqlContainer.MappedPort(ctx, "3306") + + db, err := gosql.Open("mysql", "root:root@tcp("+host+":"+mappedPort.Port()+")/") require.NoError(t, err) t.Cleanup(func() { @@ -66,7 +69,7 @@ func TestCoordinator(t *testing.T) { migrationContext.ApplierConnectionConfig = &mysql.ConnectionConfig{ Key: mysql.InstanceKey{ Hostname: host, - Port: 3306, + Port: mappedPort.Int(), }, User: "root", Password: "root", @@ -75,7 +78,7 @@ func TestCoordinator(t *testing.T) { migrationContext.InspectorConnectionConfig = &mysql.ConnectionConfig{ Key: mysql.InstanceKey{ Hostname: host, - Port: 3306, + Port: mappedPort.Int(), }, User: "root", Password: "root", @@ -93,6 +96,8 @@ func TestCoordinator(t *testing.T) { migrationContext.SetConnectionConfig("innodb") migrationContext.NumWorkers = 4 + // HACK: so + migrationContext.AzureMySQL = true applier := NewApplier(migrationContext) err = applier.InitDBConnections(migrationContext.NumWorkers) @@ -143,8 +148,8 @@ func TestCoordinator(t *testing.T) { return streamCtx.Err() != nil } go func() { - err = coord.StartStreaming(canStopStreaming) - require.NoError(t, err) + err = coord.StartStreaming(streamCtx, canStopStreaming) + require.Equal(t, context.Canceled, err) }() // Give streamer some time to start diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 32a65df8f..1a7d5007b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1117,7 +1117,8 @@ func (this *Migrator) initiateStreaming() error { go func() { this.migrationContext.Log.Debugf("Beginning streaming") //err := this.eventsStreamer.StreamEvents(this.canStopStreaming) - err := this.trxCoordinator.StartStreaming(this.canStopStreaming) + ctx := context.TODO() + err := this.trxCoordinator.StartStreaming(ctx, this.canStopStreaming) if err != nil { this.migrationContext.PanicAbort <- err } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 35c67c7f6..e4234d80e 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -295,21 +295,21 @@ func TestMigrate(t *testing.T) { require.NoError(t, mysqlContainer.Terminate(ctx)) }) - host, err := mysqlContainer.ContainerIP(ctx) + host, err := mysqlContainer.Host(ctx) + mappedPort, err := mysqlContainer.MappedPort(ctx, "3306") + db, err := gosql.Open("mysql", "root:root@tcp("+host+":"+mappedPort.Port()+")/") require.NoError(t, err) - db, err := gosql.Open("mysql", "root:root@tcp("+host+")/") - require.NoError(t, err) - - t.Cleanup(func() { + defer func() { require.NoError(t, db.Close()) - }) + }() _ = os.Remove("/tmp/gh-ost.sock") prepareDatabase(t, db) migrationContext := base.NewMigrationContext() + migrationContext.AzureMySQL = true migrationContext.DatabaseName = "test" migrationContext.OriginalTableName = "gh_ost_test" migrationContext.AlterStatement = "ALTER TABLE gh_ost_test ENGINE=InnoDB" @@ -322,7 +322,7 @@ func TestMigrate(t *testing.T) { migrationContext.InspectorConnectionConfig = &mysql.ConnectionConfig{ Key: mysql.InstanceKey{ Hostname: host, - Port: 3306, + Port: mappedPort.Int(), }, User: "root", Password: "root", @@ -342,19 +342,18 @@ func TestMigrate(t *testing.T) { go func() { defer wg.Done() for { - select { - case <-ctx.Done(): + if ctx.Err() != nil { + return + } + _, err := db.ExecContext(ctx, "INSERT INTO test.gh_ost_test (name) VALUES ('test')") + fmt.Printf("err: %+v, ctx: %+v\n", err, ctx.Err()) + if errors.Is(err, context.Canceled) { return - default: - _, err := db.ExecContext(ctx, "INSERT INTO test.gh_ost_test (name) VALUES ('test')") - if errors.Is(err, context.Canceled) { - return - } - require.NoError(t, err) - rowsWritten.Add(1) - - time.Sleep(time.Millisecond) } + require.NoError(t, err) + rowsWritten.Add(1) + + time.Sleep(time.Millisecond) } }() @@ -362,36 +361,37 @@ func TestMigrate(t *testing.T) { go func() { defer wg.Done() for { - select { - case <-ctx.Done(): + tx, err := db.BeginTx(ctx, &gosql.TxOptions{}) + if errors.Is(err, context.Canceled) { return - default: - tx, err := db.BeginTx(ctx, &gosql.TxOptions{}) + } + // if err != nil { + // fmt.Println(err.Error()) + // } + require.NoError(t, err) + + for i := 0; i < 10; i++ { + _, err = tx.ExecContext(ctx, "INSERT INTO test.gh_ost_test (name) VALUES ('test')") if errors.Is(err, context.Canceled) { return - } else if err != nil { - fmt.Println(err.Error()) } require.NoError(t, err) - - for i := 0; i < 10; i++ { - _, err = tx.ExecContext(ctx, "INSERT INTO test.gh_ost_test (name) VALUES ('test')") - if errors.Is(err, context.Canceled) { - tx.Rollback() - return - } - require.NoError(t, err) - rowsWritten.Add(1) - } - tx.Commit() - - time.Sleep(time.Millisecond) + rowsWritten.Add(1) + } + err = tx.Commit() + if errors.Is(err, context.Canceled) { + return } + require.NoError(t, err) + + time.Sleep(time.Millisecond) } }() + wg.Add(1) go func() { - time.Sleep(10 * time.Second) + defer wg.Done() + time.Sleep(5 * time.Second) cancel() }()