From 5600b916c7f86e16247e1829cec4365e0932a657 Mon Sep 17 00:00:00 2001
From: meiji163
Date: Thu, 10 Oct 2024 15:34:33 -0700
Subject: [PATCH] track binlog coords

---
 go/logic/coordinator.go      | 87 +++++++++++++++++++++++++++++-------
 go/logic/coordinator_test.go |  8 +++-
 go/logic/migrator.go         | 17 ++++---
 3 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go
index 2dc568600..8873de3fc 100644
--- a/go/logic/coordinator.go
+++ b/go/logic/coordinator.go
@@ -21,9 +21,13 @@ import (
 type Coordinator struct {
 	migrationContext *base.MigrationContext
 
-	binlogSyncer       *replication.BinlogSyncer
-	currentCoordinates mysql.BinlogCoordinates
+	binlogSyncer *replication.BinlogSyncer
+
+	// protects currentCoordinates and transactionCoordinates
 	currentCoordinatesMutex *sync.Mutex
+	currentCoordinates      mysql.BinlogCoordinates
+	// maps sequence number to its binlog coordinates.
+	transactionCoordinates map[int64]mysql.BinlogCoordinates
 
 	onChangelogEvent func(dmlEvent *binlog.BinlogDMLEvent) error
 
@@ -49,6 +53,8 @@ type Coordinator struct {
 	events      chan *replication.BinlogEvent
 	workerQueue chan *Worker
+
+	finishedMigrating atomic.Bool
 }
 
 type Worker struct {
@@ -66,15 +72,28 @@ func (w *Worker) ProcessEvents() error {
 	changelogTableName := w.coordinator.migrationContext.GetChangelogTableName()
 
 	for {
+		if w.coordinator.finishedMigrating.Load() {
+			return nil
+		}
 		ev := <-w.eventQueue
 		// fmt.Printf("Worker %d processing event: %T\n", w.id, ev.Event)
 
 		// Verify this is a GTID Event
 		gtidEvent, ok := ev.Event.(*replication.GTIDEvent)
 		if !ok {
-			fmt.Printf("Received unexpected event: %v\n", ev)
+			w.coordinator.migrationContext.Log.Debugf("Received unexpected event: %v\n", ev)
+			continue // not a transaction boundary; gtidEvent is nil and must not be dereferenced below
 		}
+		// record the coordinates for this transaction
+		func() {
+			w.coordinator.currentCoordinatesMutex.Lock()
+			defer w.coordinator.currentCoordinatesMutex.Unlock()
+			w.coordinator.transactionCoordinates[gtidEvent.SequenceNumber] = mysql.BinlogCoordinates{
+				LogPos:    int64(ev.Header.LogPos),
+				EventSize: int64(ev.Header.EventSize),
+			}
+		}()
+
 		// Wait for conditions to be met
 		waitChannel := w.coordinator.WaitForTransaction(gtidEvent.LastCommitted)
 		if waitChannel != nil {
@@ -82,7 +101,9 @@
 			t := time.Now()
 			<-waitChannel
 			timeWaited := time.Since(t)
-			fmt.Printf("Worker %d waited for transaction %d for: %d\n", w.id, gtidEvent.LastCommitted, timeWaited)
+			w.coordinator.migrationContext.Log.Infof(
+				"Worker %d waited for transaction %d for: %v\n",
+				w.id, gtidEvent.LastCommitted, timeWaited)
 		}
 
 		// Process the transaction
@@ -93,6 +114,9 @@
 	events:
 		for {
+			if w.coordinator.finishedMigrating.Load() {
+				return nil
+			}
 			ev := <-w.eventQueue
 			if ev == nil {
 				fmt.Printf("Worker %d ending transaction early\n", w.id)
@@ -158,7 +182,9 @@
 			if len(dmlEvents) == cap(dmlEvents) {
 				err := w.coordinator.applier.ApplyDMLEventQueries(dmlEvents)
 				if err != nil {
+					// TODO(meiji163): add retry
 					w.coordinator.migrationContext.Log.Errore(err)
+					w.coordinator.migrationContext.PanicAbort <- err
 				}
 				dmlEvents = dmlEvents[:0]
 			}
@@ -185,6 +211,7 @@
 			w.coordinator.HandleChangeLogEvent(changelogEvent)
 		}
 
+		w.executedJobs++
 		w.coordinator.workerQueue <- w
 		w.coordinator.busyWorkers.Add(-1)
 	}
@@ -200,6 +227,7 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, o
 
 		currentCoordinates:      mysql.BinlogCoordinates{},
 		currentCoordinatesMutex: &sync.Mutex{},
+		transactionCoordinates:  make(map[int64]mysql.BinlogCoordinates),
 
 		binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
 			ServerID: uint32(migrationContext.ReplicaServerId),
@@ -224,24 +252,26 @@
 	}
 }
 
-func (c *Coordinator) StartStreaming() error {
+func (c *Coordinator) StartStreaming(canStopStreaming func() bool) error {
 	ctx := context.TODO()
 
 	streamer, err := c.binlogSyncer.StartSync(gomysql.Position{
 		Name: c.currentCoordinates.LogFile,
 		Pos:  uint32(c.currentCoordinates.LogPos),
 	})
-
 	if err != nil {
 		return err
 	}
 
 	for {
+		if canStopStreaming() {
+			return nil
+		}
		ev, err := streamer.GetEvent(ctx)
 		if err != nil {
+			// TODO(meiji163): try to reconnect on error
 			return err
 		}
-
 		c.events <- ev
 	}
 }
@@ -307,6 +337,9 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
 		// Read events from the binlog and submit them to the next worker
 		case ev := <-c.events:
 			{
+				if c.finishedMigrating.Load() {
+					return nil
+				}
 				// c.migrationContext.Log.Infof("Received event: %T - %+v", ev.Event, ev.Event)
 
 				switch binlogEvent := ev.Event.(type) {
@@ -314,6 +347,12 @@
 					if c.lowWaterMark == 0 && binlogEvent.SequenceNumber > 0 {
 						c.lowWaterMark = binlogEvent.SequenceNumber - 1
 					}
+				case *replication.RotateEvent:
+					c.currentCoordinatesMutex.Lock()
+					// log the old file before overwriting it, so "from" is accurate
+					c.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", c.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
+					c.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
+					c.currentCoordinatesMutex.Unlock()
+					continue
 				default: // ignore all other events
 					continue
 				}
@@ -361,12 +400,9 @@
 		// No events in the queue. Check if all workers are sleeping now
 		default:
-			c.migrationContext.Log.Info("No events in the queue")
 			{
 				busyWorkers := c.busyWorkers.Load()
 				if busyWorkers == 0 {
-					//c.migrationContext.Log.Info("All workers are sleeping")
-					// All workers are sleeping. We're done.
 					return nil
 				} else {
 					//c.migrationContext.Log.Infof("%d/%d workers are busy\n", busyWorkers, cap(c.workerQueue))
 				}
@@ -404,7 +440,7 @@ func (c *Coordinator) WaitForTransaction(lastCommitted int64) chan struct{} {
 }
 
 func (c *Coordinator) HandleChangeLogEvent(event *binlog.BinlogDMLEvent) {
-	fmt.Printf("Coordinator: Handling changelog event: %+v\n", event)
+	c.migrationContext.Log.Infof("Coordinator: Handling changelog event: %+v\n", event)
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
@@ -414,28 +450,26 @@
 
 func (c *Coordinator) MarkTransactionCompleted(sequenceNumber int64) {
 	var channelsToNotify []chan struct{}
+	transactionsCompleted := []int64{}
 
 	func() {
 		c.mu.Lock()
 		defer c.mu.Unlock()
 
-		c.migrationContext.Log.Infof("Coordinator: Marking job as completed: %d\n", sequenceNumber)
+		//c.migrationContext.Log.Infof("Coordinator: Marking job as completed: %d\n", sequenceNumber)
 
 		// Mark the job as completed
 		c.completedJobs[sequenceNumber] = true
-
 		// Then, update the low water mark if possible
 		for {
 			if c.completedJobs[c.lowWaterMark+1] {
 				c.lowWaterMark++
-
-				// TODO: Remember the applied binlog coordinates
+				transactionsCompleted = append(transactionsCompleted, c.lowWaterMark)
 				delete(c.completedJobs, c.lowWaterMark)
 			} else {
 				break
 			}
 		}
-
 		channelsToNotify = make([]chan struct{}, 0)
 
 		// Schedule any jobs that were waiting for this job to complete
@@ -447,7 +481,28 @@
 		}
 	}()
 
+	// update the binlog coords to the coords of the low water mark
+	if len(transactionsCompleted) > 0 {
+		// c.migrationContext.Log.Infof("Updating binlog coordinates to %s:%d\n", c.currentCoordinates.LogFile, c.currentCoordinates.LogPos)
+		go func() {
+			c.currentCoordinatesMutex.Lock()
+			defer c.currentCoordinatesMutex.Unlock()
+
+			// read from the snapshot rather than c.lowWaterMark, which is owned
+			// by c.mu and may have moved on by the time this goroutine runs
+			lastPos := c.transactionCoordinates[transactionsCompleted[len(transactionsCompleted)-1]]
+			c.currentCoordinates.LogPos = lastPos.LogPos
+			c.currentCoordinates.EventSize = lastPos.EventSize
+
+			for _, sequenceNumber := range transactionsCompleted {
+				delete(c.transactionCoordinates, sequenceNumber)
+			}
+		}()
+	}
+
 	for _, waitChannel := range channelsToNotify {
 		waitChannel <- struct{}{}
 	}
 }
+
+func (c *Coordinator) Teardown() {
+	c.finishedMigrating.Store(true)
+}
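Aside on the design: the hunks above implement low-water-mark tracking. ProcessEvents records each transaction's binlog coordinates keyed by its GTID sequence number, and MarkTransactionCompleted advances currentCoordinates only once every transaction at or below a sequence number has been applied, so the saved coordinates are always safe to resume from even though workers finish out of order. A minimal, self-contained sketch of that bookkeeping (hypothetical simplified types, not gh-ost's own):

package main

import "fmt"

// lowWaterMarkTracker records a position per sequence number and only
// advances the "safe" position when all transactions up to and including
// a sequence number have completed.
type lowWaterMarkTracker struct {
	lowWaterMark int64
	completed    map[int64]bool
	coords       map[int64]int64 // sequence number -> binlog position
	safePos      int64           // position it is safe to resume from
}

func (t *lowWaterMarkTracker) complete(seq int64) {
	t.completed[seq] = true
	// advance the low water mark over any contiguous run of completed jobs
	for t.completed[t.lowWaterMark+1] {
		t.lowWaterMark++
		t.safePos = t.coords[t.lowWaterMark]
		delete(t.completed, t.lowWaterMark)
		delete(t.coords, t.lowWaterMark)
	}
}

func main() {
	t := &lowWaterMarkTracker{
		completed: map[int64]bool{},
		coords:    map[int64]int64{1: 100, 2: 200, 3: 300},
	}
	t.complete(2)          // out-of-order completion: safePos must not move yet
	fmt.Println(t.safePos) // 0
	t.complete(1)          // 1 and 2 are now contiguous: safePos advances to 200
	fmt.Println(t.safePos) // 200
	t.complete(3)
	fmt.Println(t.safePos) // 300
}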
diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go
index 17ae7ea7d..f1e7233f5 100644
--- a/go/logic/coordinator_test.go
+++ b/go/logic/coordinator_test.go
@@ -71,9 +71,10 @@ func TestCoordinator(t *testing.T) {
 	}
 	migrationContext.SetConnectionConfig("innodb")
+	migrationContext.NumWorkers = 4
 
 	applier := NewApplier(migrationContext)
-	err = applier.InitDBConnections()
+	err = applier.InitDBConnections(migrationContext.NumWorkers)
 	require.NoError(t, err)
 
 	err = applier.CreateChangelogTable()
@@ -116,8 +117,11 @@ func TestCoordinator(t *testing.T) {
 	coord.applier = applier
 	coord.InitializeWorkers(8)
 
+	canStopStreaming := func() bool {
+		return false
+	}
 	go func() {
-		err = coord.StartStreaming()
+		err = coord.StartStreaming(canStopStreaming)
 		require.NoError(t, err)
 	}()
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index d7c9c1707..847f0dadc 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -1109,7 +1109,7 @@ func (this *Migrator) initiateStreaming() error {
 	go func() {
 		this.migrationContext.Log.Debugf("Beginning streaming")
 		//err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
-		err := this.trxCoordinator.StartStreaming()
+		err := this.trxCoordinator.StartStreaming(this.canStopStreaming)
 		if err != nil {
 			this.migrationContext.PanicAbort <- err
 		}
@@ -1348,6 +1348,16 @@ func (this *Migrator) finalCleanup() error {
 func (this *Migrator) teardown() {
 	atomic.StoreInt64(&this.finishedMigrating, 1)
 
+	if this.trxCoordinator != nil {
+		this.migrationContext.Log.Infof("Tearing down coordinator")
+		this.trxCoordinator.Teardown()
+	}
+
+	if this.throttler != nil {
+		this.migrationContext.Log.Infof("Tearing down throttler")
+		this.throttler.Teardown()
+	}
+
 	if this.inspector != nil {
 		this.migrationContext.Log.Infof("Tearing down inspector")
 		this.inspector.Teardown()
@@ -1362,9 +1372,4 @@
 		this.migrationContext.Log.Infof("Tearing down streamer")
 		this.eventsStreamer.Teardown()
 	}
-
-	if this.throttler != nil {
-		this.migrationContext.Log.Infof("Tearing down throttler")
-		this.throttler.Teardown()
-	}
 }
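For context, the coordinator lifecycle this patch wires together (mirroring TestCoordinator above) is roughly the sketch below. It is illustrative only: setup such as connection config and applier initialization is elided, and the onChangelogEvent argument to NewCoordinator is an assumption inferred from the truncated hunk header, not confirmed by this patch.

// Hypothetical usage sketch; migrationContext, applier, and onChangelogEvent
// are assumed to come from setup like TestCoordinator's.
coord := NewCoordinator(migrationContext, applier, onChangelogEvent)
coord.InitializeWorkers(8)

// Stream binlog events into the coordinator until told to stop.
go func() {
	if err := coord.StartStreaming(func() bool { return false }); err != nil {
		migrationContext.PanicAbort <- err
	}
}()

// Fan transactions out to workers until the event queue drains.
if err := coord.ProcessEventsUntilDrained(); err != nil {
	migrationContext.Log.Errore(err)
}

// Flag the workers and event loop to exit.
coord.Teardown()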