
Add resume from checksum #344

Merged: 3 commits, Sep 4, 2024
27 changes: 25 additions & 2 deletions pkg/checksum/checker.go
@@ -41,6 +41,7 @@ type Checker struct {
fixDifferences bool
differencesFound atomic.Uint64
recopyLock sync.Mutex
isResume bool
}

type CheckerConfig struct {
@@ -49,6 +50,7 @@ type CheckerConfig struct {
DBConfig *dbconn.DBConfig
Logger loggers.Advanced
FixDifferences bool
Watermark string // optional; defines a watermark to start from
}

func NewCheckerDefaultConfig() *CheckerConfig {
@@ -76,6 +78,14 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
if err != nil {
return nil, err
}
// If there is a watermark, we need to open the chunker at that watermark.
// Overwrite the previously attached chunker with a new one.
if config.Watermark != "" {
config.Logger.Warnf("opening checksum chunker at watermark: %s", config.Watermark)
if err := chunker.OpenAtWatermark(config.Watermark, newTable.MaxValue()); err != nil {
return nil, err
}
}
checksum := &Checker{
table: tbl,
newTable: newTable,
Expand All @@ -86,6 +96,7 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
dbConfig: config.DBConfig,
logger: config.Logger,
fixDifferences: config.FixDifferences,
isResume: config.Watermark != "",
}
return checksum, nil
}
@@ -158,6 +169,13 @@ func (c *Checker) RecentValue() string {
return fmt.Sprintf("%v", c.recentValue)
}

func (c *Checker) GetLowWatermark() (string, error) {
if c.chunker == nil {
return "", errors.New("chunker not initialized")
}
return c.chunker.GetLowWatermark()
}

func (c *Checker) inspectDifferences(trx *sql.Tx, chunk *table.Chunk) error {
sourceSubquery := fmt.Sprintf("SELECT CRC32(CONCAT(%s)) as row_checksum, %s FROM %s WHERE %s",
c.intersectColumns(),
@@ -362,8 +380,13 @@ func (c *Checker) Run(ctx context.Context) error {
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
if err := c.chunker.Open(); err != nil {
return err
// Open the chunker if it's not open.
// It will already be open if this is a resume from checkpoint.
// This is a little annoying, but just the way the chunker API works.
if !c.isResume {
if err := c.chunker.Open(); err != nil {
return err
}
}
c.Unlock()

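For orientation, here is a minimal caller-side sketch of the new Watermark option. The handles db, tbl, newTable, feed and the savedChecksumWatermark string are placeholders; only CheckerConfig.Watermark, NewChecker, GetLowWatermark and Run come from this diff, the rest is hypothetical glue:

cfg := checksum.NewCheckerDefaultConfig()
cfg.FixDifferences = true              // repair any rows that differ
cfg.Watermark = savedChecksumWatermark // "" means start the checksum from scratch
checker, err := checksum.NewChecker(db, tbl, newTable, feed, cfg)
if err != nil {
	return err // e.g. the chunker refused to open at the given watermark
}
// While Run is in flight, a checkpointing goroutine can call
// checker.GetLowWatermark() and persist the result for a later resume.
return checker.Run(ctx)
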
33 changes: 33 additions & 0 deletions pkg/checksum/checker_test.go
@@ -258,3 +258,36 @@ func TestChangeDataTypeDatetime(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background())) // fails
}

func TestFromWatermark(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt")
testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)")

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "tfromwatermark")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
Logger: logger,
Concurrency: 4,
TargetBatchTime: time.Second,
})
assert.NoError(t, feed.Run())

config := NewCheckerDefaultConfig()
config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}"
checker, err := NewChecker(db, t1, t2, feed, config)
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background()))
}
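
The Watermark string used in this test is the chunker's JSON low-watermark serialization. Pretty-printed for readability (field names copied from the test value; the exact schema belongs to the table/chunker package, so treat this as illustrative):

{
  "Key": ["a"],
  "ChunkSize": 1000,
  "LowerBound": {"Value": ["2"], "Inclusive": true},
  "UpperBound": {"Value": ["3"], "Inclusive": false}
}

With this watermark, OpenAtWatermark positions the chunker at the chunk whose lower bound is key value 2 (inclusive), and the checksum proceeds from there up to newTable.MaxValue().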
78 changes: 57 additions & 21 deletions pkg/migration/runner.go
@@ -92,6 +92,9 @@ type Runner struct {
checker *checksum.Checker
checkerLock sync.Mutex

// used to recover direct to checksum.
checksumWatermark string

// Track some key statistics.
startTime time.Time
sentinelWaitStartTime time.Time
@@ -252,7 +255,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
go r.dumpCheckpointContinuously(ctx) // start periodically dumping the checkpoint.

// Perform the main copy rows task. This is where the majority
// of migrations usually spend time.
// of migrations usually spend time. It is not strictly necessary,
// but we always recopy the last-bit, even if we are resuming
// partially through the checksum.
r.setCurrentState(stateCopyRows)
if err := r.copier.Run(ctx); err != nil {
return err
@@ -630,7 +635,16 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {
if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.table.SchemaName, cpName); err != nil {
return err
}
if err := dbconn.Exec(ctx, r.db, "CREATE TABLE %n.%n (id int NOT NULL AUTO_INCREMENT PRIMARY KEY, low_watermark TEXT, binlog_name VARCHAR(255), binlog_pos INT, rows_copied BIGINT, rows_copied_logical BIGINT, alter_statement TEXT)",
if err := dbconn.Exec(ctx, r.db, `CREATE TABLE %n.%n (
id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
copier_watermark TEXT,
checksum_watermark TEXT,
binlog_name VARCHAR(255),
binlog_pos INT,
rows_copied BIGINT,
rows_copied_logical BIGINT,
alter_statement TEXT
)`,
r.table.SchemaName, cpName); err != nil {
return err
}
@@ -740,17 +754,21 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
// Make sure we can read from the new table.
if err := dbconn.Exec(ctx, r.db, "SELECT * FROM %n.%n LIMIT 1",
r.migration.Database, newName); err != nil {
return fmt.Errorf("could not read from table '%s'", newName)
return fmt.Errorf("could not find any checkpoints in table '%s'", newName)
}

query := fmt.Sprintf("SELECT low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
// We intentionally SELECT * FROM the checkpoint table because if the structure
// changes, we want this operation to fail. This will indicate that the checkpoint
// was created by either an earlier or later version of spirit, in which case
// we do not support recovery.
query := fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
r.migration.Database, cpName)
var lowWatermark, binlogName, alterStatement string
var binlogPos int
var copierWatermark, binlogName, alterStatement string
var id, binlogPos int
var rowsCopied, rowsCopiedLogical uint64
err := r.db.QueryRow(query).Scan(&lowWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
err := r.db.QueryRow(query).Scan(&id, &copierWatermark, &r.checksumWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
if err != nil {
return fmt.Errorf("could not read from table '%s'", cpName)
return fmt.Errorf("could not read from table '%s', err:%v", cpName, err)
}
if r.migration.Alter != alterStatement {
return ErrMismatchedAlter
@@ -777,8 +795,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
Logger: r.logger,
MetricsSink: r.metricsSink,
DBConfig: r.dbConfig,
}, lowWatermark, rowsCopied, rowsCopiedLogical)

}, copierWatermark, rowsCopied, rowsCopiedLogical)
if err != nil {
return err
}
@@ -796,9 +813,6 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
})

r.checkpointTable = table.NewTableInfo(r.db, r.table.SchemaName, cpName)
if err != nil {
return err
}

// Start the replClient now. This is because if the checkpoint is so old there
// are no longer binary log files, we want to abandon resume-from-checkpoint
@@ -808,7 +822,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
r.logger.Warnf("resuming from checkpoint failed because resuming from the previous binlog position failed. log-file: %s log-pos: %d", binlogName, binlogPos)
return err
}
r.logger.Warnf("resuming from checkpoint. low-watermark: %s log-file: %s log-pos: %d copy-rows: %d", lowWatermark, binlogName, binlogPos, rowsCopied)
r.logger.Warnf("resuming from checkpoint. copier-watermark: %s checksum-watermark: %s log-file: %s log-pos: %d copy-rows: %d", copierWatermark, r.checksumWatermark, binlogName, binlogPos, rowsCopied)
r.usedResumeFromCheckpoint = true
return nil
}
@@ -821,20 +835,24 @@ func (r *Runner) checksum(ctx context.Context) error {
// - background flushing
// - checkpoint thread
// - checksum "replaceChunk" DB connections
// Handle a case in the tests not having a dbConfig
// Handle a case just in the tests not having a dbConfig
if r.dbConfig == nil {
r.dbConfig = dbconn.NewDBConfig()
}
r.db.SetMaxOpenConns(r.dbConfig.MaxOpenConnections + 2)
var err error
for i := range 3 { // try the checksum up to 3 times.
if i > 0 {
r.checksumWatermark = "" // reset the watermark if we are retrying.
}
Review comment (Collaborator):
Is this intentional? So resuming from the checksum watermark only works if the first resume attempt succeeds; otherwise the checksum starts from scratch again?

Reply (Collaborator, author):
Yes, that's right. If there is any failure we recheck everything.

r.checkerLock.Lock()
r.checker, err = checksum.NewChecker(r.db, r.table, r.newTable, r.replClient, &checksum.CheckerConfig{
Concurrency: r.migration.Threads,
TargetChunkTime: r.migration.TargetChunkTime,
DBConfig: r.dbConfig,
Logger: r.logger,
FixDifferences: true, // we want to repair the differences.
Watermark: r.checksumWatermark,
})
r.checkerLock.Unlock()
if err != nil {
@@ -881,26 +899,44 @@ func (r *Runner) setCurrentState(s migrationState) {
atomic.StoreInt32((*int32)(&r.currentState), int32(s))
}

// dumpCheckpoint is called approximately every minute.
// It writes the current state of the migration to the checkpoint table,
// which can be used in recovery. Previously resuming from checkpoint
// would always restart at the copier, but it can now also resume at
// the checksum phase.
func (r *Runner) dumpCheckpoint(ctx context.Context) error {
// Retrieve the binlog position first and under a mutex.
// Currently, it never advances, but it's possible it might in future
// and this race condition is missed.
binlog := r.replClient.GetBinlogApplyPosition()
lowWatermark, err := r.copier.GetLowWatermark()
copierWatermark, err := r.copier.GetLowWatermark()
if err != nil {
return err // it might not be ready, we can try again.
}
// We only dump the checksumWatermark if we are in >= checksum state.
// We require a mutex because the checker can be replaced during
// operation, leaving a race condition.
var checksumWatermark string
if r.getCurrentState() >= stateChecksum {
r.checkerLock.Lock()
defer r.checkerLock.Unlock()
if r.checker != nil {
checksumWatermark, err = r.checker.GetLowWatermark()
if err != nil {
return err
}
}
}
copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount)
logicalCopyRows := atomic.LoadUint64(&r.copier.CopyRowsLogicalCount)
// Note: when we dump the lowWatermark to the log, we are exposing PK values,
// which, when using the composite chunker, are based on actual user data.
// We believe this is OK but may change it in the future. Please do not
// add any other fields to this log line.
r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", lowWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?)",
r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", copierWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?, %?)",
r.checkpointTable.SchemaName,
r.checkpointTable.TableName,
lowWatermark,
copierWatermark,
checksumWatermark,
binlog.Name,
binlog.Pos,
copyRows,
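The review thread above asked about retry semantics. A self-contained sketch of the behavior, under the assumption that runChecksumAttempt is a hypothetical stand-in for the real NewChecker + Run call:

// Only the first attempt may start from the saved watermark; any retry
// clears it and re-verifies the whole table from scratch.
func runChecksumWithRetries(ctx context.Context, savedWatermark string,
	runChecksumAttempt func(ctx context.Context, watermark string) error) error {
	var err error
	watermark := savedWatermark
	for i := 0; i < 3; i++ { // try the checksum up to 3 times
		if i > 0 {
			watermark = "" // a failed attempt invalidates the resume point
		}
		if err = runChecksumAttempt(ctx, watermark); err == nil {
			return nil
		}
	}
	return err
}
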
94 changes: 90 additions & 4 deletions pkg/migration/runner_test.go
@@ -924,9 +924,20 @@ func TestCheckpointRestore(t *testing.T) {
// from issue #125
watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}"
binlog := r.replClient.GetBinlogApplyPosition()
query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)",
r.checkpointTable.QuotedName)
_, err = r.db.ExecContext(context.TODO(), query, watermark, binlog.Name, binlog.Pos, 0, 0, r.migration.Alter)
err = dbconn.Exec(context.TODO(), r.db, `INSERT INTO %n.%n
(copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement)
VALUES
(%?, %?, %?, %?, %?, %?, %?)`,
r.checkpointTable.SchemaName,
r.checkpointTable.TableName,
watermark,
"",
binlog.Name,
binlog.Pos,
0,
0,
r.migration.Alter,
)
assert.NoError(t, err)

r2, err := NewRunner(&Migration{
Expand All @@ -944,6 +955,81 @@ func TestCheckpointRestore(t *testing.T) {
assert.True(t, r2.usedResumeFromCheckpoint)
}

func TestCheckpointResumeDuringChecksum(t *testing.T) {
tbl := `CREATE TABLE cptresume (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id2 INT NOT NULL,
pad VARCHAR(100) NOT NULL default 0)`
cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`)
testutils.RunSQL(t, tbl)
testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`)

r, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)

// Call r.Run() with our context in a go-routine.
// When we see that we are waiting on the sentinel table,
// we then manually start the first bits of checksum, and then close()
// We should be able to resume from the checkpoint into the checksum state.
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := r.Run(ctx)
assert.Error(t, err) // context cancelled
}()
for {
// Wait for the sentinel table.
if r.getCurrentState() >= stateWaitingOnSentinelTable {
break
}
time.Sleep(time.Millisecond)
}

assert.NoError(t, r.checksum(context.TODO())) // run the checksum, the original Run is blocked on sentinel.
assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the watermark.
cancel() // unblock the original waiting on sentinel.
assert.NoError(t, r.Close()) // close the run.

// drop the sentinel table.
testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`)

// insert a couple more rows (should not change anything)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('b', 100) FROM dual`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('c', 100) FROM dual`)

// Start again as a new runner,
r2, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)
err = r2.Run(context.Background())
assert.NoError(t, err)
assert.True(t, r2.usedResumeFromCheckpoint)
assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark
}

func TestCheckpointDifferentRestoreOptions(t *testing.T) {
tbl := `CREATE TABLE cpt1difft1 (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
@@ -2663,7 +2749,7 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {

go func() {
err := runner.Run(ctx)
assert.ErrorContains(t, err, "context canceled") // it gets interrupted as soon as there is a checkpoint saved.
assert.Error(t, err) // it gets interrupted as soon as there is a checkpoint saved.
}()

// wait until a checkpoint is saved (which means copy is in progress)