diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go index db20c2e821c..906b0179eb7 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go @@ -17,6 +17,8 @@ limitations under the License. package repltracker import ( + "errors" + "fmt" "sync" "time" @@ -47,6 +49,8 @@ var ( heartbeatLagNsHistogram = stats.NewGenericHistogram("HeartbeatLagNsHistogram", "Histogram of lag values in nanoseconds", []int64{0, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12}, []string{"0", "1ms", "10ms", "100ms", "1s", "10s", "100s", "1000s", ">1000s"}, "Count", "Total") + + errFallback = errors.New("failed to obtain replication lag from poller after attempting to use it as fall-back for heartbeat") ) // ReplTracker tracks replication lag. @@ -133,14 +137,29 @@ func (rt *ReplTracker) Status() (time.Duration, error) { rt.mu.Lock() defer rt.mu.Unlock() + fallbackToPoller := false + var heartbeatLag, mysqlLag time.Duration + var heartbeatErr, mysqlErr error + switch { case rt.isPrimary || rt.mode == tabletenv.Disable: return 0, nil case rt.mode == tabletenv.Heartbeat: - return rt.hr.Status() + // This should allow us to migrate safely to using vttablet heartbeat. If using heartbeat fails (e.g. because + // the shard's primary does not yet have them and therefore, either the heartbeat table is missing or it's + // empty), fall back to the poller. Otherwise, use what the heartbeat says. + if heartbeatLag, heartbeatErr = rt.hr.Status(); heartbeatErr == nil { + return heartbeatLag, heartbeatErr + } + fallbackToPoller = true } - // rt.mode == tabletenv.Poller - return rt.poller.Status() + // rt.mode == tabletenv.Poller or fallback after heartbeat error + mysqlLag, mysqlErr = rt.poller.Status() + if fallbackToPoller && mysqlErr != nil { + return 0, fmt.Errorf("%w: %s", errFallback, mysqlErr) + } + + return mysqlLag, mysqlErr } // EnableHeartbeat enables or disables writes of heartbeat. This functionality diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go index 362148cd3b2..0695a079b82 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go @@ -95,3 +95,76 @@ func TestReplTracker(t *testing.T) { _, err = rt.Status() assert.Equal(t, "err", err.Error()) } + +func TestStatusHeartbeatFallBack(t *testing.T) { + t.Parallel() + + heartbeatErr := errors.New("some error reading heartbeat") + mysqlErr := errors.New("some mysql error") + testCases := []struct { + name string + heartbeatLag tabletenv.Seconds + heartbeatError error + mysqldLag uint + mysqldErr error + expectedError error + expectedLag time.Duration + }{ + { + name: "Heartbeat successful", + heartbeatLag: tabletenv.Seconds(5.0), + heartbeatError: nil, + expectedLag: 5 * time.Second, + }, + { + name: "Heartbeat failed, mysqld lag successful", + heartbeatError: heartbeatErr, + mysqldLag: 8, + expectedLag: 8 * time.Second, + }, + { + name: "Heartbeat & mysqld lag failed", + heartbeatError: heartbeatErr, + mysqldErr: mysqlErr, + expectedError: errFallback, + }, + } + + for _, testCase := range testCases { + theCase := testCase + + t.Run(theCase.name, func(t *testing.T) { + t.Parallel() + config := tabletenv.NewDefaultConfig() + config.ReplicationTracker.Mode = tabletenv.Heartbeat + config.ReplicationTracker.HeartbeatIntervalSeconds = theCase.heartbeatLag + env := tabletenv.NewEnv(config, "ReplTrackerTest") + alias := &topodatapb.TabletAlias{ + Cell: "cell", + Uid: 1, + } + mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld.ReplicationLagSeconds = theCase.mysqldLag + mysqld.Replicating = true + mysqld.ReplicationStatusError = theCase.mysqldErr + target := &querypb.Target{} + + rt := NewReplTracker(env, alias) + + rt.hr.lastKnownLag = time.Duration(theCase.heartbeatLag) * time.Second + rt.hr.lastKnownError = theCase.heartbeatError + rt.InitDBConfig(target, mysqld) + + lag, err := rt.Status() + + if theCase.expectedError == nil { + assert.NoError(t, err) + assert.Equal(t, theCase.expectedLag, lag) + } else { + assert.ErrorIs(t, err, theCase.expectedError) + } + + }) + } + +}