diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go index 790b336..c74e8f1 100644 --- a/pkg/checksum/checker.go +++ b/pkg/checksum/checker.go @@ -25,23 +25,23 @@ import ( type Checker struct { sync.Mutex - table *table.TableInfo - newTable *table.TableInfo - concurrency int - feed *repl.Client - db *sql.DB - trxPool *dbconn.TrxPool - isInvalid bool - chunker table.Chunker - startTime time.Time - ExecTime time.Duration - recentValue interface{} // used for status - dbConfig *dbconn.DBConfig - logger loggers.Advanced - fixDifferences bool - differencesFound atomic.Uint64 - recopyLock sync.Mutex - isResumeFromCheckpoint bool + table *table.TableInfo + newTable *table.TableInfo + concurrency int + feed *repl.Client + db *sql.DB + trxPool *dbconn.TrxPool + isInvalid bool + chunker table.Chunker + startTime time.Time + ExecTime time.Duration + recentValue interface{} // used for status + dbConfig *dbconn.DBConfig + logger loggers.Advanced + fixDifferences bool + differencesFound atomic.Uint64 + recopyLock sync.Mutex + isResume bool } type CheckerConfig struct { @@ -87,16 +87,16 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c } } checksum := &Checker{ - table: tbl, - newTable: newTable, - concurrency: config.Concurrency, - db: db, - feed: feed, - chunker: chunker, - dbConfig: config.DBConfig, - logger: config.Logger, - fixDifferences: config.FixDifferences, - isResumeFromCheckpoint: config.Watermark != "", + table: tbl, + newTable: newTable, + concurrency: config.Concurrency, + db: db, + feed: feed, + chunker: chunker, + dbConfig: config.DBConfig, + logger: config.Logger, + fixDifferences: config.FixDifferences, + isResume: config.Watermark != "", } return checksum, nil } @@ -170,6 +170,9 @@ func (c *Checker) RecentValue() string { } func (c *Checker) GetLowWatermark() (string, error) { + if c.chunker == nil { + return "", errors.New("chunker not initialized") + } return c.chunker.GetLowWatermark() } @@ -380,7 +383,7 @@ func (c *Checker) Run(ctx context.Context) error { // Open the chunker if it's not open. // It will already be open if this is a resume from checkpoint. // This is a little annoying, but just the way the chunker API works. - if !c.isResumeFromCheckpoint { + if !c.isResume { if err := c.chunker.Open(); err != nil { return err } diff --git a/pkg/checksum/checker_test.go b/pkg/checksum/checker_test.go index 76f5d91..abeca61 100644 --- a/pkg/checksum/checker_test.go +++ b/pkg/checksum/checker_test.go @@ -258,3 +258,36 @@ func TestChangeDataTypeDatetime(t *testing.T) { assert.NoError(t, err) assert.NoError(t, checker.Run(context.Background())) // fails } + +func TestFromWatermark(t *testing.T) { + testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt") + testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement + testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)") + testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)") + + db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) + assert.NoError(t, err) + + t1 := table.NewTableInfo(db, "test", "tfromwatermark") + assert.NoError(t, t1.SetInfo(context.TODO())) + t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new") + assert.NoError(t, t2.SetInfo(context.TODO())) + logger := logrus.New() + + cfg, err := mysql.ParseDSN(testutils.DSN()) + assert.NoError(t, err) + feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, + }) + assert.NoError(t, feed.Run()) + + config := NewCheckerDefaultConfig() + config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}" + checker, err := NewChecker(db, t1, t2, feed, config) + assert.NoError(t, err) + assert.NoError(t, checker.Run(context.Background())) +} diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index 58dbf84..a2aa473 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -42,7 +42,7 @@ const ( // These are really consts, but set to var for testing. var ( - checkpointDumpInterval = 2 * time.Second + checkpointDumpInterval = 50 * time.Second tableStatUpdateInterval = 5 * time.Minute statusInterval = 30 * time.Second sentinelCheckInterval = 1 * time.Second @@ -917,10 +917,12 @@ func (r *Runner) dumpCheckpoint(ctx context.Context) error { var checksumWatermark string if r.getCurrentState() >= stateChecksum { r.checkerLock.Lock() - checksumWatermark, err = r.checker.GetLowWatermark() - r.checkerLock.Unlock() - if err != nil { - return err + defer r.checkerLock.Unlock() + if r.checker != nil { + checksumWatermark, err = r.checker.GetLowWatermark() + if err != nil { + return err + } } } copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount) diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index ca98c49..6028c83 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -955,6 +955,77 @@ func TestCheckpointRestore(t *testing.T) { assert.True(t, r2.usedResumeFromCheckpoint) } +func TestCheckpointResumeDuringChecksum(t *testing.T) { + tbl := `CREATE TABLE cptresume ( + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + id2 INT NOT NULL, + pad VARCHAR(100) NOT NULL default 0)` + cfg, err := mysql.ParseDSN(testutils.DSN()) + assert.NoError(t, err) + testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`) + testutils.RunSQL(t, tbl) + testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`) + testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`) + testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`) + testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`) + + r, err := NewRunner(&Migration{ + Host: cfg.Addr, + Username: cfg.User, + Password: cfg.Passwd, + Database: cfg.DBName, + Threads: 4, + TargetChunkTime: 100 * time.Millisecond, + Table: "cptresume", + Alter: "ENGINE=InnoDB", + Checksum: true, + }) + assert.NoError(t, err) + + // Call r.Run() with our context in a go-routine. + // When we see that we are waiting on the sentinel table, + // we then manually start the first bits of checksum, and then close() + // We should be able to resume from the checkpoint into the checksum state. + ctx, cancel := context.WithCancel(context.Background()) + go func() { + err := r.Run(ctx) + assert.Error(t, err) + }() + for { + // Wait for the sentinel table. + if r.getCurrentState() >= stateWaitingOnSentinelTable { + break + } + time.Sleep(time.Millisecond) + } + + assert.NoError(t, r.checksum(context.TODO())) // run the checksum, the original Run is blocked on sentinel. + assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the watermark. + cancel() // unblock the original waiting on sentinel. + assert.NoError(t, r.Close()) // close the run. + + // drop the sentinel table. + testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`) + + // Start again as a new runner, + r2, err := NewRunner(&Migration{ + Host: cfg.Addr, + Username: cfg.User, + Password: cfg.Passwd, + Database: cfg.DBName, + Threads: 4, + TargetChunkTime: 100 * time.Millisecond, + Table: "cptresume", + Alter: "ENGINE=InnoDB", + Checksum: true, + }) + assert.NoError(t, err) + err = r2.Run(context.Background()) + assert.NoError(t, err) + assert.True(t, r2.usedResumeFromCheckpoint) + assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark +} + func TestCheckpointDifferentRestoreOptions(t *testing.T) { tbl := `CREATE TABLE cpt1difft1 ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -2674,7 +2745,7 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) { go func() { err := runner.Run(ctx) - assert.ErrorContains(t, err, "context canceled") // it gets interrupted as soon as there is a checkpoint saved. + assert.Error(t, err) // it gets interrupted as soon as there is a checkpoint saved. }() // wait until a checkpoint is saved (which means copy is in progress)