@@ -385,8 +385,38 @@ func (this *Migrator) ExecOnFailureHook() (err error) {
385
385
return this .hooksExecutor .onFailure ()
386
386
}
387
387
388
+ func (this * Migrator ) handleCutOverResult (cutOverError error ) (err error ) {
389
+ if this .migrationContext .TestOnReplica {
390
+ // We're merly testing, we don't want to keep this state. Rollback the renames as possible
391
+ this .applier .RenameTablesRollback ()
392
+ }
393
+ if cutOverError == nil {
394
+ return nil
395
+ }
396
+ // Only on error:
397
+
398
+ if this .migrationContext .TestOnReplica {
399
+ // With `--test-on-replica` we stop replication thread, and then proceed to use
400
+ // the same cut-over phase as the master would use. That means we take locks
401
+ // and swap the tables.
402
+ // The difference is that we will later swap the tables back.
403
+ if err := this .hooksExecutor .onStartReplication (); err != nil {
404
+ return log .Errore (err )
405
+ }
406
+ if this .migrationContext .TestOnReplicaSkipReplicaStop {
407
+ log .Warningf ("--test-on-replica-skip-replica-stop enabled, we are not starting replication." )
408
+ } else {
409
+ log .Debugf ("testing on replica. Starting replication IO thread after cut-over failure" )
410
+ if err := this .retryOperation (this .applier .StartReplication ); err != nil {
411
+ return log .Errore (err )
412
+ }
413
+ }
414
+ }
415
+ return nil
416
+ }
417
+
388
418
// cutOver performs the final step of migration, based on migration
389
- // type (on replica? bumpy ? safe?)
419
+ // type (on replica? atomic ? safe?)
390
420
func (this * Migrator ) cutOver () (err error ) {
391
421
if this .migrationContext .Noop {
392
422
log .Debugf ("Noop operation; not really swapping tables" )
@@ -441,18 +471,18 @@ func (this *Migrator) cutOver() (err error) {
441
471
return err
442
472
}
443
473
}
444
- // We're merly testing, we don't want to keep this state. Rollback the renames as possible
445
- defer this .applier .RenameTablesRollback ()
446
- // We further proceed to do the cutover by normal means; the 'defer' above will rollback the swap
447
474
}
448
475
if this .migrationContext .CutOverType == base .CutOverAtomic {
449
476
// Atomic solution: we use low timeout and multiple attempts. But for
450
477
// each failed attempt, we throttle until replication lag is back to normal
451
478
err := this .atomicCutOver ()
479
+ this .handleCutOverResult (err )
452
480
return err
453
481
}
454
482
if this .migrationContext .CutOverType == base .CutOverTwoStep {
455
- return this .cutOverTwoStep ()
483
+ err := this .cutOverTwoStep ()
484
+ this .handleCutOverResult (err )
485
+ return err
456
486
}
457
487
return log .Fatalf ("Unknown cut-over type: %d; should never get here!" , this .migrationContext .CutOverType )
458
488
}
@@ -1064,8 +1094,10 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
1064
1094
1065
1095
availableEvents := len (this .applyEventsQueue )
1066
1096
batchSize := int (atomic .LoadInt64 (& this .migrationContext .DMLBatchSize ))
1067
- if availableEvents > batchSize {
1068
- availableEvents = batchSize
1097
+ if availableEvents > batchSize - 1 {
1098
+ // The "- 1" is because we already consumed one event: the original event that led to this function getting called.
1099
+ // So, if DMLBatchSize==1 we wish to not process any further events
1100
+ availableEvents = batchSize - 1
1069
1101
}
1070
1102
for i := 0 ; i < availableEvents ; i ++ {
1071
1103
additionalStruct := <- this .applyEventsQueue
0 commit comments