diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go
index e4740cb0..bcd2477c 100644
--- a/pkg/checksum/checker.go
+++ b/pkg/checksum/checker.go
@@ -201,6 +201,15 @@ func (c *Checker) Run(ctx context.Context) error {
 	}
 	c.logger.Info("table unlocked, starting checksum")
 
+	// Start the periodic flush again *just* for the duration of the checksum.
+	// If the checksum is long-running, it could block flushing for too long:
+	// - If we need to resume from a checkpoint, the binlogs may no longer be there.
+	// - If they are there, they will take a huge amount of time to flush.
+	// - The memory requirements for 1MM deltas seem reasonable, but a multi-day
+	//   checksum could plausibly exceed this.
+	go c.feed.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)
+	defer c.feed.StopPeriodicFlush()
+
 	g, errGrpCtx := errgroup.WithContext(ctx)
 	g.SetLimit(c.concurrency)
 	for !c.chunker.IsRead() && c.isHealthy(errGrpCtx) {
diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go
index 541763eb..a6718a4b 100644
--- a/pkg/migration/runner.go
+++ b/pkg/migration/runner.go
@@ -43,10 +43,6 @@ var (
 	checkpointDumpInterval  = 50 * time.Second
 	tableStatUpdateInterval = 5 * time.Minute
 	statusInterval          = 30 * time.Second
-	// binlogPerodicFlushInterval is the time that the client will flush all binlog changes to disk.
-	// Longer values require more memory, but permit more merging.
-	// I expect we will change this to 1hr-24hr in the future.
-	binlogPerodicFlushInterval = 30 * time.Second
 )
 
 func (s migrationState) String() string {
@@ -269,6 +265,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
 func (r *Runner) prepareForCutover(ctx context.Context) error {
 	r.setCurrentState(stateApplyChangeset)
 	// Disable the periodic flush and flush all pending events.
+	// We want it disabled for ANALYZE TABLE and acquiring a table lock,
+	// *but* it will be started again briefly inside the checksum
+	// runner to ensure that the lag does not grow too long.
 	r.replClient.StopPeriodicFlush()
 	if err := r.replClient.Flush(ctx); err != nil {
 		return err
@@ -426,7 +425,7 @@ func (r *Runner) setup(ctx context.Context) error {
 	// Continuously update the min/max and estimated rows
 	// and to flush the binary log position periodically.
 	go r.table.AutoUpdateStatistics(ctx, tableStatUpdateInterval, r.logger)
-	go r.replClient.StartPeriodicFlush(ctx, binlogPerodicFlushInterval)
+	go r.replClient.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)
 	return nil
 }
 
diff --git a/pkg/repl/client.go b/pkg/repl/client.go
index d3f741af..d9d41264 100644
--- a/pkg/repl/client.go
+++ b/pkg/repl/client.go
@@ -33,6 +33,10 @@ const (
 	// multiple-flush-threads, which should help it group commit and still be fast.
 	DefaultBatchSize = 1000
 	flushThreads     = 16
+	// DefaultFlushInterval is the interval at which the client flushes all binlog changes to disk.
+	// Longer values require more memory, but permit more merging.
+	// I expect we will change this to 1hr-24hr in the future.
+	DefaultFlushInterval = 30 * time.Second
 )
 
 type queuedChange struct {
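
For context, below is a minimal sketch of how a start/stop periodic flush with the signatures used above might be structured. It is an illustration only: the StartPeriodicFlush(ctx, interval) and StopPeriodicFlush() method names and the DefaultFlushInterval constant come from this diff, but the Client struct fields, the stop channel, and the no-op Flush are hypothetical stand-ins, not the actual repl.Client implementation.

// NOTE: hypothetical sketch, not the actual repl.Client from this PR.
package repl

import (
	"context"
	"sync"
	"time"
)

// DefaultFlushInterval mirrors the constant added in pkg/repl/client.go.
const DefaultFlushInterval = 30 * time.Second

// Client buffers binlog deltas; only the fields needed for this sketch are shown.
type Client struct {
	mu   sync.Mutex
	stop chan struct{}
}

// Flush stands in for applying all queued binlog changes to the target table.
func (c *Client) Flush(ctx context.Context) error {
	return nil // elided in this sketch
}

// StartPeriodicFlush flushes on a ticker until the context is canceled or
// StopPeriodicFlush is called. It is intended to run in a goroutine, as in
// "go c.feed.StartPeriodicFlush(ctx, repl.DefaultFlushInterval)" above.
func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) {
	c.mu.Lock()
	stop := make(chan struct{})
	c.stop = stop
	c.mu.Unlock()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-stop:
			return
		case <-ticker.C:
			_ = c.Flush(ctx) // a real client would log or surface this error
		}
	}
}

// StopPeriodicFlush signals the flush loop to exit. The nil check makes it
// safe to call even when no flush loop is currently running.
func (c *Client) StopPeriodicFlush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.stop != nil {
		close(c.stop)
		c.stop = nil
	}
}

The pairing in checker.go of "go c.feed.StartPeriodicFlush(...)" with "defer c.feed.StopPeriodicFlush()" matters because the checksum runs with the table unlocked: deltas keep accumulating while chunks are verified, so restarting the flusher bounds both memory growth and the size of the final catch-up flush before cutover.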