From 0a81c8bb405e19411a7a9bdc4ae3b2d0edcfdecc Mon Sep 17 00:00:00 2001
From: Morgan Tocker
Date: Tue, 5 Sep 2023 09:13:45 -0600
Subject: [PATCH 1/2] checksum, migration: Continue to flush binlog during long
 running checksum

---
 pkg/checksum/checker.go | 11 +++++++++++
 pkg/migration/runner.go | 3 +++
 2 files changed, 14 insertions(+)

diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go
index e4740cb0..cce4b14d 100644
--- a/pkg/checksum/checker.go
+++ b/pkg/checksum/checker.go
@@ -21,6 +21,8 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+var binlogFlushIntervalDuringChecksum = 30 * time.Second
+
 type Checker struct {
 	sync.Mutex
 	table *table.TableInfo
@@ -201,6 +203,15 @@ func (c *Checker) Run(ctx context.Context) error {
 	}
 	c.logger.Info("table unlocked, starting checksum")
 
+	// Start the periodic flush again *just* for the duration of the checksum.
+	// If the checksum is long running, it could block flushing for too long:
+	// - If we need to resume from checkpoint, the binlogs may not be there.
+	// - If they are there, they will take a huge amount of time to flush.
+	// - The memory requirements for 1MM deltas seem reasonable, but for a multi-day
+	//   checksum it is reasonable to assume it may exceed this.
+	go c.feed.StartPeriodicFlush(ctx, binlogFlushIntervalDuringChecksum)
+	defer c.feed.StopPeriodicFlush()
+
 	g, errGrpCtx := errgroup.WithContext(ctx)
 	g.SetLimit(c.concurrency)
 	for !c.chunker.IsRead() && c.isHealthy(errGrpCtx) {
diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go
index f99856c0..090a0115 100644
--- a/pkg/migration/runner.go
+++ b/pkg/migration/runner.go
@@ -269,6 +269,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
 func (r *Runner) prepareForCutover(ctx context.Context) error {
 	r.setCurrentState(stateApplyChangeset)
 	// Disable the periodic flush and flush all pending events.
+	// We want it disabled for ANALYZE TABLE and acquiring a table lock
+	// *but* it will be started again briefly inside of the checksum
+	// runner to ensure that the lag does not grow too long.
 	r.replClient.StopPeriodicFlush()
 	if err := r.replClient.Flush(ctx); err != nil {
 		return err

From b0a786c28dde343cc1ce840b2b478b0712f113b7 Mon Sep 17 00:00:00 2001
From: Morgan Tocker
Date: Tue, 5 Sep 2023 11:55:33 -0600
Subject: [PATCH 2/2] Address PR feedback

---
 pkg/checksum/checker.go | 4 +---
 pkg/migration/runner.go | 6 +-----
 pkg/repl/client.go      | 4 ++++
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go
index cce4b14d..bcd2477c 100644
--- a/pkg/checksum/checker.go
+++ b/pkg/checksum/checker.go
@@ -21,8 +21,6 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-var binlogFlushIntervalDuringChecksum = 30 * time.Second
-
 type Checker struct {
 	sync.Mutex
 	table *table.TableInfo
@@ -209,7 +207,7 @@ func (c *Checker) Run(ctx context.Context) error {
 	// - If they are there, they will take a huge amount of time to flush.
 	// - The memory requirements for 1MM deltas seem reasonable, but for a multi-day
 	//   checksum it is reasonable to assume it may exceed this.
-	go c.feed.StartPeriodicFlush(ctx, binlogFlushIntervalDuringChecksum)
+	go c.feed.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)
 	defer c.feed.StopPeriodicFlush()
 
 	g, errGrpCtx := errgroup.WithContext(ctx)
diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go
index 3dc38514..a6718a4b 100644
--- a/pkg/migration/runner.go
+++ b/pkg/migration/runner.go
@@ -43,10 +43,6 @@ var (
 	checkpointDumpInterval = 50 * time.Second
 	tableStatUpdateInterval = 5 * time.Minute
 	statusInterval = 30 * time.Second
-	// binlogPerodicFlushInterval is the time that the client will flush all binlog changes to disk.
-	// Longer values require more memory, but permit more merging.
-	// I expect we will change this to 1hr-24hr in the future.
-	binlogPerodicFlushInterval = 30 * time.Second
 )
 
 func (s migrationState) String() string {
@@ -429,7 +425,7 @@ func (r *Runner) setup(ctx context.Context) error {
 	// Continuously update the min/max and estimated rows
 	// and to flush the binary log position periodically.
 	go r.table.AutoUpdateStatistics(ctx, tableStatUpdateInterval, r.logger)
-	go r.replClient.StartPeriodicFlush(ctx, binlogPerodicFlushInterval)
+	go r.replClient.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)
 	return nil
 }
 
diff --git a/pkg/repl/client.go b/pkg/repl/client.go
index d3f741af..d9d41264 100644
--- a/pkg/repl/client.go
+++ b/pkg/repl/client.go
@@ -33,6 +33,10 @@ const (
 	// multiple-flush-threads, which should help it group commit and still be fast.
 	DefaultBatchSize = 1000
 	flushThreads = 16
+	// DefaultFlushInterval is the time that the client will flush all binlog changes to disk.
+	// Longer values require more memory, but permit more merging.
+	// I expect we will change this to 1hr-24hr in the future.
+	DefaultFlushInterval = 30 * time.Second
 )
 
 type queuedChange struct {
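For context, below is a minimal, self-contained Go sketch of the start/stop periodic flush pattern that these call sites rely on. The internals of repl.Client are not part of this patch, so the struct fields, the channel-based stop signal, and the stubbed Flush are assumptions for illustration only, not the actual implementation; only the method names and call shapes (StartPeriodicFlush run in a goroutine, StopPeriodicFlush deferred, Flush returning an error) come from the patch itself.

// Hypothetical sketch only: Client stands in for repl.Client, and its fields
// are invented for this example.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// DefaultFlushInterval mirrors the constant added in pkg/repl/client.go.
const DefaultFlushInterval = 30 * time.Second

type Client struct {
	sync.Mutex
	periodicFlushEnabled bool
	stopPeriodicFlush    chan struct{}
}

func NewClient() *Client {
	// Buffered so StopPeriodicFlush never blocks if the loop already exited.
	return &Client{stopPeriodicFlush: make(chan struct{}, 1)}
}

// Flush stands in for flushing buffered binlog deltas; stubbed for the sketch.
func (c *Client) Flush(ctx context.Context) error {
	fmt.Println("flushing buffered binlog deltas")
	return nil
}

// StartPeriodicFlush flushes on a ticker until the context is cancelled or
// StopPeriodicFlush is called. Intended to run in its own goroutine, as at
// the checksum and migration call sites above.
func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) {
	c.Lock()
	c.periodicFlushEnabled = true
	c.Unlock()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.stopPeriodicFlush:
			return
		case <-ticker.C:
			if err := c.Flush(ctx); err != nil {
				fmt.Println("periodic flush error:", err)
			}
		}
	}
}

// StopPeriodicFlush signals the flush loop to exit; safe to call more than once.
func (c *Client) StopPeriodicFlush() {
	c.Lock()
	defer c.Unlock()
	if c.periodicFlushEnabled {
		c.periodicFlushEnabled = false
		c.stopPeriodicFlush <- struct{}{}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := NewClient()
	// Mirrors the pattern in the patch: start in a goroutine, stop via defer.
	// A short interval is used here so the demo produces output quickly;
	// the real call sites pass repl.DefaultFlushInterval.
	go client.StartPeriodicFlush(ctx, 100*time.Millisecond)
	defer client.StopPeriodicFlush()

	time.Sleep(350 * time.Millisecond)
}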