
Commit d4172df
Merge pull request #192 from squareup/mtocker-flush-during-checksum
checksum, migration: Continue to flush binlog during long running checksum
morgo authored Sep 5, 2023
2 parents: 6f7646e + b0a786c
Showing 3 changed files with 17 additions and 5 deletions.
9 changes: 9 additions & 0 deletions pkg/checksum/checker.go
@@ -201,6 +201,15 @@ func (c *Checker) Run(ctx context.Context) error {
 	}
 	c.logger.Info("table unlocked, starting checksum")
 
+	// Start the periodic flush again *just* for the duration of the checksum.
+	// If the checksum is long running, it could block flushing for too long:
+	// - If we need to resume from checkpoint, the binlogs may not be there.
+	// - If they are there, they will take a huge amount of time to flush.
+	// - The memory requirements for 1MM deltas seem reasonable, but for a multi-day
+	//   checksum it is reasonable to assume it may exceed this.
+	go c.feed.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)
+	defer c.feed.StopPeriodicFlush()
+
 	g, errGrpCtx := errgroup.WithContext(ctx)
 	g.SetLimit(c.concurrency)
 	for !c.chunker.IsRead() && c.isHealthy(errGrpCtx) {
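The checker hands the flusher's lifetime to a goroutine/defer pair: StartPeriodicFlush runs in the background for exactly as long as the checksum, and StopPeriodicFlush tears it down on any return path. The repl client's internals are not part of this diff, so the following is only a minimal sketch, assuming a ticker-driven loop guarded by a stop channel; every name other than StartPeriodicFlush, StopPeriodicFlush, Flush, and DefaultFlushInterval is hypothetical.

// Sketch only: a plausible shape for the start/stop pair used above,
// assuming c.feed is a repl client. The real implementation lives in
// pkg/repl/client.go and is not shown in this diff.
package repl

import (
	"context"
	"sync"
	"time"
)

// DefaultFlushInterval mirrors the constant added in pkg/repl/client.go.
const DefaultFlushInterval = 30 * time.Second

type Client struct {
	mu     sync.Mutex
	stopCh chan struct{}
}

// StartPeriodicFlush flushes queued binlog deltas every interval until
// the context is canceled or StopPeriodicFlush is called.
func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) {
	c.mu.Lock()
	stop := make(chan struct{})
	c.stopCh = stop
	c.mu.Unlock()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-stop:
			return
		case <-ticker.C:
			if err := c.Flush(ctx); err != nil {
				return // a real client would surface this error
			}
		}
	}
}

// StopPeriodicFlush signals the flush goroutine to exit.
func (c *Client) StopPeriodicFlush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.stopCh != nil {
		close(c.stopCh)
		c.stopCh = nil
	}
}

// Flush is a placeholder for the real delta-flush logic.
func (c *Client) Flush(ctx context.Context) error { return nil }

Under this shape, the defer guarantees the extra flusher never outlives the checksum, so the cutover path still sees flushing disabled, as prepareForCutover expects.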
9 changes: 4 additions & 5 deletions pkg/migration/runner.go
@@ -43,10 +43,6 @@ var (
 	checkpointDumpInterval  = 50 * time.Second
 	tableStatUpdateInterval = 5 * time.Minute
 	statusInterval          = 30 * time.Second
-	// binlogPerodicFlushInterval is the time that the client will flush all binlog changes to disk.
-	// Longer values require more memory, but permit more merging.
-	// I expect we will change this to 1hr-24hr in the future.
-	binlogPerodicFlushInterval = 30 * time.Second
 )
 
 func (s migrationState) String() string {
@@ -269,6 +265,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
 func (r *Runner) prepareForCutover(ctx context.Context) error {
 	r.setCurrentState(stateApplyChangeset)
 	// Disable the periodic flush and flush all pending events.
+	// We want it disabled for ANALYZE TABLE and acquiring a table lock,
+	// *but* it will be started again briefly inside the checksum
+	// runner to ensure that the lag does not grow too long.
 	r.replClient.StopPeriodicFlush()
 	if err := r.replClient.Flush(ctx); err != nil {
 		return err
@@ -426,7 +425,7 @@ func (r *Runner) setup(ctx context.Context) error {
 	// Continuously update the min/max and estimated rows
 	// and flush the binary log position periodically.
 	go r.table.AutoUpdateStatistics(ctx, tableStatUpdateInterval, r.logger)
-	go r.replClient.StartPeriodicFlush(ctx, binlogPerodicFlushInterval)
+	go r.replClient.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)
 	return nil
 }

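Putting the hunks together, the flusher now follows a stop/restart lifecycle around the cutover. Below is a condensed, hypothetical walkthrough reusing the sketch Client from above; the real Runner interleaves checkpointing and locking steps that are elided here.

// Condensed sketch of the flush lifecycle after this change; comments map
// each step to the function it belongs to in the real code.
func flushLifecycle(ctx context.Context, client *Client) error {
	// setup(): background flushing runs for the whole row-copy phase.
	go client.StartPeriodicFlush(ctx, DefaultFlushInterval)

	// ... row copy ...

	// prepareForCutover(): stop background flushing so ANALYZE TABLE and
	// the table-lock acquisition are not delayed, then drain what remains.
	client.StopPeriodicFlush()
	if err := client.Flush(ctx); err != nil {
		return err
	}

	// checksum: the checker restarts the periodic flush just for its own
	// duration (see pkg/checksum/checker.go above) and stops it on return.
	return nil
}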
4 changes: 4 additions & 0 deletions pkg/repl/client.go
@@ -33,6 +33,10 @@ const (
 	// multiple-flush-threads, which should help it group commit and still be fast.
 	DefaultBatchSize = 1000
 	flushThreads     = 16
+	// DefaultFlushInterval is the interval at which the client will flush all binlog changes to disk.
+	// Longer values require more memory, but permit more merging.
+	// I expect we will change this to 1hr-24hr in the future.
+	DefaultFlushInterval = 30 * time.Second
 )
 
 type queuedChange struct {
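The constant's memory/merging tradeoff can be made concrete with back-of-the-envelope numbers. Continuing the sketch package above; the bytes-per-delta figure is an illustrative assumption, not a measurement from this repository.

// Illustrative only: rough memory cost of buffering deltas between flushes.
const assumedBytesPerDelta = 100 // hypothetical average key size plus overhead

// estimateBufferedBytes approximates the memory needed to hold the deltas
// that accumulate between flushes at a given write rate. It ignores merging:
// repeated writes to the same key collapse into one delta, so this is an
// upper bound, which is why longer intervals "permit more merging".
func estimateBufferedBytes(writesPerSecond int, flushInterval time.Duration) int64 {
	deltas := int64(writesPerSecond) * int64(flushInterval/time.Second)
	return deltas * assumedBytesPerDelta
}

// At 1,000 writes/s and the 30s default, about 30k deltas (~3 MB) buffer
// between flushes; stretching the interval toward 24h pushes the worst case
// toward ~86MM deltas, which is where the 1MM figure in checker.go becomes
// the binding concern.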

