Skip to content

Commit

Permalink
Only start SQL thread temporarily to WaitForPosition if needed (vites…
Browse files Browse the repository at this point in the history
…sio#10104)

After vitessio#9512 we always attempted to start the replication SQL_Thread(s) when waiting for a given position. The problem with this, however, is that if the SQL_Thread is running but the IO_Thread is not then the tablet repair does not try and start replication on a replica tablet. So in certain states such as when initializing a shard, replication may end up in a non-healthy state and never be repaired.

This changes the behavior so that:
  1. We only attempt to start the SQL_Thread(s) if it's not already running
  2. If we explicitly start the SQL_Thread(s) then we also explicitly reset it to what it was (stopped) as we exit the call

Because the caller should be/have a TabletManager which has a mutex, this should ensure that the replication manager calls are serialized and because we are resetting the replication state after mutating it, everything should work as it did before vitessio#9512 with the exception being that when waiting we ensure that the replica at least has the possibility of catching up.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Apr 18, 2022
1 parent 9fc6d52 commit 2571b36
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 11 deletions.
31 changes: 25 additions & 6 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,24 @@ type flavor interface {
// restartReplicationCommands returns the commands to stop, reset and start the replication.
restartReplicationCommands() []string

// startReplicationUntilAfter will restart replication, but only allow it
// startReplicationUntilAfter will start replication, but only allow it
// to run until `pos` is reached. After reaching pos, replication will be stopped again
startReplicationUntilAfter(pos Position) string

// startSQLThreadUntilAfter will start replication's sql thread(s), but only allow it
// to run until `pos` is reached. After reaching pos, it will be stopped again
startSQLThreadUntilAfter(pos Position) string

// stopReplicationCommand returns the command to stop the replication.
stopReplicationCommand() string

// stopIOThreadCommand returns the command to stop the replica's io thread only.
// stopIOThreadCommand returns the command to stop the replica's IO thread only.
stopIOThreadCommand() string

// startSQLThreadCommand returns the command to start the replica's sql thread only.
// stopSQLThreadCommand returns the command to stop the replica's SQL thread(s) only.
stopSQLThreadCommand() string

// startSQLThreadCommand returns the command to start the replica's SQL thread only.
startSQLThreadCommand() string

// sendBinlogDumpCommand sends the packet required to start
Expand Down Expand Up @@ -236,21 +243,28 @@ func (c *Conn) PrimaryFilePosition() (Position, error) {
}, nil
}

// StartReplicationCommand returns the command to start the replication.
// StartReplicationCommand returns the command to start replication.
func (c *Conn) StartReplicationCommand() string {
return c.flavor.startReplicationCommand()
}

// RestartReplicationCommands returns the commands to stop, reset and start the replication.
// RestartReplicationCommands returns the commands to stop, reset and start replication.
func (c *Conn) RestartReplicationCommands() []string {
return c.flavor.restartReplicationCommands()
}

// StartReplicationUntilAfterCommand returns the command to start the replication.
// StartReplicationUntilAfterCommand returns the command to start replication.
func (c *Conn) StartReplicationUntilAfterCommand(pos Position) string {
return c.flavor.startReplicationUntilAfter(pos)
}

// StartSQLThreadUntilAfterCommand returns the command to start the replica's SQL
// thread(s) and have it run until it has reached the given position, at which point
// it will stop.
func (c *Conn) StartSQLThreadUntilAfterCommand(pos Position) string {
return c.flavor.startSQLThreadUntilAfter(pos)
}

// StopReplicationCommand returns the command to stop the replication.
func (c *Conn) StopReplicationCommand() string {
return c.flavor.stopReplicationCommand()
Expand All @@ -261,6 +275,11 @@ func (c *Conn) StopIOThreadCommand() string {
return c.flavor.stopIOThreadCommand()
}

// StopSQLThreadCommand returns the command to stop the replica's SQL thread(s).
func (c *Conn) StopSQLThreadCommand() string {
return c.flavor.stopSQLThreadCommand()
}

// StartSQLThreadCommand returns the command to start the replica's SQL thread.
func (c *Conn) StartSQLThreadCommand() string {
return c.flavor.startSQLThreadCommand()
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (flv *filePosFlavor) stopIOThreadCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) stopSQLThreadCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) startSQLThreadCommand() string {
return "unsupported"
}
Expand Down Expand Up @@ -269,6 +273,10 @@ func (*filePosFlavor) startReplicationUntilAfter(pos Position) string {
return "unsupported"
}

func (*filePosFlavor) startSQLThreadUntilAfter(pos Position) string {
return "unsupported"
}

// enableBinlogPlaybackCommand is part of the Flavor interface.
func (*filePosFlavor) enableBinlogPlaybackCommand() string {
return ""
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (mariadbFlavor) startReplicationUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL master_gtid_pos = \"%s\"", pos)
}

func (mariadbFlavor) startSQLThreadUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL master_gtid_pos = \"%s\"", pos)
}

func (mariadbFlavor) startReplicationCommand() string {
return "START SLAVE"
}
Expand All @@ -77,6 +81,10 @@ func (mariadbFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}

func (mariadbFlavor) stopSQLThreadCommand() string {
return "STOP SLAVE SQL_THREAD"
}

func (mariadbFlavor) startSQLThreadCommand() string {
return "START SLAVE SQL_THREAD"
}
Expand Down
8 changes: 8 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (mysqlFlavor) startReplicationUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos)
}

func (mysqlFlavor) startSQLThreadUntilAfter(pos Position) string {
return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos)
}

func (mysqlFlavor) stopReplicationCommand() string {
return "STOP SLAVE"
}
Expand All @@ -80,6 +84,10 @@ func (mysqlFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}

func (mysqlFlavor) stopSQLThreadCommand() string {
return "STOP SLAVE SQL_THREAD"
}

func (mysqlFlavor) startSQLThreadCommand() string {
return "START SLAVE SQL_THREAD"
}
Expand Down
10 changes: 10 additions & 0 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string {
return ""
}

// startSQLThreadUntilAfter is disabled in mysqlGRFlavor
func (mysqlGRFlavor) startSQLThreadUntilAfter(pos Position) string {
return ""
}

// stopReplicationCommand returns the command to stop the replication.
// we return empty here since `STOP GROUP_REPLICATION` should be called by
// the external orchestrator
Expand All @@ -73,6 +78,11 @@ func (mysqlGRFlavor) stopIOThreadCommand() string {
return ""
}

// stopSQLThreadCommand is disabled in mysqlGRFlavor
func (mysqlGRFlavor) stopSQLThreadCommand() string {
return ""
}

// startSQLThreadCommand is disabled in mysqlGRFlavor
func (mysqlGRFlavor) startSQLThreadCommand() string {
return ""
Expand Down
53 changes: 48 additions & 5 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"context"
Expand Down Expand Up @@ -97,6 +98,19 @@ func (mysqld *Mysqld) StartReplicationUntilAfter(ctx context.Context, targetPos
return mysqld.executeSuperQueryListConn(ctx, conn, queries)
}

// StartSQLThreadUntilAfter starts replication's SQL thread(s) until replication has come to `targetPos`, then it stops it
func (mysqld *Mysqld) StartSQLThreadUntilAfter(ctx context.Context, targetPos mysql.Position) error {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return err
}
defer conn.Recycle()

queries := []string{conn.StartSQLThreadUntilAfterCommand(targetPos)}

return mysqld.executeSuperQueryListConn(ctx, conn, queries)
}

// StopReplication stops replication.
func (mysqld *Mysqld) StopReplication(hookExtraEnv map[string]string) error {
h := hook.NewSimpleHook("preflight_stop_slave")
Expand Down Expand Up @@ -125,6 +139,17 @@ func (mysqld *Mysqld) StopIOThread(ctx context.Context) error {
return mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopIOThreadCommand()})
}

// StopSQLThread stops a replica's SQL thread(s) only.
func (mysqld *Mysqld) StopSQLThread(ctx context.Context) error {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return err
}
defer conn.Recycle()

return mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopSQLThreadCommand()})
}

// RestartReplication stops, resets and starts replication.
func (mysqld *Mysqld) RestartReplication(hookExtraEnv map[string]string) error {
h := hook.NewSimpleHook("preflight_stop_slave")
Expand Down Expand Up @@ -244,14 +269,32 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos mysql.Positio
return nil
}

// Start the SQL Thread before waiting for position to be reached, since the replicas
// can only make forward progress if the SQL thread is started and we have already verified
// that the replica is not already as advanced as we want it to be
err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()})
if err != nil {
replicationStatus, err := conn.ShowReplicationStatus()
if err != nil && !errors.Is(err, mysql.ErrNotReplica) {
return err
}

// If the SQL thread(s) is not already running -- e.g. in the case of EmergencyReparentShard where the
// instance is transitioning to PRIMARY (elect) and we can no longer talk to the old PRIMARY, we need
// it to catch up as much as possible by executing all of the locally queued binary log events (in
// the existing relay logs) before it starts serving traffic for the shard to minimize potential data
// loss -- then we try to start the SQL Thread(s) before waiting for the position to be reached at
// which point the SQL thread(s) will be stopped again, since the replicas can only make forward
// progress if the SQL thread is started and we have already verified that the replica is not already
// as advanced as we want it to be
if !replicationStatus.SQLHealthy() {
// Let's ensure the replication state is put back to what it was when we started.
// Doing this in a deferred function ensures that we do so even if we timeout while waiting.
defer func() {
mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StopSQLThreadCommand()})
}()
if err = mysqld.executeSuperQueryListConn(ctx, conn, []string{conn.StartSQLThreadCommand()}); err != nil {
return vterrors.Wrap(err,
fmt.Sprintf("the replication SQL thread(s) was stopped and we could not temporarily start it in order to wait for the target position of %v",
targetPos))
}
}

// Find the query to run, run it.
query, err = conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
Expand Down

0 comments on commit 2571b36

Please sign in to comment.