diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go
index 60c0596..c74e8f1 100644
--- a/pkg/checksum/checker.go
+++ b/pkg/checksum/checker.go
@@ -41,6 +41,7 @@ type Checker struct {
 	fixDifferences   bool
 	differencesFound atomic.Uint64
 	recopyLock       sync.Mutex
+	isResume         bool
 }
 
 type CheckerConfig struct {
@@ -49,6 +50,7 @@ type CheckerConfig struct {
 	DBConfig       *dbconn.DBConfig
 	Logger         loggers.Advanced
 	FixDifferences bool
+	Watermark      string // optional; defines a watermark to start from
 }
 
 func NewCheckerDefaultConfig() *CheckerConfig {
@@ -76,6 +78,14 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
 	if err != nil {
 		return nil, err
 	}
+	// If there is a watermark, open the previously attached chunker at
+	// that watermark rather than at the start of the table.
+	if config.Watermark != "" {
+		config.Logger.Warnf("opening checksum chunker at watermark: %s", config.Watermark)
+		if err := chunker.OpenAtWatermark(config.Watermark, newTable.MaxValue()); err != nil {
+			return nil, err
+		}
+	}
 	checksum := &Checker{
 		table:    tbl,
 		newTable: newTable,
@@ -86,6 +96,7 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
 		dbConfig:       config.DBConfig,
 		logger:         config.Logger,
 		fixDifferences: config.FixDifferences,
+		isResume:       config.Watermark != "",
 	}
 	return checksum, nil
 }
@@ -158,6 +169,13 @@ func (c *Checker) RecentValue() string {
 	return fmt.Sprintf("%v", c.recentValue)
 }
 
+func (c *Checker) GetLowWatermark() (string, error) {
+	if c.chunker == nil {
+		return "", errors.New("chunker not initialized")
+	}
+	return c.chunker.GetLowWatermark()
+}
+
 func (c *Checker) inspectDifferences(trx *sql.Tx, chunk *table.Chunk) error {
 	sourceSubquery := fmt.Sprintf("SELECT CRC32(CONCAT(%s)) as row_checksum, %s FROM %s WHERE %s",
 		c.intersectColumns(),
@@ -362,8 +380,13 @@ func (c *Checker) Run(ctx context.Context) error {
 	defer func() {
 		c.ExecTime = time.Since(c.startTime)
 	}()
-	if err := c.chunker.Open(); err != nil {
-		return err
+	// Open the chunker if it's not already open.
+	// It will already be open if this is a resume from checkpoint.
+	// This is a little annoying, but it's just the way the chunker API works.
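+	// When a watermark was supplied, NewChecker has already called
+	// chunker.OpenAtWatermark() with it. For illustration, a watermark is
+	// the chunker's JSON-serialized position, e.g.
+	//   {"Key":["a"],"ChunkSize":1000,"LowerBound":{"Value": ["2"],"Inclusive":true},"UpperBound":{"Value": ["3"],"Inclusive":false}}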
+	if !c.isResume {
+		if err := c.chunker.Open(); err != nil {
+			return err
+		}
 	}
 	c.Unlock()
diff --git a/pkg/checksum/checker_test.go b/pkg/checksum/checker_test.go
index 76f5d91..abeca61 100644
--- a/pkg/checksum/checker_test.go
+++ b/pkg/checksum/checker_test.go
@@ -258,3 +258,36 @@ func TestChangeDataTypeDatetime(t *testing.T) {
 	assert.NoError(t, err)
 	assert.NoError(t, checker.Run(context.Background())) // fails
 }
+
+func TestFromWatermark(t *testing.T) {
+	testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt")
+	testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
+	testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
+	testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement
+	testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)")
+	testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)")
+
+	db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
+	assert.NoError(t, err)
+
+	t1 := table.NewTableInfo(db, "test", "tfromwatermark")
+	assert.NoError(t, t1.SetInfo(context.TODO()))
+	t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new")
+	assert.NoError(t, t2.SetInfo(context.TODO()))
+	logger := logrus.New()
+
+	cfg, err := mysql.ParseDSN(testutils.DSN())
+	assert.NoError(t, err)
+	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
+	})
+	assert.NoError(t, feed.Run())
+
+	config := NewCheckerDefaultConfig()
+	config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}"
+	checker, err := NewChecker(db, t1, t2, feed, config)
+	assert.NoError(t, err)
+	assert.NoError(t, checker.Run(context.Background()))
+}
diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go
index 3de7927..a2aa473 100644
--- a/pkg/migration/runner.go
+++ b/pkg/migration/runner.go
@@ -92,6 +92,9 @@ type Runner struct {
 	checker     *checksum.Checker
 	checkerLock sync.Mutex
 
+	// Used to resume directly into the checksum phase.
+	checksumWatermark string
+
 	// Track some key statistics.
 	startTime             time.Time
 	sentinelWaitStartTime time.Time
@@ -252,7 +255,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
 	go r.dumpCheckpointContinuously(ctx) // start periodically dumping the checkpoint.
 
 	// Perform the main copy rows task. This is where the majority
-	// of migrations usually spend time.
+	// of migrations usually spend time. It is not strictly necessary,
+	// but we always re-run the copier, even if we are resuming
+	// partway through the checksum.
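+	// (Illustrative note: when resuming, the copier was constructed with the
+	// checkpoint's copier_watermark, so this only re-copies from that point
+	// forward rather than restarting the whole table.)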
 	r.setCurrentState(stateCopyRows)
 	if err := r.copier.Run(ctx); err != nil {
 		return err
 	}
@@ -630,7 +635,16 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {
 	if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.table.SchemaName, cpName); err != nil {
 		return err
 	}
-	if err := dbconn.Exec(ctx, r.db, "CREATE TABLE %n.%n (id int NOT NULL AUTO_INCREMENT PRIMARY KEY, low_watermark TEXT, binlog_name VARCHAR(255), binlog_pos INT, rows_copied BIGINT, rows_copied_logical BIGINT, alter_statement TEXT)",
+	if err := dbconn.Exec(ctx, r.db, `CREATE TABLE %n.%n (
+		id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
+		copier_watermark TEXT,
+		checksum_watermark TEXT,
+		binlog_name VARCHAR(255),
+		binlog_pos INT,
+		rows_copied BIGINT,
+		rows_copied_logical BIGINT,
+		alter_statement TEXT
+	)`,
 		r.table.SchemaName, cpName); err != nil {
 		return err
 	}
@@ -740,17 +754,21 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
 	// Make sure we can read from the new table.
 	if err := dbconn.Exec(ctx, r.db, "SELECT * FROM %n.%n LIMIT 1", r.migration.Database, newName); err != nil {
-		return fmt.Errorf("could not read from table '%s'", newName)
+		return fmt.Errorf("could not read from new table '%s'", newName)
 	}
-	query := fmt.Sprintf("SELECT low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
+	// We intentionally SELECT * FROM the checkpoint table because if the structure
+	// changes, we want this operation to fail. This will indicate that the checkpoint
+	// was created by either an earlier or later version of spirit, in which case
+	// we do not support recovery.
+	query := fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
 		r.migration.Database, cpName)
-	var lowWatermark, binlogName, alterStatement string
-	var binlogPos int
+	var copierWatermark, binlogName, alterStatement string
+	var id, binlogPos int
 	var rowsCopied, rowsCopiedLogical uint64
-	err := r.db.QueryRow(query).Scan(&lowWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
+	err := r.db.QueryRow(query).Scan(&id, &copierWatermark, &r.checksumWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
 	if err != nil {
-		return fmt.Errorf("could not read from table '%s'", cpName)
+		return fmt.Errorf("could not read from table '%s', err: %v", cpName, err)
 	}
 	if r.migration.Alter != alterStatement {
 		return ErrMismatchedAlter
@@ -777,8 +795,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
 		Logger:      r.logger,
 		MetricsSink: r.metricsSink,
 		DBConfig:    r.dbConfig,
-	}, lowWatermark, rowsCopied, rowsCopiedLogical)
-
+	}, copierWatermark, rowsCopied, rowsCopiedLogical)
 	if err != nil {
 		return err
 	}
@@ -796,9 +813,6 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
 	})
 
 	r.checkpointTable = table.NewTableInfo(r.db, r.table.SchemaName, cpName)
-	if err != nil {
-		return err
-	}
 
 	// Start the replClient now. This is because if the checkpoint is so old there
 	// are no longer binary log files, we want to abandon resume-from-checkpoint
@@ -808,7 +822,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
 		r.logger.Warnf("resuming from checkpoint failed because resuming from the previous binlog position failed. log-file: %s log-pos: %d", binlogName, binlogPos)
 		return err
 	}
-	r.logger.Warnf("resuming from checkpoint. low-watermark: %s log-file: %s log-pos: %d copy-rows: %d", lowWatermark, binlogName, binlogPos, rowsCopied)
+	r.logger.Warnf("resuming from checkpoint. copier-watermark: %s checksum-watermark: %s log-file: %s log-pos: %d copy-rows: %d", copierWatermark, r.checksumWatermark, binlogName, binlogPos, rowsCopied)
 	r.usedResumeFromCheckpoint = true
 	return nil
 }
@@ -821,13 +835,16 @@ func (r *Runner) checksum(ctx context.Context) error {
 	// - background flushing
 	// - checkpoint thread
 	// - checksum "replaceChunk" DB connections
-	// Handle a case in the tests not having a dbConfig
+	// Handle a case, occurring only in tests, where no dbConfig is set.
 	if r.dbConfig == nil {
 		r.dbConfig = dbconn.NewDBConfig()
 	}
 	r.db.SetMaxOpenConns(r.dbConfig.MaxOpenConnections + 2)
 	var err error
 	for i := range 3 { // try the checksum up to 3 times.
+		if i > 0 {
+			r.checksumWatermark = "" // reset the watermark if we are retrying.
+		}
 		r.checkerLock.Lock()
 		r.checker, err = checksum.NewChecker(r.db, r.table, r.newTable, r.replClient, &checksum.CheckerConfig{
 			Concurrency:     r.migration.Threads,
@@ -835,6 +852,7 @@
 			TargetChunkTime: r.migration.TargetChunkTime,
 			DBConfig:        r.dbConfig,
 			Logger:          r.logger,
 			FixDifferences:  true, // we want to repair the differences.
+			Watermark:       r.checksumWatermark,
 		})
 		r.checkerLock.Unlock()
 		if err != nil {
@@ -881,26 +899,44 @@ func (r *Runner) setCurrentState(s migrationState) {
 	atomic.StoreInt32((*int32)(&r.currentState), int32(s))
 }
 
+// dumpCheckpoint is called approximately every minute.
+// It writes the current state of the migration to the checkpoint table,
+// which can be used in recovery. Previously, resuming from a checkpoint
+// would always restart at the copier; it can now also resume at the
+// checksum phase.
 func (r *Runner) dumpCheckpoint(ctx context.Context) error {
 	// Retrieve the binlog position first and under a mutex.
-	// Currently, it never advances, but it's possible it might in future
-	// and this race condition is missed.
 	binlog := r.replClient.GetBinlogApplyPosition()
-	lowWatermark, err := r.copier.GetLowWatermark()
+	copierWatermark, err := r.copier.GetLowWatermark()
 	if err != nil {
 		return err // it might not be ready, we can try again.
 	}
+	// We only dump the checksumWatermark if we are in >= checksum state.
+	// We require a mutex because the checker can be replaced while the
+	// migration is running, which would otherwise be a race.
+	var checksumWatermark string
+	if r.getCurrentState() >= stateChecksum {
+		r.checkerLock.Lock()
+		defer r.checkerLock.Unlock()
+		if r.checker != nil {
+			checksumWatermark, err = r.checker.GetLowWatermark()
+			if err != nil {
+				return err
+			}
+		}
+	}
 	copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount)
 	logicalCopyRows := atomic.LoadUint64(&r.copier.CopyRowsLogicalCount)
 	// Note: when we dump the low watermark to the log, we are exposing PK
 	// values, which, when using the composite chunker, are based on actual
 	// user data. We believe this is OK but may change it in the future.
 	// Please do not add any other fields to this log line.
- r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", lowWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows) - return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?)", + r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", copierWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows) + return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?, %?)", r.checkpointTable.SchemaName, r.checkpointTable.TableName, - lowWatermark, + copierWatermark, + checksumWatermark, binlog.Name, binlog.Pos, copyRows, diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index 59f4be6..12db606 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -924,9 +924,20 @@ func TestCheckpointRestore(t *testing.T) { // from issue #125 watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}" binlog := r.replClient.GetBinlogApplyPosition() - query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)", - r.checkpointTable.QuotedName) - _, err = r.db.ExecContext(context.TODO(), query, watermark, binlog.Name, binlog.Pos, 0, 0, r.migration.Alter) + err = dbconn.Exec(context.TODO(), r.db, `INSERT INTO %n.%n + (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) + VALUES + (%?, %?, %?, %?, %?, %?, %?)`, + r.checkpointTable.SchemaName, + r.checkpointTable.TableName, + watermark, + "", + binlog.Name, + binlog.Pos, + 0, + 0, + r.migration.Alter, + ) assert.NoError(t, err) r2, err := NewRunner(&Migration{ @@ -944,6 +955,81 @@ func TestCheckpointRestore(t *testing.T) { assert.True(t, r2.usedResumeFromCheckpoint) } +func TestCheckpointResumeDuringChecksum(t *testing.T) { + tbl := `CREATE TABLE cptresume ( + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + id2 INT NOT NULL, + pad VARCHAR(100) NOT NULL default 0)` + cfg, err := mysql.ParseDSN(testutils.DSN()) + assert.NoError(t, err) + testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`) + testutils.RunSQL(t, tbl) + testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`) + testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`) + testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`) + testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`) + + r, err := NewRunner(&Migration{ + Host: cfg.Addr, + Username: cfg.User, + Password: cfg.Passwd, + Database: cfg.DBName, + Threads: 4, + TargetChunkTime: 100 * time.Millisecond, + Table: "cptresume", + Alter: "ENGINE=InnoDB", + Checksum: true, + }) + assert.NoError(t, err) + + // Call r.Run() with our context in a go-routine. 
+	// When we see that we are waiting on the sentinel table, we manually
+	// run the checksum and dump a checkpoint, then cancel the context and
+	// call Close(). We should then be able to resume from the checkpoint
+	// directly into the checksum state.
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		err := r.Run(ctx)
+		assert.Error(t, err) // context cancelled
+	}()
+	for {
+		// Wait for the run to reach the sentinel-table wait state.
+		if r.getCurrentState() >= stateWaitingOnSentinelTable {
+			break
+		}
+		time.Sleep(time.Millisecond)
+	}
+
+	assert.NoError(t, r.checksum(context.TODO()))       // run the checksum; the original Run is blocked on the sentinel.
+	assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the checksum watermark.
+	cancel()                     // unblock the original Run waiting on the sentinel.
+	assert.NoError(t, r.Close()) // close the run.
+
+	// Drop the sentinel table.
+	testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`)
+
+	// Insert a couple more rows (should not change anything).
+	testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('b', 100) FROM dual`)
+	testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('c', 100) FROM dual`)
+
+	// Start again as a new runner.
+	r2, err := NewRunner(&Migration{
+		Host:            cfg.Addr,
+		Username:        cfg.User,
+		Password:        cfg.Passwd,
+		Database:        cfg.DBName,
+		Threads:         4,
+		TargetChunkTime: 100 * time.Millisecond,
+		Table:           "cptresume",
+		Alter:           "ENGINE=InnoDB",
+		Checksum:        true,
+	})
+	assert.NoError(t, err)
+	err = r2.Run(context.Background())
+	assert.NoError(t, err)
+	assert.True(t, r2.usedResumeFromCheckpoint)
+	assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark
+}
+
 func TestCheckpointDifferentRestoreOptions(t *testing.T) {
 	tbl := `CREATE TABLE cpt1difft1 (
 		id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
@@ -2663,7 +2749,7 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
 
 	go func() {
 		err := runner.Run(ctx)
-		assert.ErrorContains(t, err, "context canceled") // it gets interrupted as soon as there is a checkpoint saved.
+		assert.Error(t, err) // it gets interrupted as soon as there is a checkpoint saved.
 	}()
 
 	// wait until a checkpoint is saved (which means copy is in progress)
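For illustration, here is how the watermark-resume API added in this diff fits together end to end. This is a minimal sketch mirroring TestFromWatermark, not part of the change: the module path (github.com/cashapp/spirit), the DSN, the table names, and the watermark value are all assumptions, and error handling is collapsed to log.Fatal.

package main

import (
	"context"
	"log"
	"time"

	"github.com/cashapp/spirit/pkg/checksum"
	"github.com/cashapp/spirit/pkg/dbconn"
	"github.com/cashapp/spirit/pkg/repl"
	"github.com/cashapp/spirit/pkg/table"
	"github.com/go-sql-driver/mysql"
	"github.com/sirupsen/logrus"
)

func main() {
	dsn := "user:pass@tcp(127.0.0.1:3306)/test" // placeholder DSN
	db, err := dbconn.New(dsn, dbconn.NewDBConfig())
	if err != nil {
		log.Fatal(err)
	}
	// Placeholder table names: the original table and the shadow "_new" table.
	t1 := table.NewTableInfo(db, "test", "t1")
	if err := t1.SetInfo(context.TODO()); err != nil {
		log.Fatal(err)
	}
	t2 := table.NewTableInfo(db, "test", "_t1_new")
	if err := t2.SetInfo(context.TODO()); err != nil {
		log.Fatal(err)
	}
	cfg, err := mysql.ParseDSN(dsn)
	if err != nil {
		log.Fatal(err)
	}
	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
		Logger:          logrus.New(),
		Concurrency:     4,
		TargetBatchTime: time.Second,
	})
	if err := feed.Run(); err != nil {
		log.Fatal(err)
	}

	// Resume the checksum at a previously checkpointed watermark instead of
	// re-checking from the start of the table. The value is the JSON that a
	// prior run persisted via Checker.GetLowWatermark() / dumpCheckpoint().
	config := checksum.NewCheckerDefaultConfig()
	config.Watermark = `{"Key":["a"],"ChunkSize":1000,"LowerBound":{"Value": ["2"],"Inclusive":true},"UpperBound":{"Value": ["3"],"Inclusive":false}}`
	checker, err := checksum.NewChecker(db, t1, t2, feed, config)
	if err != nil {
		log.Fatal(err)
	}
	if err := checker.Run(context.Background()); err != nil {
		log.Fatal(err)
	}

	// The checker's own low watermark can be read back for the next
	// checkpoint; it may not be ready yet, so the error is tolerated here.
	if wm, err := checker.GetLowWatermark(); err == nil {
		log.Printf("checksum low watermark: %s", wm)
	}
}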