Merge pull request #332 from cashapp/mtocker-fix-all-changes-not-flushed
Fix changes not synced error
morgo authored Jul 31, 2024
2 parents 7a357a6 + e523cbe commit 5c90191
Showing 8 changed files with 116 additions and 84 deletions.
4 changes: 2 additions & 2 deletions go.mod
@@ -4,7 +4,7 @@ go 1.22

require (
github.com/alecthomas/kong v0.7.1
github.com/go-mysql-org/go-mysql v1.8.0
github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3
github.com/go-sql-driver/mysql v1.7.1
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67
@@ -23,7 +23,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
10 changes: 4 additions & 6 deletions go.sum
@@ -15,10 +15,8 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-mysql-org/go-mysql v1.7.1-0.20231227030153-e817d9813fe3 h1:3LDcDJa6U5onHlQ6Z0fjEF5up/ZI4mlTRpx6zHjq9hg=
github.com/go-mysql-org/go-mysql v1.7.1-0.20231227030153-e817d9813fe3/go.mod h1:AGmdMIbBbcuUDRjzvHskI7+1SmPKhgKrqf3g8lknIc4=
github.com/go-mysql-org/go-mysql v1.8.0 h1:bN+/Q5yyQXQOAabXPkI3GZX43w4Tsj2DIthjC9i6CkQ=
github.com/go-mysql-org/go-mysql v1.8.0/go.mod h1:kwbF156Z9Sy8amP3E1SZp7/s/0PuJj/xKaOWToQiq0Y=
github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3 h1:IaBlE/5xoN3ETAvK8bPYTnzU4fWq9Fd3Si5uaAP/qGA=
github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
@@ -27,8 +25,8 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
6 changes: 3 additions & 3 deletions pkg/checksum/checker.go
@@ -332,15 +332,15 @@ func (c *Checker) initConnPool(ctx context.Context) error {
// Lock the source table in a trx
// so the connection is not used by others
c.logger.Info("starting checksum operation, this will require a table lock")
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
if err != nil {
return err
}
defer serverLock.Close()
defer tableLock.Close()
// With the lock held, flush one more time under the lock tables.
// Because we know canal is up to date this now guarantees
// we have everything in the new table.
if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil {
if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil {
return err
}
// Assert that the change set is empty. This should always
11 changes: 5 additions & 6 deletions pkg/migration/cutover.go
@@ -87,22 +87,21 @@ func (c *CutOver) Run(ctx context.Context) error {
func (c *CutOver) algorithmRenameUnderLock(ctx context.Context) error {
// Lock the source table in a trx
// so the connection is not used by others
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
if err != nil {
return err
}
defer serverLock.Close()
if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil {
defer tableLock.Close()
if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil {
return err
}
if !c.feed.AllChangesFlushed() {
return errors.New("not all changes flushed, final flush might be broken")
}
oldName := c.oldTableName
oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, oldName)
oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, c.oldTableName)
renameStatement := fmt.Sprintf("RENAME TABLE %s TO %s, %s TO %s",
c.table.QuotedName, oldQuotedName,
c.newTable.QuotedName, c.table.QuotedName,
)
return serverLock.ExecUnderLock(ctx, renameStatement)
return tableLock.ExecUnderLock(ctx, renameStatement)
}
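
For context, the rename built above performs the atomic cutover swap in a single statement. Below is a minimal standalone sketch of the same string construction, assuming a hypothetical table `test`.`t1` with new table `_t1_new` and archive name `_t1_old` (the names are illustrative, not taken from the diff):

package main

import "fmt"

func main() {
	// Assumed values standing in for c.table.QuotedName, c.newTable.QuotedName,
	// and the quoted name built from c.table.SchemaName and c.oldTableName.
	quotedName := "`test`.`t1`"
	newQuotedName := "`test`.`_t1_new`"
	oldQuotedName := "`test`.`_t1_old`"
	renameStatement := fmt.Sprintf("RENAME TABLE %s TO %s, %s TO %s",
		quotedName, oldQuotedName,
		newQuotedName, quotedName,
	)
	// Prints: RENAME TABLE `test`.`t1` TO `test`.`_t1_old`, `test`.`_t1_new` TO `test`.`t1`
	fmt.Println(renameStatement)
}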
20 changes: 18 additions & 2 deletions pkg/migration/cutover_test.go
@@ -40,6 +40,7 @@ func TestCutOver(t *testing.T) {
assert.Equal(t, 0, db.Stats().InUse) // no connections in use.

t1 := table.NewTableInfo(db, "test", "cutovert1")
assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK.
t1new := table.NewTableInfo(db, "test", "_cutovert1_new")
t1old := "_cutovert1_old"
logger := logrus.New()
@@ -100,6 +101,7 @@ func TestMDLLockFails(t *testing.T) {
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "mdllocks")
assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK.
t1new := table.NewTableInfo(db, "test", "_mdllocks_new")
t1old := "test_old"
logger := logrus.New()
@@ -136,11 +138,25 @@ func TestInvalidOptions(t *testing.T) {
assert.NoError(t, err)
logger := logrus.New()

testutils.RunSQL(t, `DROP TABLE IF EXISTS invalid_t1, _invalid_t1_new`)
tbl := `CREATE TABLE invalid_t1 (
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL,
PRIMARY KEY (id)
)`
testutils.RunSQL(t, tbl)
tbl = `CREATE TABLE _invalid_t1_new (
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL,
PRIMARY KEY (id)
)`
testutils.RunSQL(t, tbl)
// Invalid options
_, err = NewCutOver(db, nil, nil, "", nil, dbconn.NewDBConfig(), logger)
assert.Error(t, err)
t1 := table.NewTableInfo(db, "test", "t1")
t1new := table.NewTableInfo(db, "test", "t1_new")
t1 := table.NewTableInfo(db, "test", "invalid_t1")
assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK.
t1new := table.NewTableInfo(db, "test", "_invalid_t1_new")
t1old := "test_old"
cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
2 changes: 1 addition & 1 deletion pkg/migration/migration_test.go
@@ -181,7 +181,7 @@ func TestRenameInMySQL80(t *testing.T) {
migration.Database = cfg.DBName
migration.Threads = 16
migration.Checksum = true
migration.Table = "t1"
migration.Table = "renamet1"
migration.Alter = "CHANGE name nameNew varchar(255) not null"

err = migration.Run()
100 changes: 36 additions & 64 deletions pkg/repl/client.go
@@ -41,6 +41,8 @@ const (
// Longer values require more memory, but permit more merging.
// I expect we will change this to 1hr-24hr in the future.
DefaultFlushInterval = 30 * time.Second
// DefaultTimeout is the maximum time BlockWait will wait before returning an error.
DefaultTimeout = 10 * time.Second
)

type queuedChange struct {
@@ -73,8 +75,6 @@ type Client struct {
binlogChangeset map[string]bool // bool is deleted
binlogChangesetDelta int64 // a special "fix" for keys that have been popped off, use atomic get/set
binlogPosSynced mysql.Position // safely written to new table
binlogPosInMemory mysql.Position // available in the binlog binlogChangeset
lastLogFileName string // last log file name we've seen in a rotation event

queuedChanges []queuedChange // used when disableDeltaMap is true

@@ -191,7 +191,6 @@ func (c *Client) OnRow(e *canal.RowsEvent) error {
c.logger.Errorf("unknown action: %v", e.Action)
}
}
c.updatePosInMemory(e.Header.LogPos)
return nil
}

@@ -203,15 +202,6 @@ func (c *Client) KeyAboveWatermarkEnabled() bool {
return c.enableKeyAboveWatermark && c.KeyAboveCopierCallback != nil
}

// OnRotate is called when a rotate event is discovered via replication.
// We use this to capture the log file name, since only the position is caught on the row event.
func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error {
c.Lock()
defer c.Unlock()
c.lastLogFileName = string(rotateEvent.NextLogName)
return nil
}

// OnTableChanged is called when a table is changed via DDL.
// This is a failsafe because we don't expect DDL to be performed on the table while we are operating.
func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error {
@@ -239,12 +229,19 @@ func (c *Client) SetPos(pos mysql.Position) {
}

func (c *Client) AllChangesFlushed() bool {
if c.GetDeltaLen() > 0 {
return false
}
deltaLen := c.GetDeltaLen()
c.Lock()
defer c.Unlock()
return c.binlogPosInMemory.Compare(c.binlogPosSynced) == 0

// We check whether the position canal has read up to is the same
// position as the one we have applied changes for. If they match, it's
// a good indicator that we are up to date. However, because the
// table lock is not a global lock, it's possible that the synced
// position could still advance.
if c.canal.SyncedPosition().Compare(c.binlogPosSynced) != 0 {
c.logger.Warnf("Binlog reader info canal-position=%v synced-position=%v. Discrepancies could be due to modifications on other tables.", c.canal.SyncedPosition(), c.binlogPosSynced)
}
return deltaLen == 0
}

func (c *Client) GetBinlogApplyPosition() mysql.Position {
@@ -303,7 +300,8 @@ func (c *Client) Run() (err error) {
cfg.Password = c.password
cfg.Logger = NewLogWrapper(c.logger) // wrapper to filter the noise.
cfg.IncludeTableRegex = []string{fmt.Sprintf("^%s\\.%s$", c.table.SchemaName, c.table.TableName)}
cfg.Dump.ExecutionPath = "" // skip dump
cfg.Dump.ExecutionPath = "" // skip dump
cfg.DisableFlushBinlogWhileWaiting = true // can't guarantee privileges exist.
if dbconn.IsRDSHost(cfg.Addr) {
// create a new TLSConfig for RDS
// It needs to be a copy because sharing a global pointer
@@ -335,9 +333,6 @@ func (c *Client) Run() (err error) {
return errors.New("binlog position is impossible, the source may have already purged it")
}

c.binlogPosInMemory = c.binlogPosSynced
c.lastLogFileName = c.binlogPosInMemory.Name

// Call start canal as a go routine.
go c.startCanal()
return nil
@@ -381,7 +376,8 @@ func (c *Client) startCanal() {
// but since canal is now closed we can safely return
return
}
c.logger.Errorf("canal has failed. error: %v", err)

c.logger.Errorf("canal has failed. error: %v, table: %s", err, c.table.TableName)
panic("canal has failed")
}
}
@@ -395,18 +391,23 @@ func (c *Client) Close() {
}
}

func (c *Client) updatePosInMemory(pos uint32) {
c.Lock()
defer c.Unlock()
c.binlogPosInMemory = mysql.Position{
Name: c.lastLogFileName,
Pos: pos,
// FlushUnderTableLock is a final flush under an exclusive table lock using the connection
// that holds a write lock. Because flushing generates binary log events,
// we actually want to call flush *twice*:
// - The first time flushes the pending changes to the new table.
// - We then ensure that we have all the binary log changes read from the server.
// - The second time reads through the changes generated by the first flush
// and updates the in-memory applied position to match the server's position.
// This is required to ensure the binlog position is updated for the c.AllChangesFlushed() check.
func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error {
if err := c.flush(ctx, true, lock); err != nil {
return err
}
}

// FlushUnderLock is a final flush under an exclusive lock using the connection
// that holds a write lock.
func (c *Client) FlushUnderLock(ctx context.Context, lock *dbconn.TableLock) error {
// Wait for the changes flushed to be received.
if err := c.BlockWait(ctx); err != nil {
return err
}
// Do a final flush
return c.flush(ctx, true, lock)
}

@@ -429,7 +430,7 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta
c.Lock()
changesToFlush := c.queuedChanges
c.queuedChanges = nil // reset
posOfFlush := c.binlogPosInMemory
posOfFlush := c.canal.SyncedPosition()
c.Unlock()

// Early return if there is nothing to flush.
@@ -486,7 +487,7 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta
func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.TableLock) error {
c.Lock()
setToFlush := c.binlogChangeset
posOfFlush := c.binlogPosInMemory // copy the value, not the pointer
posOfFlush := c.canal.SyncedPosition() // copy the value, not the pointer
c.binlogChangeset = make(map[string]bool) // set new value
c.Unlock() // unlock immediately so others can write to the changeset
// The changeset delta is because the status output is based on len(binlogChangeset)
@@ -706,36 +707,7 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
// **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that
// changes have been applied to the database.
func (c *Client) BlockWait(ctx context.Context) error {
targetPos, err := c.canal.GetMasterPos() // what the server is at.
if err != nil {
return err
}
for i := 100; ; i++ {
if err := c.injectBinlogNoise(ctx); err != nil {
return err
}
canalPos := c.canal.SyncedPosition()
if i%100 == 0 {
// Print status every 100 loops = 10s
c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos)
}
if canalPos.Compare(targetPos) >= 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
return nil
}

// injectBinlogNoise is used to inject some noise into the binlog stream
// This helps ensure that we are "past" a binary log position if there is some off-by-one
// problem where the most recent canal event is not yet updating the canal SyncedPosition,
// and there are no current changes on the MySQL server to advance itself.
// Note: We can not update the table or the newTable, because this intentionally
// causes a panic (c.tableChanged() is called).
func (c *Client) injectBinlogNoise(ctx context.Context) error {
tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName)
return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName)
return c.canal.CatchMasterPos(DefaultTimeout)
}

func (c *Client) keyHasChanged(key []interface{}, deleted bool) {
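For reference, the new BlockWait delegates the wait to canal.CatchMasterPos. The sketch below approximates the logic it replaces (an assumed description of the go-mysql canal behavior, not its actual implementation), without the old checkpoint-table noise injection:

package replsketch

import (
	"fmt"
	"time"

	"github.com/go-mysql-org/go-mysql/canal"
)

// waitForMasterPos approximates what CatchMasterPos does: read the server's
// current binlog position once, then block until the replication client has
// synced past it or the timeout elapses.
func waitForMasterPos(c *canal.Canal, timeout time.Duration) error {
	target, err := c.GetMasterPos() // what the server is at right now
	if err != nil {
		return err
	}
	deadline := time.Now().Add(timeout)
	for c.SyncedPosition().Compare(target) < 0 {
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for binlog position %v", target)
		}
		time.Sleep(100 * time.Millisecond)
	}
	return nil
}
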
47 changes: 47 additions & 0 deletions pkg/repl/client_test.go
@@ -359,3 +359,50 @@ func TestFeedback(t *testing.T) {
}
assert.Equal(t, int64(500), client.targetBatchSize) // less keys.
}

// TestBlockWait tests that the BlockWait function will:
// - check the server's binary log position
// - block waiting until the repl client is at that position.
func TestBlockWait(t *testing.T) {
db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2")
testutils.RunSQL(t, "CREATE TABLE blockwaitt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE blockwaitt2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")

t1 := table.NewTableInfo(db, "test", "blockwaitt1")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "blockwaitt2")
assert.NoError(t, t2.SetInfo(context.TODO()))

logger := logrus.New()
cfg, err := mysql2.ParseDSN(testutils.DSN())
assert.NoError(t, err)
client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{
Logger: logger,
Concurrency: 4,
TargetBatchTime: time.Second,
})
assert.NoError(t, client.Run())
defer client.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) // if it takes any longer block wait is failing.
defer cancel()

assert.NoError(t, client.BlockWait(ctx))

// Insert into t1.
testutils.RunSQL(t, "INSERT INTO blockwaitt1 (a, b, c) VALUES (1, 2, 3)")
assert.NoError(t, client.Flush(ctx)) // apply the changes (not required, they only need to be received for block wait to unblock)
assert.NoError(t, client.BlockWait(ctx)) // should be quick still.
testutils.RunSQL(t, "INSERT INTO blockwaitt1 (a, b, c) VALUES (2, 2, 3)") // don't apply changes.
assert.NoError(t, client.BlockWait(ctx)) // should be quick because apply not required.

testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1")
testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1")
testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1")
testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1")
testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1")
assert.NoError(t, client.BlockWait(ctx)) // should be quick
}
