diff --git a/go.mod b/go.mod index 5fb6a33c..c0c6fa51 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/alecthomas/kong v0.7.1 - github.com/go-mysql-org/go-mysql v1.8.0 + github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3 github.com/go-sql-driver/mysql v1.7.1 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 @@ -23,7 +23,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/google/uuid v1.4.0 // indirect - github.com/klauspost/compress v1.17.1 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index c3c8b26c..2ff7b5d8 100644 --- a/go.sum +++ b/go.sum @@ -15,10 +15,8 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-mysql-org/go-mysql v1.7.1-0.20231227030153-e817d9813fe3 h1:3LDcDJa6U5onHlQ6Z0fjEF5up/ZI4mlTRpx6zHjq9hg= -github.com/go-mysql-org/go-mysql v1.7.1-0.20231227030153-e817d9813fe3/go.mod h1:AGmdMIbBbcuUDRjzvHskI7+1SmPKhgKrqf3g8lknIc4= -github.com/go-mysql-org/go-mysql v1.8.0 h1:bN+/Q5yyQXQOAabXPkI3GZX43w4Tsj2DIthjC9i6CkQ= -github.com/go-mysql-org/go-mysql v1.8.0/go.mod h1:kwbF156Z9Sy8amP3E1SZp7/s/0PuJj/xKaOWToQiq0Y= +github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3 h1:IaBlE/5xoN3ETAvK8bPYTnzU4fWq9Fd3Si5uaAP/qGA= +github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= @@ -27,8 +25,8 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= -github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= -github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go index 73589109..60c0596e 100644 --- a/pkg/checksum/checker.go +++ b/pkg/checksum/checker.go @@ -332,15 +332,15 @@ func (c *Checker) initConnPool(ctx context.Context) error { // Lock the source table in a trx // so the connection is not used by others c.logger.Info("starting checksum operation, this will require a table lock") - serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) + tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) if err != nil { return err } - defer serverLock.Close() + defer tableLock.Close() // With the lock held, flush one more time under the lock tables. // Because we know canal is up to date this now guarantees // we have everything in the new table. - if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil { + if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil { return err } // Assert that the change set is empty. This should always diff --git a/pkg/migration/cutover.go b/pkg/migration/cutover.go index cffff80e..b1580a05 100644 --- a/pkg/migration/cutover.go +++ b/pkg/migration/cutover.go @@ -87,22 +87,21 @@ func (c *CutOver) Run(ctx context.Context) error { func (c *CutOver) algorithmRenameUnderLock(ctx context.Context) error { // Lock the source table in a trx // so the connection is not used by others - serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) + tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) if err != nil { return err } - defer serverLock.Close() - if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil { + defer tableLock.Close() + if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil { return err } if !c.feed.AllChangesFlushed() { return errors.New("not all changes flushed, final flush might be broken") } - oldName := c.oldTableName - oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, oldName) + oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, c.oldTableName) renameStatement := fmt.Sprintf("RENAME TABLE %s TO %s, %s TO %s", c.table.QuotedName, oldQuotedName, c.newTable.QuotedName, c.table.QuotedName, ) - return serverLock.ExecUnderLock(ctx, renameStatement) + return tableLock.ExecUnderLock(ctx, renameStatement) } diff --git a/pkg/migration/cutover_test.go b/pkg/migration/cutover_test.go index e7f9095a..1de90d9a 100644 --- a/pkg/migration/cutover_test.go +++ b/pkg/migration/cutover_test.go @@ -40,6 +40,7 @@ func TestCutOver(t *testing.T) { assert.Equal(t, 0, db.Stats().InUse) // no connections in use. t1 := table.NewTableInfo(db, "test", "cutovert1") + assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK. t1new := table.NewTableInfo(db, "test", "_cutovert1_new") t1old := "_cutovert1_old" logger := logrus.New() @@ -100,6 +101,7 @@ func TestMDLLockFails(t *testing.T) { assert.NoError(t, err) t1 := table.NewTableInfo(db, "test", "mdllocks") + assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK. t1new := table.NewTableInfo(db, "test", "_mdllocks_new") t1old := "test_old" logger := logrus.New() @@ -136,11 +138,25 @@ func TestInvalidOptions(t *testing.T) { assert.NoError(t, err) logger := logrus.New() + testutils.RunSQL(t, `DROP TABLE IF EXISTS invalid_t1, _invalid_t1_new`) + tbl := `CREATE TABLE invalid_t1 ( + id int(11) NOT NULL AUTO_INCREMENT, + name varchar(255) NOT NULL, + PRIMARY KEY (id) + )` + testutils.RunSQL(t, tbl) + tbl = `CREATE TABLE _invalid_t1_new ( + id int(11) NOT NULL AUTO_INCREMENT, + name varchar(255) NOT NULL, + PRIMARY KEY (id) + )` + testutils.RunSQL(t, tbl) // Invalid options _, err = NewCutOver(db, nil, nil, "", nil, dbconn.NewDBConfig(), logger) assert.Error(t, err) - t1 := table.NewTableInfo(db, "test", "t1") - t1new := table.NewTableInfo(db, "test", "t1_new") + t1 := table.NewTableInfo(db, "test", "invalid_t1") + assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK. + t1new := table.NewTableInfo(db, "test", "_invalid_t1_new") t1old := "test_old" cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) diff --git a/pkg/migration/migration_test.go b/pkg/migration/migration_test.go index 400ab0a5..7cb3b29c 100644 --- a/pkg/migration/migration_test.go +++ b/pkg/migration/migration_test.go @@ -181,7 +181,7 @@ func TestRenameInMySQL80(t *testing.T) { migration.Database = cfg.DBName migration.Threads = 16 migration.Checksum = true - migration.Table = "t1" + migration.Table = "renamet1" migration.Alter = "CHANGE name nameNew varchar(255) not null" err = migration.Run() diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 4bdea6dd..0648bf72 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -41,6 +41,8 @@ const ( // Longer values require more memory, but permit more merging. // I expect we will change this to 1hr-24hr in the future. DefaultFlushInterval = 30 * time.Second + // DefaultTimeout is how long BlockWait is supposed to wait before returning errors. + DefaultTimeout = 10 * time.Second ) type queuedChange struct { @@ -73,8 +75,6 @@ type Client struct { binlogChangeset map[string]bool // bool is deleted binlogChangesetDelta int64 // a special "fix" for keys that have been popped off, use atomic get/set binlogPosSynced mysql.Position // safely written to new table - binlogPosInMemory mysql.Position // available in the binlog binlogChangeset - lastLogFileName string // last log file name we've seen in a rotation event queuedChanges []queuedChange // used when disableDeltaMap is true @@ -191,7 +191,6 @@ func (c *Client) OnRow(e *canal.RowsEvent) error { c.logger.Errorf("unknown action: %v", e.Action) } } - c.updatePosInMemory(e.Header.LogPos) return nil } @@ -203,15 +202,6 @@ func (c *Client) KeyAboveWatermarkEnabled() bool { return c.enableKeyAboveWatermark && c.KeyAboveCopierCallback != nil } -// OnRotate is called when a rotate event is discovered via replication. -// We use this to capture the log file name, since only the position is caught on the row event. -func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error { - c.Lock() - defer c.Unlock() - c.lastLogFileName = string(rotateEvent.NextLogName) - return nil -} - // OnTableChanged is called when a table is changed via DDL. // This is a failsafe because we don't expect DDL to be performed on the table while we are operating. func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error { @@ -239,12 +229,19 @@ func (c *Client) SetPos(pos mysql.Position) { } func (c *Client) AllChangesFlushed() bool { - if c.GetDeltaLen() > 0 { - return false - } + deltaLen := c.GetDeltaLen() c.Lock() defer c.Unlock() - return c.binlogPosInMemory.Compare(c.binlogPosSynced) == 0 + + // We check if the position canal is up to is the same position + // as what we've made changes for. If this is zero it's a good + // indicator that we are up to date. However, because the + // "server lock" is not a global lock, it's possible that the synced + // position could still advance. + if c.canal.SyncedPosition().Compare(c.binlogPosSynced) != 0 { + c.logger.Warnf("Binlog reader info canal-position=%v synced-position=%v. Discrepancies could be due to modifications on other tables.", c.canal.SyncedPosition(), c.binlogPosSynced) + } + return deltaLen == 0 } func (c *Client) GetBinlogApplyPosition() mysql.Position { @@ -303,7 +300,8 @@ func (c *Client) Run() (err error) { cfg.Password = c.password cfg.Logger = NewLogWrapper(c.logger) // wrapper to filter the noise. cfg.IncludeTableRegex = []string{fmt.Sprintf("^%s\\.%s$", c.table.SchemaName, c.table.TableName)} - cfg.Dump.ExecutionPath = "" // skip dump + cfg.Dump.ExecutionPath = "" // skip dump + cfg.DisableFlushBinlogWhileWaiting = true // can't guarantee privileges exist. if dbconn.IsRDSHost(cfg.Addr) { // create a new TLSConfig for RDS // It needs to be a copy because sharing a global pointer @@ -335,9 +333,6 @@ func (c *Client) Run() (err error) { return errors.New("binlog position is impossible, the source may have already purged it") } - c.binlogPosInMemory = c.binlogPosSynced - c.lastLogFileName = c.binlogPosInMemory.Name - // Call start canal as a go routine. go c.startCanal() return nil @@ -381,7 +376,8 @@ func (c *Client) startCanal() { // but since canal is now closed we can safely return return } - c.logger.Errorf("canal has failed. error: %v", err) + + c.logger.Errorf("canal has failed. error: %v, table: %s", err, c.table.TableName) panic("canal has failed") } } @@ -395,18 +391,23 @@ func (c *Client) Close() { } } -func (c *Client) updatePosInMemory(pos uint32) { - c.Lock() - defer c.Unlock() - c.binlogPosInMemory = mysql.Position{ - Name: c.lastLogFileName, - Pos: pos, +// FlushUnderTableLock is a final flush under an exclusive table lock using the connection +// that holds a write lock. Because flushing generates binary log events, +// we actually want to call flush *twice*: +// - The first time flushes the pending changes to the new table. +// - We then ensure that we have all the binary log changes read from the server. +// - The second time reads through the changes generated by the first flush +// and updates the in memory applied position to match the server's position. +// This is required to satisfy the binlog position is updated for the c.AllChangesFlushed() check. +func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error { + if err := c.flush(ctx, true, lock); err != nil { + return err } -} - -// FlushUnderLock is a final flush under an exclusive lock using the connection -// that holds a write lock. -func (c *Client) FlushUnderLock(ctx context.Context, lock *dbconn.TableLock) error { + // Wait for the changes flushed to be received. + if err := c.BlockWait(ctx); err != nil { + return err + } + // Do a final flush return c.flush(ctx, true, lock) } @@ -429,7 +430,7 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta c.Lock() changesToFlush := c.queuedChanges c.queuedChanges = nil // reset - posOfFlush := c.binlogPosInMemory + posOfFlush := c.canal.SyncedPosition() c.Unlock() // Early return if there is nothing to flush. @@ -486,7 +487,7 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.TableLock) error { c.Lock() setToFlush := c.binlogChangeset - posOfFlush := c.binlogPosInMemory // copy the value, not the pointer + posOfFlush := c.canal.SyncedPosition() // copy the value, not the pointer c.binlogChangeset = make(map[string]bool) // set new value c.Unlock() // unlock immediately so others can write to the changeset // The changeset delta is because the status output is based on len(binlogChangeset) @@ -706,36 +707,7 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) // **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that // changes have been applied to the database. func (c *Client) BlockWait(ctx context.Context) error { - targetPos, err := c.canal.GetMasterPos() // what the server is at. - if err != nil { - return err - } - for i := 100; ; i++ { - if err := c.injectBinlogNoise(ctx); err != nil { - return err - } - canalPos := c.canal.SyncedPosition() - if i%100 == 0 { - // Print status every 100 loops = 10s - c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos) - } - if canalPos.Compare(targetPos) >= 0 { - break - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - -// injectBinlogNoise is used to inject some noise into the binlog stream -// This helps ensure that we are "past" a binary log position if there is some off-by-one -// problem where the most recent canal event is not yet updating the canal SyncedPosition, -// and there are no current changes on the MySQL server to advance itself. -// Note: We can not update the table or the newTable, because this intentionally -// causes a panic (c.tableChanged() is called). -func (c *Client) injectBinlogNoise(ctx context.Context) error { - tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName) - return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName) + return c.canal.CatchMasterPos(DefaultTimeout) } func (c *Client) keyHasChanged(key []interface{}, deleted bool) { diff --git a/pkg/repl/client_test.go b/pkg/repl/client_test.go index b13613d0..b62692c2 100644 --- a/pkg/repl/client_test.go +++ b/pkg/repl/client_test.go @@ -359,3 +359,50 @@ func TestFeedback(t *testing.T) { } assert.Equal(t, int64(500), client.targetBatchSize) // less keys. } + +// TestBlockWait tests that the BlockWait function will: +// - check the server's binary log position +// - block waiting until the repl client is at that position. +func TestBlockWait(t *testing.T) { + db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) + assert.NoError(t, err) + + testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2") + testutils.RunSQL(t, "CREATE TABLE blockwaitt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE blockwaitt2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + + t1 := table.NewTableInfo(db, "test", "blockwaitt1") + assert.NoError(t, t1.SetInfo(context.TODO())) + t2 := table.NewTableInfo(db, "test", "blockwaitt2") + assert.NoError(t, t2.SetInfo(context.TODO())) + + logger := logrus.New() + cfg, err := mysql2.ParseDSN(testutils.DSN()) + assert.NoError(t, err) + client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{ + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, + }) + assert.NoError(t, client.Run()) + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) // if it takes any longer block wait is failing. + defer cancel() + + assert.NoError(t, client.BlockWait(ctx)) + + // Insert into t1. + testutils.RunSQL(t, "INSERT INTO blockwaitt1 (a, b, c) VALUES (1, 2, 3)") + assert.NoError(t, client.Flush(ctx)) // apply the changes (not required, they only need to be received for block wait to unblock) + assert.NoError(t, client.BlockWait(ctx)) // should be quick still. + testutils.RunSQL(t, "INSERT INTO blockwaitt1 (a, b, c) VALUES (2, 2, 3)") // don't apply changes. + assert.NoError(t, client.BlockWait(ctx)) // should be quick because apply not required. + + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + assert.NoError(t, client.BlockWait(ctx)) // should be quick +}