From 9679451982ca0356827f59970b94ab1a405ebe6c Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 23 Jul 2024 23:00:27 -0600 Subject: [PATCH 01/10] Fix changes not synced error --- go.mod | 4 ++- go.sum | 10 +++--- pkg/repl/client.go | 57 ++++++++++++++++++++++------------ pkg/repl/client_test.go | 47 ++++++++++++++++++++++++++++ pkg/table/tableinfo.go | 39 +++++++++++++++++++++++ pkg/table/tableinfo_test.go | 62 +++++++++++++++++++++++++++++++++++++ 6 files changed, 193 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index 5fb6a33c..2cc41785 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/google/uuid v1.4.0 // indirect - github.com/klauspost/compress v1.17.1 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -40,3 +40,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240724035801-a383815d9cd7 diff --git a/go.sum b/go.sum index c3c8b26c..a7e69309 100644 --- a/go.sum +++ b/go.sum @@ -15,10 +15,6 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-mysql-org/go-mysql v1.7.1-0.20231227030153-e817d9813fe3 h1:3LDcDJa6U5onHlQ6Z0fjEF5up/ZI4mlTRpx6zHjq9hg= -github.com/go-mysql-org/go-mysql v1.7.1-0.20231227030153-e817d9813fe3/go.mod h1:AGmdMIbBbcuUDRjzvHskI7+1SmPKhgKrqf3g8lknIc4= -github.com/go-mysql-org/go-mysql v1.8.0 h1:bN+/Q5yyQXQOAabXPkI3GZX43w4Tsj2DIthjC9i6CkQ= -github.com/go-mysql-org/go-mysql v1.8.0/go.mod h1:kwbF156Z9Sy8amP3E1SZp7/s/0PuJj/xKaOWToQiq0Y= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= @@ -27,8 +23,8 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= -github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= -github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -37,6 +33,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/morgo/go-mysql v0.0.0-20240724035801-a383815d9cd7 h1:m9hDoUFXOF4UM7B+wCjE/r9aqcGWI0xG6vW3tTQ/vFc= +github.com/morgo/go-mysql v0.0.0-20240724035801-a383815d9cd7/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 4bdea6dd..fa8dafdd 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -191,7 +191,6 @@ func (c *Client) OnRow(e *canal.RowsEvent) error { c.logger.Errorf("unknown action: %v", e.Action) } } - c.updatePosInMemory(e.Header.LogPos) return nil } @@ -212,12 +211,32 @@ func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replicat return nil } +func (c *Client) OnPosSynced(header *replication.EventHeader, pos mysql.Position, _ mysql.GTIDSet, _ bool) error { + if header != nil { + c.updatePosInMemory(header.LogPos) + } + return nil +} + // OnTableChanged is called when a table is changed via DDL. // This is a failsafe because we don't expect DDL to be performed on the table while we are operating. +// We don't know anything about this change. It could be ANALYZE TABLE, which should +// soon send a notification: https://github.com/go-mysql-org/go-mysql/pull/900 func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error { - if (c.table.SchemaName == schema && c.table.TableName == table) || - (c.newTable.SchemaName == schema && c.newTable.TableName == table) { - if c.TableChangeNotificationCallback != nil { + if c.TableChangeNotificationCallback == nil || schema != c.table.SchemaName { + return nil // can't callback. + } + // If it matches the existing table, or it matches the new table, + // check for modifications at the source. + if table == c.table.TableName { + modified, err := c.table.IsModified(context.TODO()) + if err != nil || modified { + c.TableChangeNotificationCallback() + } + } + if table == c.newTable.TableName { + modified, err := c.newTable.IsModified(context.TODO()) + if err != nil || modified { c.TableChangeNotificationCallback() } } @@ -405,8 +424,22 @@ func (c *Client) updatePosInMemory(pos uint32) { } // FlushUnderLock is a final flush under an exclusive lock using the connection -// that holds a write lock. +// that holds a write lock. Because flushing generates binary log events, +// we actually want to call flush *twice*: +// - The first time flushes the pending changes to the new table. +// - We then ensure that we have all the binary log changes read from the server. +// - The second time reads through the changes generated by the first flush +// and updates the in memory applied position to match the server's position. +// This is required to satisfy the binlog position is updated for the c.AllChangesFlushed() check. func (c *Client) FlushUnderLock(ctx context.Context, lock *dbconn.TableLock) error { + if err := c.flush(ctx, true, lock); err != nil { + return err + } + // Wait for the changes flushed to be received. + if err := c.BlockWait(ctx); err != nil { + return err + } + // Do a final flush return c.flush(ctx, true, lock) } @@ -711,9 +744,6 @@ func (c *Client) BlockWait(ctx context.Context) error { return err } for i := 100; ; i++ { - if err := c.injectBinlogNoise(ctx); err != nil { - return err - } canalPos := c.canal.SyncedPosition() if i%100 == 0 { // Print status every 100 loops = 10s @@ -727,17 +757,6 @@ func (c *Client) BlockWait(ctx context.Context) error { return nil } -// injectBinlogNoise is used to inject some noise into the binlog stream -// This helps ensure that we are "past" a binary log position if there is some off-by-one -// problem where the most recent canal event is not yet updating the canal SyncedPosition, -// and there are no current changes on the MySQL server to advance itself. -// Note: We can not update the table or the newTable, because this intentionally -// causes a panic (c.tableChanged() is called). -func (c *Client) injectBinlogNoise(ctx context.Context) error { - tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName) - return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName) -} - func (c *Client) keyHasChanged(key []interface{}, deleted bool) { c.Lock() defer c.Unlock() diff --git a/pkg/repl/client_test.go b/pkg/repl/client_test.go index b13613d0..b62692c2 100644 --- a/pkg/repl/client_test.go +++ b/pkg/repl/client_test.go @@ -359,3 +359,50 @@ func TestFeedback(t *testing.T) { } assert.Equal(t, int64(500), client.targetBatchSize) // less keys. } + +// TestBlockWait tests that the BlockWait function will: +// - check the server's binary log position +// - block waiting until the repl client is at that position. +func TestBlockWait(t *testing.T) { + db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) + assert.NoError(t, err) + + testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2") + testutils.RunSQL(t, "CREATE TABLE blockwaitt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE blockwaitt2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + + t1 := table.NewTableInfo(db, "test", "blockwaitt1") + assert.NoError(t, t1.SetInfo(context.TODO())) + t2 := table.NewTableInfo(db, "test", "blockwaitt2") + assert.NoError(t, t2.SetInfo(context.TODO())) + + logger := logrus.New() + cfg, err := mysql2.ParseDSN(testutils.DSN()) + assert.NoError(t, err) + client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{ + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, + }) + assert.NoError(t, client.Run()) + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) // if it takes any longer block wait is failing. + defer cancel() + + assert.NoError(t, client.BlockWait(ctx)) + + // Insert into t1. + testutils.RunSQL(t, "INSERT INTO blockwaitt1 (a, b, c) VALUES (1, 2, 3)") + assert.NoError(t, client.Flush(ctx)) // apply the changes (not required, they only need to be received for block wait to unblock) + assert.NoError(t, client.BlockWait(ctx)) // should be quick still. + testutils.RunSQL(t, "INSERT INTO blockwaitt1 (a, b, c) VALUES (2, 2, 3)") // don't apply changes. + assert.NoError(t, client.BlockWait(ctx)) // should be quick because apply not required. + + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + testutils.RunSQL(t, "ANALYZE TABLE blockwaitt1") + assert.NoError(t, client.BlockWait(ctx)) // should be quick +} diff --git a/pkg/table/tableinfo.go b/pkg/table/tableinfo.go index 652f6946..6ba8ee77 100644 --- a/pkg/table/tableinfo.go +++ b/pkg/table/tableinfo.go @@ -7,6 +7,7 @@ import ( "database/sql" "errors" "fmt" + "reflect" "sync" "sync/atomic" "time" @@ -94,6 +95,44 @@ func (t *TableInfo) SetInfo(ctx context.Context) error { return t.setMinMax(ctx) } +// IsModified checks if the table has been modified since we ran SetInfo +func (t *TableInfo) IsModified(ctx context.Context) (bool, error) { + t.statisticsLock.Lock() + defer t.statisticsLock.Unlock() + + // Copy what we know about and cache it. + columns := t.Columns + columnTps := t.columnsMySQLTps + keyColumns := t.KeyColumns + indexes := t.Indexes + + // Refetch the values + if err := t.setColumns(ctx); err != nil { + return true, err + } + if err := t.setPrimaryKey(ctx); err != nil { + return true, err + } + if err := t.setIndexes(ctx); err != nil { + return true, err + } + // Compare the values. + if !reflect.DeepEqual(columns, t.Columns) { + return true, nil + } + if !reflect.DeepEqual(columnTps, t.columnsMySQLTps) { + return true, nil + } + if !reflect.DeepEqual(keyColumns, t.KeyColumns) { + return true, nil + } + if !reflect.DeepEqual(indexes, t.Indexes) { + return true, nil + } + // If we get here, nothing has changed. + return false, nil +} + // setRowEstimate is a separate function so it can be repeated continuously // Since if a schema migration takes 14 days, it could change. func (t *TableInfo) setRowEstimate(ctx context.Context) error { diff --git a/pkg/table/tableinfo_test.go b/pkg/table/tableinfo_test.go index 2b8c38b2..825c133c 100644 --- a/pkg/table/tableinfo_test.go +++ b/pkg/table/tableinfo_test.go @@ -324,3 +324,65 @@ func TestDiscoveryGeneratedCols(t *testing.T) { assert.Equal(t, []string{"id", "name", "b", "c1", "c2", "c3", "d"}, t1.Columns) assert.Equal(t, []string{"id", "name", "b", "d"}, t1.NonGeneratedColumns) } + +func TestTableIsModified(t *testing.T) { + db, err := sql.Open("mysql", testutils.DSN()) + assert.NoError(t, err) + defer db.Close() + + testutils.RunSQL(t, `DROP TABLE IF EXISTS modifiedt1`) + table := `CREATE TABLE modifiedt1 ( + id int(11) NOT NULL AUTO_INCREMENT, + name varchar(255) NOT NULL, + PRIMARY KEY (id) + )` + testutils.RunSQL(t, table) + + t1 := NewTableInfo(db, "test", "modifiedt1") + assert.NoError(t, t1.SetInfo(context.TODO())) + + modified, err := t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.False(t, modified) + + testutils.RunSQL(t, `ALTER TABLE modifiedt1 ADD COLUMN age INT`) + + // There's a new column. + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.True(t, modified) + + // Subsequent calls should return false. + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.False(t, modified) + + // Now add an index. + testutils.RunSQL(t, `ALTER TABLE modifiedt1 ADD INDEX idx_age (age)`) + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.True(t, modified) + + // Change a datatype. + testutils.RunSQL(t, `ALTER TABLE modifiedt1 MODIFY COLUMN age VARCHAR(255)`) + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.True(t, modified) + + // Drop an index + testutils.RunSQL(t, `ALTER TABLE modifiedt1 DROP INDEX idx_age`) + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.True(t, modified) + + // Drop a column + testutils.RunSQL(t, `ALTER TABLE modifiedt1 DROP COLUMN age`) + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.True(t, modified) + + // No changes if run again. + modified, err = t1.IsModified(context.TODO()) + assert.NoError(t, err) + assert.False(t, modified) +} From 13c4d7a925d4f04e693adda29a9e1a48a35f3351 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 00:13:09 -0600 Subject: [PATCH 02/10] Remove data races --- pkg/table/tableinfo.go | 112 ++++++++++++++++++------------------ pkg/table/tableinfo_test.go | 54 ++++++----------- 2 files changed, 73 insertions(+), 93 deletions(-) diff --git a/pkg/table/tableinfo.go b/pkg/table/tableinfo.go index 6ba8ee77..101a4cf0 100644 --- a/pkg/table/tableinfo.go +++ b/pkg/table/tableinfo.go @@ -77,19 +77,21 @@ func (t *TableInfo) PrimaryKeyValues(row interface{}) ([]interface{}, error) { } // SetInfo reads from MySQL metadata (usually infoschema) and sets the values in TableInfo. +// To avoid data races, the functions return values which can then be set. func (t *TableInfo) SetInfo(ctx context.Context) error { t.statisticsLock.Lock() defer t.statisticsLock.Unlock() - if err := t.setRowEstimate(ctx); err != nil { + var err error + if err = t.setRowEstimate(ctx); err != nil { return err } - if err := t.setColumns(ctx); err != nil { + if t.Columns, t.NonGeneratedColumns, t.columnsMySQLTps, err = t.fetchColumns(ctx); err != nil { return err } - if err := t.setPrimaryKey(ctx); err != nil { + if t.KeyColumns, t.KeyIsAutoInc, t.keyColumnsMySQLTp, t.keyDatums, err = t.fetchPrimaryKey(ctx); err != nil { return err } - if err := t.setIndexes(ctx); err != nil { + if t.Indexes, err = t.fetchIndexes(ctx); err != nil { return err } return t.setMinMax(ctx) @@ -100,32 +102,32 @@ func (t *TableInfo) IsModified(ctx context.Context) (bool, error) { t.statisticsLock.Lock() defer t.statisticsLock.Unlock() - // Copy what we know about and cache it. - columns := t.Columns - columnTps := t.columnsMySQLTps - keyColumns := t.KeyColumns - indexes := t.Indexes - - // Refetch the values - if err := t.setColumns(ctx); err != nil { - return true, err - } - if err := t.setPrimaryKey(ctx); err != nil { - return true, err - } - if err := t.setIndexes(ctx); err != nil { + // Compare columns + columns, _, columnsMySQLTps, err := t.fetchColumns(ctx) + if err != nil { return true, err } - // Compare the values. if !reflect.DeepEqual(columns, t.Columns) { return true, nil } - if !reflect.DeepEqual(columnTps, t.columnsMySQLTps) { + if !reflect.DeepEqual(columnsMySQLTps, t.columnsMySQLTps) { return true, nil } + + // Compare key columns. + keyColumns, _, _, _, err := t.fetchPrimaryKey(ctx) + if err != nil { + return true, err + } if !reflect.DeepEqual(keyColumns, t.KeyColumns) { return true, nil } + + // Compare indexes. + indexes, err := t.fetchIndexes(ctx) + if err != nil { + return true, err + } if !reflect.DeepEqual(indexes, t.Indexes) { return true, nil } @@ -150,61 +152,59 @@ func (t *TableInfo) setRowEstimate(ctx context.Context) error { return nil } -func (t *TableInfo) setIndexes(ctx context.Context) error { +func (t *TableInfo) fetchIndexes(ctx context.Context) (indexes []string, err error) { rows, err := t.db.QueryContext(ctx, "SELECT DISTINCT INDEX_NAME FROM INFORMATION_SCHEMA.STATISTICS WHERE table_schema=? AND table_name=? AND index_name != 'PRIMARY'", t.SchemaName, t.TableName, ) if err != nil { - return err + return nil, err } defer rows.Close() - t.Indexes = []string{} for rows.Next() { var name string if err := rows.Scan(&name); err != nil { - return err + return nil, err } - t.Indexes = append(t.Indexes, name) + indexes = append(indexes, name) } if rows.Err() != nil { - return rows.Err() + return nil, rows.Err() } - return nil + return indexes, nil } -func (t *TableInfo) setColumns(ctx context.Context) error { +func (t *TableInfo) fetchColumns(ctx context.Context) (columns []string, nonGeneratedColumns []string, columnsMySQLTps map[string]string, err error) { rows, err := t.db.QueryContext(ctx, "SELECT column_name, column_type, GENERATION_EXPRESSION FROM information_schema.columns WHERE table_schema=? AND table_name=? ORDER BY ORDINAL_POSITION", t.SchemaName, t.TableName, ) if err != nil { - return err + return } defer rows.Close() - t.Columns = []string{} - t.NonGeneratedColumns = []string{} - t.columnsMySQLTps = make(map[string]string) + columnsMySQLTps = make(map[string]string) for rows.Next() { var col, tp, expression string - if err := rows.Scan(&col, &tp, &expression); err != nil { - return err + if err = rows.Scan(&col, &tp, &expression); err != nil { + return } - t.Columns = append(t.Columns, col) - t.columnsMySQLTps[col] = tp + columns = append(columns, col) + columnsMySQLTps[col] = tp if expression == "" { - t.NonGeneratedColumns = append(t.NonGeneratedColumns, col) + nonGeneratedColumns = append(nonGeneratedColumns, col) } } if rows.Err() != nil { - return rows.Err() + err = rows.Err() + return } - return nil + return } // DescIndex describes the columns in an index. func (t *TableInfo) DescIndex(keyName string) ([]string, error) { - cols := []string{} + var cols []string rows, err := t.db.Query("SELECT column_name FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA=? AND TABLE_NAME=? AND index_name=? ORDER BY seq_in_index", t.SchemaName, t.TableName, @@ -227,47 +227,49 @@ func (t *TableInfo) DescIndex(keyName string) ([]string, error) { return cols, nil } -// setPrimaryKey sets the primary key and also the primary key type. +// fetchPrimaryKey sets the primary key and also the primary key type. // A primary key can contain multiple columns. -func (t *TableInfo) setPrimaryKey(ctx context.Context) error { +func (t *TableInfo) fetchPrimaryKey(ctx context.Context) (keyColumns []string, keyIsAutoInc bool, keyColumnsMySQLTp []string, keyDatums []datumTp, err error) { rows, err := t.db.QueryContext(ctx, "SELECT column_name FROM information_schema.key_column_usage WHERE table_schema=? and table_name=? and constraint_name='PRIMARY' ORDER BY ORDINAL_POSITION", t.SchemaName, t.TableName, ) if err != nil { - return err + return } defer rows.Close() - t.KeyColumns = []string{} for rows.Next() { var col string if err := rows.Scan(&col); err != nil { - return err + return nil, false, nil, nil, err } - t.KeyColumns = append(t.KeyColumns, col) + keyColumns = append(keyColumns, col) } if rows.Err() != nil { - return rows.Err() + err = rows.Err() + return } - if len(t.KeyColumns) == 0 { - return errors.New("no primary key found (not supported)") + if len(keyColumns) == 0 { + err = errors.New("no primary key found (not supported)") + return } - for i, col := range t.KeyColumns { + for i, col := range keyColumns { // Get primary key type and auto_inc info. query := "SELECT column_type, extra FROM information_schema.columns WHERE table_schema=? AND table_name=? and column_name=?" var extra, pkType string err = t.db.QueryRowContext(ctx, query, t.SchemaName, t.TableName, col).Scan(&pkType, &extra) if err != nil { - return err + return nil, false, nil, nil, err } pkType = removeWidth(pkType) - t.keyColumnsMySQLTp = append(t.keyColumnsMySQLTp, pkType) - t.keyDatums = append(t.keyDatums, mySQLTypeToDatumTp(pkType)) + keyColumnsMySQLTp = append(keyColumnsMySQLTp, pkType) + keyDatums = append(keyDatums, mySQLTypeToDatumTp(pkType)) if i == 0 { - t.KeyIsAutoInc = (extra == "auto_increment") + keyIsAutoInc = (extra == "auto_increment") } } - return nil + err = nil + return } // PrimaryKeyIsMemoryComparable checks that the PRIMARY KEY type is compatible. diff --git a/pkg/table/tableinfo_test.go b/pkg/table/tableinfo_test.go index 825c133c..f10657de 100644 --- a/pkg/table/tableinfo_test.go +++ b/pkg/table/tableinfo_test.go @@ -345,44 +345,22 @@ func TestTableIsModified(t *testing.T) { assert.NoError(t, err) assert.False(t, modified) - testutils.RunSQL(t, `ALTER TABLE modifiedt1 ADD COLUMN age INT`) - - // There's a new column. - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.True(t, modified) - - // Subsequent calls should return false. - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.False(t, modified) - - // Now add an index. - testutils.RunSQL(t, `ALTER TABLE modifiedt1 ADD INDEX idx_age (age)`) - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.True(t, modified) - - // Change a datatype. - testutils.RunSQL(t, `ALTER TABLE modifiedt1 MODIFY COLUMN age VARCHAR(255)`) - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.True(t, modified) - - // Drop an index - testutils.RunSQL(t, `ALTER TABLE modifiedt1 DROP INDEX idx_age`) - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.True(t, modified) + var stmts = []string{ + `ALTER TABLE modifiedt1 ADD COLUMN age INT`, + `ALTER TABLE modifiedt1 ADD INDEX idx_age (age)`, + `ALTER TABLE modifiedt1 MODIFY COLUMN age VARCHAR(255)`, + `ALTER TABLE modifiedt1 DROP INDEX idx_age`, + `ALTER TABLE modifiedt1 DROP COLUMN age`, + } - // Drop a column - testutils.RunSQL(t, `ALTER TABLE modifiedt1 DROP COLUMN age`) - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.True(t, modified) + for _, stmt := range stmts { + // reset tbl each time. + tbl := NewTableInfo(db, "test", "modifiedt1") + assert.NoError(t, tbl.SetInfo(context.TODO())) - // No changes if run again. - modified, err = t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.False(t, modified) + testutils.RunSQL(t, stmt) // run a statement that modifies the table. + modified, err = tbl.IsModified(context.TODO()) + assert.NoError(t, err) + assert.True(t, modified, "expected table to be modified after running %s", stmt) + } } From 4834099fea241ab89dd1e04a01c14a5e58648dde Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 00:17:07 -0600 Subject: [PATCH 03/10] Update golangci --- .golangci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.golangci.yaml b/.golangci.yaml index 586f04e9..7fc0603c 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -7,6 +7,7 @@ output: linters: enable-all: true disable: + - gomoddirectives - tagalign - depguard - mnd @@ -32,7 +33,6 @@ linters: - wsl - funlen - gocognit - - gomnd - goprintffuncname - paralleltest - nlreturn From 73cdfc86b5690c64b635fff07be9cb67ea68dd63 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 00:29:29 -0600 Subject: [PATCH 04/10] fix linter --- .golangci.yaml | 1 + pkg/table/tableinfo.go | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 7fc0603c..0f9c00fd 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -33,6 +33,7 @@ linters: - wsl - funlen - gocognit + - gomnd - goprintffuncname - paralleltest - nlreturn diff --git a/pkg/table/tableinfo.go b/pkg/table/tableinfo.go index 101a4cf0..5b1336df 100644 --- a/pkg/table/tableinfo.go +++ b/pkg/table/tableinfo.go @@ -115,7 +115,7 @@ func (t *TableInfo) IsModified(ctx context.Context) (bool, error) { } // Compare key columns. - keyColumns, _, _, _, err := t.fetchPrimaryKey(ctx) + keyColumns, _, _, _, err := t.fetchPrimaryKey(ctx) //nolint: dogsled if err != nil { return true, err } @@ -235,7 +235,7 @@ func (t *TableInfo) fetchPrimaryKey(ctx context.Context) (keyColumns []string, k t.TableName, ) if err != nil { - return + return //nolint: nakedret } defer rows.Close() for rows.Next() { @@ -247,11 +247,11 @@ func (t *TableInfo) fetchPrimaryKey(ctx context.Context) (keyColumns []string, k } if rows.Err() != nil { err = rows.Err() - return + return //nolint: nakedret } if len(keyColumns) == 0 { err = errors.New("no primary key found (not supported)") - return + return //nolint: nakedret } for i, col := range keyColumns { // Get primary key type and auto_inc info. @@ -259,7 +259,7 @@ func (t *TableInfo) fetchPrimaryKey(ctx context.Context) (keyColumns []string, k var extra, pkType string err = t.db.QueryRowContext(ctx, query, t.SchemaName, t.TableName, col).Scan(&pkType, &extra) if err != nil { - return nil, false, nil, nil, err + return //nolint: nakedret } pkType = removeWidth(pkType) keyColumnsMySQLTp = append(keyColumnsMySQLTp, pkType) @@ -269,7 +269,7 @@ func (t *TableInfo) fetchPrimaryKey(ctx context.Context) (keyColumns []string, k } } err = nil - return + return //nolint: nakedret } // PrimaryKeyIsMemoryComparable checks that the PRIMARY KEY type is compatible. From e6d696ce9fddbd5e59927324dfd95e861baf9b2c Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 06:33:57 -0600 Subject: [PATCH 05/10] use different canal fix --- go.mod | 2 +- go.sum | 4 +- pkg/repl/client.go | 38 ++---------- pkg/table/tableinfo.go | 119 ++++++++++++------------------------ pkg/table/tableinfo_test.go | 40 ------------ 5 files changed, 48 insertions(+), 155 deletions(-) diff --git a/go.mod b/go.mod index 2cc41785..b18f8268 100644 --- a/go.mod +++ b/go.mod @@ -41,4 +41,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240724035801-a383815d9cd7 +replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240724121709-5edecf639700 diff --git a/go.sum b/go.sum index a7e69309..274d450b 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/morgo/go-mysql v0.0.0-20240724035801-a383815d9cd7 h1:m9hDoUFXOF4UM7B+wCjE/r9aqcGWI0xG6vW3tTQ/vFc= -github.com/morgo/go-mysql v0.0.0-20240724035801-a383815d9cd7/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= +github.com/morgo/go-mysql v0.0.0-20240724121709-5edecf639700 h1:yJkv9NE1jRXioM61N/kMkjyAPsGtVYFA5poPYvFMsno= +github.com/morgo/go-mysql v0.0.0-20240724121709-5edecf639700/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= diff --git a/pkg/repl/client.go b/pkg/repl/client.go index fa8dafdd..985b60b2 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -41,6 +41,8 @@ const ( // Longer values require more memory, but permit more merging. // I expect we will change this to 1hr-24hr in the future. DefaultFlushInterval = 30 * time.Second + // DefaultTimeout is how long BlockWait is supposed to wait before returning errors. + DefaultTimeout = 10 * time.Second ) type queuedChange struct { @@ -220,23 +222,10 @@ func (c *Client) OnPosSynced(header *replication.EventHeader, pos mysql.Position // OnTableChanged is called when a table is changed via DDL. // This is a failsafe because we don't expect DDL to be performed on the table while we are operating. -// We don't know anything about this change. It could be ANALYZE TABLE, which should -// soon send a notification: https://github.com/go-mysql-org/go-mysql/pull/900 func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error { - if c.TableChangeNotificationCallback == nil || schema != c.table.SchemaName { - return nil // can't callback. - } - // If it matches the existing table, or it matches the new table, - // check for modifications at the source. - if table == c.table.TableName { - modified, err := c.table.IsModified(context.TODO()) - if err != nil || modified { - c.TableChangeNotificationCallback() - } - } - if table == c.newTable.TableName { - modified, err := c.newTable.IsModified(context.TODO()) - if err != nil || modified { + if (c.table.SchemaName == schema && c.table.TableName == table) || + (c.newTable.SchemaName == schema && c.newTable.TableName == table) { + if c.TableChangeNotificationCallback != nil { c.TableChangeNotificationCallback() } } @@ -739,22 +728,7 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) // **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that // changes have been applied to the database. func (c *Client) BlockWait(ctx context.Context) error { - targetPos, err := c.canal.GetMasterPos() // what the server is at. - if err != nil { - return err - } - for i := 100; ; i++ { - canalPos := c.canal.SyncedPosition() - if i%100 == 0 { - // Print status every 100 loops = 10s - c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos) - } - if canalPos.Compare(targetPos) >= 0 { - break - } - time.Sleep(100 * time.Millisecond) - } - return nil + return c.canal.CatchMasterPos(DefaultTimeout) } func (c *Client) keyHasChanged(key []interface{}, deleted bool) { diff --git a/pkg/table/tableinfo.go b/pkg/table/tableinfo.go index 5b1336df..652f6946 100644 --- a/pkg/table/tableinfo.go +++ b/pkg/table/tableinfo.go @@ -7,7 +7,6 @@ import ( "database/sql" "errors" "fmt" - "reflect" "sync" "sync/atomic" "time" @@ -77,64 +76,24 @@ func (t *TableInfo) PrimaryKeyValues(row interface{}) ([]interface{}, error) { } // SetInfo reads from MySQL metadata (usually infoschema) and sets the values in TableInfo. -// To avoid data races, the functions return values which can then be set. func (t *TableInfo) SetInfo(ctx context.Context) error { t.statisticsLock.Lock() defer t.statisticsLock.Unlock() - var err error - if err = t.setRowEstimate(ctx); err != nil { + if err := t.setRowEstimate(ctx); err != nil { return err } - if t.Columns, t.NonGeneratedColumns, t.columnsMySQLTps, err = t.fetchColumns(ctx); err != nil { + if err := t.setColumns(ctx); err != nil { return err } - if t.KeyColumns, t.KeyIsAutoInc, t.keyColumnsMySQLTp, t.keyDatums, err = t.fetchPrimaryKey(ctx); err != nil { + if err := t.setPrimaryKey(ctx); err != nil { return err } - if t.Indexes, err = t.fetchIndexes(ctx); err != nil { + if err := t.setIndexes(ctx); err != nil { return err } return t.setMinMax(ctx) } -// IsModified checks if the table has been modified since we ran SetInfo -func (t *TableInfo) IsModified(ctx context.Context) (bool, error) { - t.statisticsLock.Lock() - defer t.statisticsLock.Unlock() - - // Compare columns - columns, _, columnsMySQLTps, err := t.fetchColumns(ctx) - if err != nil { - return true, err - } - if !reflect.DeepEqual(columns, t.Columns) { - return true, nil - } - if !reflect.DeepEqual(columnsMySQLTps, t.columnsMySQLTps) { - return true, nil - } - - // Compare key columns. - keyColumns, _, _, _, err := t.fetchPrimaryKey(ctx) //nolint: dogsled - if err != nil { - return true, err - } - if !reflect.DeepEqual(keyColumns, t.KeyColumns) { - return true, nil - } - - // Compare indexes. - indexes, err := t.fetchIndexes(ctx) - if err != nil { - return true, err - } - if !reflect.DeepEqual(indexes, t.Indexes) { - return true, nil - } - // If we get here, nothing has changed. - return false, nil -} - // setRowEstimate is a separate function so it can be repeated continuously // Since if a schema migration takes 14 days, it could change. func (t *TableInfo) setRowEstimate(ctx context.Context) error { @@ -152,59 +111,61 @@ func (t *TableInfo) setRowEstimate(ctx context.Context) error { return nil } -func (t *TableInfo) fetchIndexes(ctx context.Context) (indexes []string, err error) { +func (t *TableInfo) setIndexes(ctx context.Context) error { rows, err := t.db.QueryContext(ctx, "SELECT DISTINCT INDEX_NAME FROM INFORMATION_SCHEMA.STATISTICS WHERE table_schema=? AND table_name=? AND index_name != 'PRIMARY'", t.SchemaName, t.TableName, ) if err != nil { - return nil, err + return err } defer rows.Close() + t.Indexes = []string{} for rows.Next() { var name string if err := rows.Scan(&name); err != nil { - return nil, err + return err } - indexes = append(indexes, name) + t.Indexes = append(t.Indexes, name) } if rows.Err() != nil { - return nil, rows.Err() + return rows.Err() } - return indexes, nil + return nil } -func (t *TableInfo) fetchColumns(ctx context.Context) (columns []string, nonGeneratedColumns []string, columnsMySQLTps map[string]string, err error) { +func (t *TableInfo) setColumns(ctx context.Context) error { rows, err := t.db.QueryContext(ctx, "SELECT column_name, column_type, GENERATION_EXPRESSION FROM information_schema.columns WHERE table_schema=? AND table_name=? ORDER BY ORDINAL_POSITION", t.SchemaName, t.TableName, ) if err != nil { - return + return err } defer rows.Close() - columnsMySQLTps = make(map[string]string) + t.Columns = []string{} + t.NonGeneratedColumns = []string{} + t.columnsMySQLTps = make(map[string]string) for rows.Next() { var col, tp, expression string - if err = rows.Scan(&col, &tp, &expression); err != nil { - return + if err := rows.Scan(&col, &tp, &expression); err != nil { + return err } - columns = append(columns, col) - columnsMySQLTps[col] = tp + t.Columns = append(t.Columns, col) + t.columnsMySQLTps[col] = tp if expression == "" { - nonGeneratedColumns = append(nonGeneratedColumns, col) + t.NonGeneratedColumns = append(t.NonGeneratedColumns, col) } } if rows.Err() != nil { - err = rows.Err() - return + return rows.Err() } - return + return nil } // DescIndex describes the columns in an index. func (t *TableInfo) DescIndex(keyName string) ([]string, error) { - var cols []string + cols := []string{} rows, err := t.db.Query("SELECT column_name FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA=? AND TABLE_NAME=? AND index_name=? ORDER BY seq_in_index", t.SchemaName, t.TableName, @@ -227,49 +188,47 @@ func (t *TableInfo) DescIndex(keyName string) ([]string, error) { return cols, nil } -// fetchPrimaryKey sets the primary key and also the primary key type. +// setPrimaryKey sets the primary key and also the primary key type. // A primary key can contain multiple columns. -func (t *TableInfo) fetchPrimaryKey(ctx context.Context) (keyColumns []string, keyIsAutoInc bool, keyColumnsMySQLTp []string, keyDatums []datumTp, err error) { +func (t *TableInfo) setPrimaryKey(ctx context.Context) error { rows, err := t.db.QueryContext(ctx, "SELECT column_name FROM information_schema.key_column_usage WHERE table_schema=? and table_name=? and constraint_name='PRIMARY' ORDER BY ORDINAL_POSITION", t.SchemaName, t.TableName, ) if err != nil { - return //nolint: nakedret + return err } defer rows.Close() + t.KeyColumns = []string{} for rows.Next() { var col string if err := rows.Scan(&col); err != nil { - return nil, false, nil, nil, err + return err } - keyColumns = append(keyColumns, col) + t.KeyColumns = append(t.KeyColumns, col) } if rows.Err() != nil { - err = rows.Err() - return //nolint: nakedret + return rows.Err() } - if len(keyColumns) == 0 { - err = errors.New("no primary key found (not supported)") - return //nolint: nakedret + if len(t.KeyColumns) == 0 { + return errors.New("no primary key found (not supported)") } - for i, col := range keyColumns { + for i, col := range t.KeyColumns { // Get primary key type and auto_inc info. query := "SELECT column_type, extra FROM information_schema.columns WHERE table_schema=? AND table_name=? and column_name=?" var extra, pkType string err = t.db.QueryRowContext(ctx, query, t.SchemaName, t.TableName, col).Scan(&pkType, &extra) if err != nil { - return //nolint: nakedret + return err } pkType = removeWidth(pkType) - keyColumnsMySQLTp = append(keyColumnsMySQLTp, pkType) - keyDatums = append(keyDatums, mySQLTypeToDatumTp(pkType)) + t.keyColumnsMySQLTp = append(t.keyColumnsMySQLTp, pkType) + t.keyDatums = append(t.keyDatums, mySQLTypeToDatumTp(pkType)) if i == 0 { - keyIsAutoInc = (extra == "auto_increment") + t.KeyIsAutoInc = (extra == "auto_increment") } } - err = nil - return //nolint: nakedret + return nil } // PrimaryKeyIsMemoryComparable checks that the PRIMARY KEY type is compatible. diff --git a/pkg/table/tableinfo_test.go b/pkg/table/tableinfo_test.go index f10657de..2b8c38b2 100644 --- a/pkg/table/tableinfo_test.go +++ b/pkg/table/tableinfo_test.go @@ -324,43 +324,3 @@ func TestDiscoveryGeneratedCols(t *testing.T) { assert.Equal(t, []string{"id", "name", "b", "c1", "c2", "c3", "d"}, t1.Columns) assert.Equal(t, []string{"id", "name", "b", "d"}, t1.NonGeneratedColumns) } - -func TestTableIsModified(t *testing.T) { - db, err := sql.Open("mysql", testutils.DSN()) - assert.NoError(t, err) - defer db.Close() - - testutils.RunSQL(t, `DROP TABLE IF EXISTS modifiedt1`) - table := `CREATE TABLE modifiedt1 ( - id int(11) NOT NULL AUTO_INCREMENT, - name varchar(255) NOT NULL, - PRIMARY KEY (id) - )` - testutils.RunSQL(t, table) - - t1 := NewTableInfo(db, "test", "modifiedt1") - assert.NoError(t, t1.SetInfo(context.TODO())) - - modified, err := t1.IsModified(context.TODO()) - assert.NoError(t, err) - assert.False(t, modified) - - var stmts = []string{ - `ALTER TABLE modifiedt1 ADD COLUMN age INT`, - `ALTER TABLE modifiedt1 ADD INDEX idx_age (age)`, - `ALTER TABLE modifiedt1 MODIFY COLUMN age VARCHAR(255)`, - `ALTER TABLE modifiedt1 DROP INDEX idx_age`, - `ALTER TABLE modifiedt1 DROP COLUMN age`, - } - - for _, stmt := range stmts { - // reset tbl each time. - tbl := NewTableInfo(db, "test", "modifiedt1") - assert.NoError(t, tbl.SetInfo(context.TODO())) - - testutils.RunSQL(t, stmt) // run a statement that modifies the table. - modified, err = tbl.IsModified(context.TODO()) - assert.NoError(t, err) - assert.True(t, modified, "expected table to be modified after running %s", stmt) - } -} From 255d0899a8c40e5a3144132ab35b19cb1025dd25 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 06:53:13 -0600 Subject: [PATCH 06/10] use c.canal.SyncedPosition() --- pkg/repl/client.go | 36 +++--------------------------------- 1 file changed, 3 insertions(+), 33 deletions(-) diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 985b60b2..1ddb76f6 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -75,8 +75,6 @@ type Client struct { binlogChangeset map[string]bool // bool is deleted binlogChangesetDelta int64 // a special "fix" for keys that have been popped off, use atomic get/set binlogPosSynced mysql.Position // safely written to new table - binlogPosInMemory mysql.Position // available in the binlog binlogChangeset - lastLogFileName string // last log file name we've seen in a rotation event queuedChanges []queuedChange // used when disableDeltaMap is true @@ -204,22 +202,6 @@ func (c *Client) KeyAboveWatermarkEnabled() bool { return c.enableKeyAboveWatermark && c.KeyAboveCopierCallback != nil } -// OnRotate is called when a rotate event is discovered via replication. -// We use this to capture the log file name, since only the position is caught on the row event. -func (c *Client) OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error { - c.Lock() - defer c.Unlock() - c.lastLogFileName = string(rotateEvent.NextLogName) - return nil -} - -func (c *Client) OnPosSynced(header *replication.EventHeader, pos mysql.Position, _ mysql.GTIDSet, _ bool) error { - if header != nil { - c.updatePosInMemory(header.LogPos) - } - return nil -} - // OnTableChanged is called when a table is changed via DDL. // This is a failsafe because we don't expect DDL to be performed on the table while we are operating. func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error { @@ -252,7 +234,7 @@ func (c *Client) AllChangesFlushed() bool { } c.Lock() defer c.Unlock() - return c.binlogPosInMemory.Compare(c.binlogPosSynced) == 0 + return c.canal.SyncedPosition().Compare(c.binlogPosSynced) == 0 } func (c *Client) GetBinlogApplyPosition() mysql.Position { @@ -343,9 +325,6 @@ func (c *Client) Run() (err error) { return errors.New("binlog position is impossible, the source may have already purged it") } - c.binlogPosInMemory = c.binlogPosSynced - c.lastLogFileName = c.binlogPosInMemory.Name - // Call start canal as a go routine. go c.startCanal() return nil @@ -403,15 +382,6 @@ func (c *Client) Close() { } } -func (c *Client) updatePosInMemory(pos uint32) { - c.Lock() - defer c.Unlock() - c.binlogPosInMemory = mysql.Position{ - Name: c.lastLogFileName, - Pos: pos, - } -} - // FlushUnderLock is a final flush under an exclusive lock using the connection // that holds a write lock. Because flushing generates binary log events, // we actually want to call flush *twice*: @@ -451,7 +421,7 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta c.Lock() changesToFlush := c.queuedChanges c.queuedChanges = nil // reset - posOfFlush := c.binlogPosInMemory + posOfFlush := c.canal.SyncedPosition() c.Unlock() // Early return if there is nothing to flush. @@ -508,7 +478,7 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.TableLock) error { c.Lock() setToFlush := c.binlogChangeset - posOfFlush := c.binlogPosInMemory // copy the value, not the pointer + posOfFlush := c.canal.SyncedPosition() // copy the value, not the pointer c.binlogChangeset = make(map[string]bool) // set new value c.Unlock() // unlock immediately so others can write to the changeset // The changeset delta is because the status output is based on len(binlogChangeset) From b6e73990219b84ffa775b6ba91606e68077d7c96 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 07:34:50 -0600 Subject: [PATCH 07/10] fix failed to extract PK error --- go.mod | 2 +- go.sum | 4 ++-- pkg/migration/cutover_test.go | 20 ++++++++++++++++++-- pkg/migration/migration_test.go | 2 +- pkg/repl/client.go | 6 ++++-- 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index b18f8268..b72d1052 100644 --- a/go.mod +++ b/go.mod @@ -41,4 +41,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240724121709-5edecf639700 +replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240724140746-ec163036a879 diff --git a/go.sum b/go.sum index 274d450b..9be4b90c 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/morgo/go-mysql v0.0.0-20240724121709-5edecf639700 h1:yJkv9NE1jRXioM61N/kMkjyAPsGtVYFA5poPYvFMsno= -github.com/morgo/go-mysql v0.0.0-20240724121709-5edecf639700/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= +github.com/morgo/go-mysql v0.0.0-20240724140746-ec163036a879 h1:pt5G19+IScbrahK+c2aP4P76qB12hWLLpE/j3r10opw= +github.com/morgo/go-mysql v0.0.0-20240724140746-ec163036a879/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= diff --git a/pkg/migration/cutover_test.go b/pkg/migration/cutover_test.go index e7f9095a..1de90d9a 100644 --- a/pkg/migration/cutover_test.go +++ b/pkg/migration/cutover_test.go @@ -40,6 +40,7 @@ func TestCutOver(t *testing.T) { assert.Equal(t, 0, db.Stats().InUse) // no connections in use. t1 := table.NewTableInfo(db, "test", "cutovert1") + assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK. t1new := table.NewTableInfo(db, "test", "_cutovert1_new") t1old := "_cutovert1_old" logger := logrus.New() @@ -100,6 +101,7 @@ func TestMDLLockFails(t *testing.T) { assert.NoError(t, err) t1 := table.NewTableInfo(db, "test", "mdllocks") + assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK. t1new := table.NewTableInfo(db, "test", "_mdllocks_new") t1old := "test_old" logger := logrus.New() @@ -136,11 +138,25 @@ func TestInvalidOptions(t *testing.T) { assert.NoError(t, err) logger := logrus.New() + testutils.RunSQL(t, `DROP TABLE IF EXISTS invalid_t1, _invalid_t1_new`) + tbl := `CREATE TABLE invalid_t1 ( + id int(11) NOT NULL AUTO_INCREMENT, + name varchar(255) NOT NULL, + PRIMARY KEY (id) + )` + testutils.RunSQL(t, tbl) + tbl = `CREATE TABLE _invalid_t1_new ( + id int(11) NOT NULL AUTO_INCREMENT, + name varchar(255) NOT NULL, + PRIMARY KEY (id) + )` + testutils.RunSQL(t, tbl) // Invalid options _, err = NewCutOver(db, nil, nil, "", nil, dbconn.NewDBConfig(), logger) assert.Error(t, err) - t1 := table.NewTableInfo(db, "test", "t1") - t1new := table.NewTableInfo(db, "test", "t1_new") + t1 := table.NewTableInfo(db, "test", "invalid_t1") + assert.NoError(t, t1.SetInfo(context.Background())) // required to extract PK. + t1new := table.NewTableInfo(db, "test", "_invalid_t1_new") t1old := "test_old" cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) diff --git a/pkg/migration/migration_test.go b/pkg/migration/migration_test.go index 400ab0a5..7cb3b29c 100644 --- a/pkg/migration/migration_test.go +++ b/pkg/migration/migration_test.go @@ -181,7 +181,7 @@ func TestRenameInMySQL80(t *testing.T) { migration.Database = cfg.DBName migration.Threads = 16 migration.Checksum = true - migration.Table = "t1" + migration.Table = "renamet1" migration.Alter = "CHANGE name nameNew varchar(255) not null" err = migration.Run() diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 1ddb76f6..408d037e 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -293,7 +293,8 @@ func (c *Client) Run() (err error) { cfg.Password = c.password cfg.Logger = NewLogWrapper(c.logger) // wrapper to filter the noise. cfg.IncludeTableRegex = []string{fmt.Sprintf("^%s\\.%s$", c.table.SchemaName, c.table.TableName)} - cfg.Dump.ExecutionPath = "" // skip dump + cfg.Dump.ExecutionPath = "" // skip dump + cfg.DisableFlushBinlogWhileWaiting = true // can't guarantee privileges exist. if dbconn.IsRDSHost(cfg.Addr) { // create a new TLSConfig for RDS // It needs to be a copy because sharing a global pointer @@ -368,7 +369,8 @@ func (c *Client) startCanal() { // but since canal is now closed we can safely return return } - c.logger.Errorf("canal has failed. error: %v", err) + + c.logger.Errorf("canal has failed. error: %v, table: %s", err, c.table.TableName) panic("canal has failed") } } From f58eec0a233253400a2f4f0202702d98cfcbaeba Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 08:34:25 -0600 Subject: [PATCH 08/10] Update AllChangesFlushed --- pkg/repl/client.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 408d037e..78ca439b 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -229,12 +229,19 @@ func (c *Client) SetPos(pos mysql.Position) { } func (c *Client) AllChangesFlushed() bool { - if c.GetDeltaLen() > 0 { - return false - } + deltaLen := c.GetDeltaLen() c.Lock() defer c.Unlock() - return c.canal.SyncedPosition().Compare(c.binlogPosSynced) == 0 + + // We check if the position canal is up to is the same position + // as what we've made changes for. If this is zero it's a good + // indicator that we are up to date. However, because the + // "server lock" is not a global lock, it's possible that the synced + // position could still advance. + if c.canal.SyncedPosition().Compare(c.binlogPosSynced) != 0 { + c.logger.Warnf("Binlog reader info canal-position=%v synced-position=%v. Discrepancies could be due to modifications on other tables.", c.canal.SyncedPosition(), c.binlogPosSynced) + } + return deltaLen == 0 } func (c *Client) GetBinlogApplyPosition() mysql.Position { From 040a4f3cac9124616e33348cc0664963c6973731 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 28 Jul 2024 09:40:30 -0600 Subject: [PATCH 09/10] Update canal dependency --- .golangci.yaml | 1 - go.mod | 4 +--- go.sum | 4 ++-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 0f9c00fd..586f04e9 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -7,7 +7,6 @@ output: linters: enable-all: true disable: - - gomoddirectives - tagalign - depguard - mnd diff --git a/go.mod b/go.mod index b72d1052..c0c6fa51 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/alecthomas/kong v0.7.1 - github.com/go-mysql-org/go-mysql v1.8.0 + github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3 github.com/go-sql-driver/mysql v1.7.1 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 @@ -40,5 +40,3 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240724140746-ec163036a879 diff --git a/go.sum b/go.sum index 9be4b90c..2ff7b5d8 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3 h1:IaBlE/5xoN3ETAvK8bPYTnzU4fWq9Fd3Si5uaAP/qGA= +github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= @@ -33,8 +35,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/morgo/go-mysql v0.0.0-20240724140746-ec163036a879 h1:pt5G19+IScbrahK+c2aP4P76qB12hWLLpE/j3r10opw= -github.com/morgo/go-mysql v0.0.0-20240724140746-ec163036a879/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= From f201252067f9e156cc62fc512c83efa7a92383da Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 28 Jul 2024 11:05:41 -0600 Subject: [PATCH 10/10] rename serverLock to tableLock to not confuse semantics --- pkg/checksum/checker.go | 6 +++--- pkg/migration/cutover.go | 11 +++++------ pkg/repl/client.go | 4 ++-- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go index 73589109..60c0596e 100644 --- a/pkg/checksum/checker.go +++ b/pkg/checksum/checker.go @@ -332,15 +332,15 @@ func (c *Checker) initConnPool(ctx context.Context) error { // Lock the source table in a trx // so the connection is not used by others c.logger.Info("starting checksum operation, this will require a table lock") - serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) + tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) if err != nil { return err } - defer serverLock.Close() + defer tableLock.Close() // With the lock held, flush one more time under the lock tables. // Because we know canal is up to date this now guarantees // we have everything in the new table. - if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil { + if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil { return err } // Assert that the change set is empty. This should always diff --git a/pkg/migration/cutover.go b/pkg/migration/cutover.go index cffff80e..b1580a05 100644 --- a/pkg/migration/cutover.go +++ b/pkg/migration/cutover.go @@ -87,22 +87,21 @@ func (c *CutOver) Run(ctx context.Context) error { func (c *CutOver) algorithmRenameUnderLock(ctx context.Context) error { // Lock the source table in a trx // so the connection is not used by others - serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) + tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger) if err != nil { return err } - defer serverLock.Close() - if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil { + defer tableLock.Close() + if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil { return err } if !c.feed.AllChangesFlushed() { return errors.New("not all changes flushed, final flush might be broken") } - oldName := c.oldTableName - oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, oldName) + oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, c.oldTableName) renameStatement := fmt.Sprintf("RENAME TABLE %s TO %s, %s TO %s", c.table.QuotedName, oldQuotedName, c.newTable.QuotedName, c.table.QuotedName, ) - return serverLock.ExecUnderLock(ctx, renameStatement) + return tableLock.ExecUnderLock(ctx, renameStatement) } diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 78ca439b..0648bf72 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -391,7 +391,7 @@ func (c *Client) Close() { } } -// FlushUnderLock is a final flush under an exclusive lock using the connection +// FlushUnderTableLock is a final flush under an exclusive table lock using the connection // that holds a write lock. Because flushing generates binary log events, // we actually want to call flush *twice*: // - The first time flushes the pending changes to the new table. @@ -399,7 +399,7 @@ func (c *Client) Close() { // - The second time reads through the changes generated by the first flush // and updates the in memory applied position to match the server's position. // This is required to satisfy the binlog position is updated for the c.AllChangesFlushed() check. -func (c *Client) FlushUnderLock(ctx context.Context, lock *dbconn.TableLock) error { +func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error { if err := c.flush(ctx, true, lock); err != nil { return err }