Skip to content

Commit

Permalink
track binlog coords
Browse files Browse the repository at this point in the history
  • Loading branch information
meiji163 committed Oct 10, 2024
1 parent 6321e73 commit 5600b91
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 24 deletions.
87 changes: 71 additions & 16 deletions go/logic/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import (
type Coordinator struct {
migrationContext *base.MigrationContext

binlogSyncer *replication.BinlogSyncer
currentCoordinates mysql.BinlogCoordinates
binlogSyncer *replication.BinlogSyncer

// protects currentCoordinates and transactionCoordinates
currentCoordinatesMutex *sync.Mutex
currentCoordinates mysql.BinlogCoordinates
// maps sequence number to its binlog coordinates.
transactionCoordinates map[int64]mysql.BinlogCoordinates

onChangelogEvent func(dmlEvent *binlog.BinlogDMLEvent) error

Expand All @@ -49,6 +53,8 @@ type Coordinator struct {
events chan *replication.BinlogEvent

workerQueue chan *Worker

finishedMigrating atomic.Bool
}

type Worker struct {
Expand All @@ -66,23 +72,38 @@ func (w *Worker) ProcessEvents() error {
changelogTableName := w.coordinator.migrationContext.GetChangelogTableName()

for {
if w.coordinator.finishedMigrating.Load() {
return nil
}
ev := <-w.eventQueue
// fmt.Printf("Worker %d processing event: %T\n", w.id, ev.Event)

// Verify this is a GTID Event
gtidEvent, ok := ev.Event.(*replication.GTIDEvent)
if !ok {
fmt.Printf("Received unexpected event: %v\n", ev)
w.coordinator.migrationContext.Log.Debugf("Received unexpected event: %v\n", ev)
}

// record the coordinates for this transaction
func() {
w.coordinator.currentCoordinatesMutex.Lock()
defer w.coordinator.currentCoordinatesMutex.Unlock()
w.coordinator.transactionCoordinates[gtidEvent.SequenceNumber] = mysql.BinlogCoordinates{
LogPos: int64(ev.Header.LogPos),
EventSize: int64(ev.Header.EventSize),
}
}()

// Wait for conditions to be met
waitChannel := w.coordinator.WaitForTransaction(gtidEvent.LastCommitted)
if waitChannel != nil {
//fmt.Printf("Worker %d - transaction %d waiting for transaction: %d\n", w.id, gtidEvent.SequenceNumber, gtidEvent.LastCommitted)
t := time.Now()
<-waitChannel
timeWaited := time.Since(t)
fmt.Printf("Worker %d waited for transaction %d for: %d\n", w.id, gtidEvent.LastCommitted, timeWaited)
w.coordinator.migrationContext.Log.Infof(
"Worker %d waited for transaction %d for: %d\n",
w.id, gtidEvent.LastCommitted, timeWaited)
}

// Process the transaction
Expand All @@ -93,6 +114,9 @@ func (w *Worker) ProcessEvents() error {

events:
for {
if w.coordinator.finishedMigrating.Load() {
return nil
}
ev := <-w.eventQueue
if ev == nil {
fmt.Printf("Worker %d ending transaction early\n", w.id)
Expand Down Expand Up @@ -158,7 +182,9 @@ func (w *Worker) ProcessEvents() error {
if len(dmlEvents) == cap(dmlEvents) {
err := w.coordinator.applier.ApplyDMLEventQueries(dmlEvents)
if err != nil {
//TODO(meiji163) add retry
w.coordinator.migrationContext.Log.Errore(err)
w.coordinator.migrationContext.PanicAbort <- err
}
dmlEvents = dmlEvents[:0]
}
Expand All @@ -185,6 +211,7 @@ func (w *Worker) ProcessEvents() error {
w.coordinator.HandleChangeLogEvent(changelogEvent)
}

w.executedJobs += 1
w.coordinator.workerQueue <- w
w.coordinator.busyWorkers.Add(-1)
}
Expand All @@ -200,6 +227,7 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, o

currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{},
transactionCoordinates: make(map[int64]mysql.BinlogCoordinates),

binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Expand All @@ -224,24 +252,26 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, o
}
}

func (c *Coordinator) StartStreaming() error {
// StartStreaming starts the binlog syncer at the coordinator's current binlog
// coordinates and forwards every event it receives to c.events.
//
// canStopStreaming is polled before each read from the stream; as soon as it
// reports true the method returns nil. An error from starting the sync or
// from reading an event is returned to the caller unchanged.
func (c *Coordinator) StartStreaming(canStopStreaming func() bool) error {
	ctx := context.TODO()

	// currentCoordinates is guarded by currentCoordinatesMutex, so take a
	// snapshot of the start position under the lock rather than reading the
	// fields while another goroutine may be updating them.
	c.currentCoordinatesMutex.Lock()
	startPosition := gomysql.Position{
		Name: c.currentCoordinates.LogFile,
		Pos:  uint32(c.currentCoordinates.LogPos),
	}
	c.currentCoordinatesMutex.Unlock()

	streamer, err := c.binlogSyncer.StartSync(startPosition)
	if err != nil {
		return err
	}

	for {
		if canStopStreaming() {
			return nil
		}
		ev, err := streamer.GetEvent(ctx)
		if err != nil {
			//TODO(meiji163): try reconnect on error
			return err
		}
		c.events <- ev
	}
}
Expand Down Expand Up @@ -307,13 +337,22 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
// Read events from the binlog and submit them to the next worker
case ev := <-c.events:
{
if c.finishedMigrating.Load() {
return nil
}
// c.migrationContext.Log.Infof("Received event: %T - %+v", ev.Event, ev.Event)

switch binlogEvent := ev.Event.(type) {
case *replication.GTIDEvent:
if c.lowWaterMark == 0 && binlogEvent.SequenceNumber > 0 {
c.lowWaterMark = binlogEvent.SequenceNumber - 1
}
case *replication.RotateEvent:
c.currentCoordinatesMutex.Lock()
c.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
c.currentCoordinatesMutex.Unlock()
c.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", c.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
continue
default: // ignore all other events
continue
}
Expand Down Expand Up @@ -361,12 +400,9 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {

// No events in the queue. Check if all workers are sleeping now
default:
c.migrationContext.Log.Info("No events in the queue")
{
busyWorkers := c.busyWorkers.Load()
if busyWorkers == 0 {
//c.migrationContext.Log.Info("All workers are sleeping")
// All workers are sleeping. We're done.
return nil
} else {

Check failure on line 407 in go/logic/coordinator.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)
//c.migrationContext.Log.Infof("%d/%d workers are busy\n", busyWorkers, cap(c.workerQueue))
Expand Down Expand Up @@ -404,7 +440,7 @@ func (c *Coordinator) WaitForTransaction(lastCommitted int64) chan struct{} {
}

func (c *Coordinator) HandleChangeLogEvent(event *binlog.BinlogDMLEvent) {
fmt.Printf("Coordinator: Handling changelog event: %+v\n", event)
c.migrationContext.Log.Infof("Coordinator: Handling changelog event: %+v\n", event)

c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -414,28 +450,26 @@ func (c *Coordinator) HandleChangeLogEvent(event *binlog.BinlogDMLEvent) {

func (c *Coordinator) MarkTransactionCompleted(sequenceNumber int64) {
var channelsToNotify []chan struct{}
transactionsCompleted := []int64{}

func() {
c.mu.Lock()
defer c.mu.Unlock()

c.migrationContext.Log.Infof("Coordinator: Marking job as completed: %d\n", sequenceNumber)
//c.migrationContext.Log.Infof("Coordinator: Marking job as completed: %d\n", sequenceNumber)

// Mark the job as completed
c.completedJobs[sequenceNumber] = true

// Then, update the low water mark if possible
for {
if c.completedJobs[c.lowWaterMark+1] {
c.lowWaterMark++

// TODO: Remember the applied binlog coordinates
transactionsCompleted = append(transactionsCompleted, c.lowWaterMark)
delete(c.completedJobs, c.lowWaterMark)
} else {
break
}
}

channelsToNotify = make([]chan struct{}, 0)

// Schedule any jobs that were waiting for this job to complete
Expand All @@ -447,7 +481,28 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber int64) {
}
}()

// update the binlog coords to the coords of the low water mark
if len(transactionsCompleted) > 0 {
// c.migrationContext.Log.Infof("Updating binlog coordinates to %s:%d\n", c.currentCoordinates.LogFile, c.currentCoordinates.LogPos)
go func() {
c.currentCoordinatesMutex.Lock()
defer c.currentCoordinatesMutex.Unlock()

lastPos := c.transactionCoordinates[c.lowWaterMark]
c.currentCoordinates.LogPos = lastPos.LogPos
c.currentCoordinates.EventSize = lastPos.EventSize

for _, sequenceNumber := range transactionsCompleted {
delete(c.transactionCoordinates, sequenceNumber)
}
}()
}

for _, waitChannel := range channelsToNotify {
waitChannel <- struct{}{}
}
}

// Teardown sets the coordinator's finishedMigrating flag. The event loops in
// this file load that flag at the top of each iteration and return once it is
// set, so this only signals shutdown; it does not wait for them to exit.
func (c *Coordinator) Teardown() {
c.finishedMigrating.Store(true)
}
8 changes: 6 additions & 2 deletions go/logic/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ func TestCoordinator(t *testing.T) {
}

migrationContext.SetConnectionConfig("innodb")
migrationContext.NumWorkers = 4

applier := NewApplier(migrationContext)
err = applier.InitDBConnections()
err = applier.InitDBConnections(migrationContext.NumWorkers)
require.NoError(t, err)

err = applier.CreateChangelogTable()
Expand Down Expand Up @@ -116,8 +117,11 @@ func TestCoordinator(t *testing.T) {
coord.applier = applier
coord.InitializeWorkers(8)

canStopStreaming := func() bool {
return false
}
go func() {
err = coord.StartStreaming()
err = coord.StartStreaming(canStopStreaming)
require.NoError(t, err)
}()

Expand Down
17 changes: 11 additions & 6 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ func (this *Migrator) initiateStreaming() error {
go func() {
this.migrationContext.Log.Debugf("Beginning streaming")
//err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
err := this.trxCoordinator.StartStreaming()
err := this.trxCoordinator.StartStreaming(this.canStopStreaming)
if err != nil {
this.migrationContext.PanicAbort <- err
}
Expand Down Expand Up @@ -1348,6 +1348,16 @@ func (this *Migrator) finalCleanup() error {
func (this *Migrator) teardown() {
atomic.StoreInt64(&this.finishedMigrating, 1)

if this.trxCoordinator != nil {
this.migrationContext.Log.Infof("Tearing down coordinator")
this.trxCoordinator.Teardown()
}

if this.throttler != nil {
this.migrationContext.Log.Infof("Tearing down throttler")
this.throttler.Teardown()
}

if this.inspector != nil {
this.migrationContext.Log.Infof("Tearing down inspector")
this.inspector.Teardown()
Expand All @@ -1362,9 +1372,4 @@ func (this *Migrator) teardown() {
this.migrationContext.Log.Infof("Tearing down streamer")
this.eventsStreamer.Teardown()
}

if this.throttler != nil {
this.migrationContext.Log.Infof("Tearing down throttler")
this.throttler.Teardown()
}
}

0 comments on commit 5600b91

Please sign in to comment.