[raft] add a test for replace dead replicas (#6830)
luluz66 authored Jun 14, 2024
1 parent 48545ef · commit 9024a34
Showing 2 changed files with 98 additions and 16 deletions.
81 changes: 79 additions & 2 deletions enterprise/server/raft/store/store_test.go
@@ -437,6 +437,15 @@ func waitForReplicaToCatchUp(t testing.TB, ctx context.Context, r *replica.Repli
}
}

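// includeReplicaWithNHID reports whether rd contains a replica hosted on the
// node host with the given NHID.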
func includeReplicaWithNHID(rd *rfpb.RangeDescriptor, nhid string) bool {
for _, r := range rd.GetReplicas() {
if r.GetNhid() == nhid {
return true
}
}
return false
}

func getReplica(t testing.TB, s *testutil.TestingStore, rangeID uint64) *replica.Replica {
for {
res, err := s.GetReplica(rangeID)
@@ -952,9 +961,7 @@ func TestDownReplicate(t *testing.T) {

// Advance the clock to trigger the replica scan
clock.Advance(61 * time.Second)
i := 0
for {
i++
clock.Advance(3 * time.Second)
time.Sleep(100 * time.Millisecond)

@@ -969,3 +976,73 @@ func TestDownReplicate(t *testing.T) {
}
}
}

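// TestReplaceDeadReplica stops one of three stores, advances the fake clock past
// the dead-store timeout, and verifies that the driver replaces the dead store's
// replicas with replicas on a newly added store, which then catch up.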
func TestReplaceDeadReplica(t *testing.T) {
flags.Set(t, "cache.raft.max_range_size_bytes", 0) // disable auto splitting
// Disable txn cleanup and zombie scan, because advancing the fake clock can
// prematurely trigger txn cleanup and zombie cleanup.
flags.Set(t, "cache.raft.enable_txn_cleanup", false)
flags.Set(t, "cache.raft.zombie_node_scan_interval", 0)

clock := clockwork.NewFakeClock()
sf := testutil.NewStoreFactoryWithClock(t, clock)
s1 := sf.NewStore(t)
s2 := sf.NewStore(t)
s3 := sf.NewStore(t)
ctx := context.Background()

// start shards for s1, s2, s3
stores := []*testutil.TestingStore{s1, s2, s3}
sf.StartShard(t, ctx, stores...)

{ // Verify that there are 3 replicas for range 2, and write 10 records
s := getStoreWithRangeLease(t, ctx, stores, 2)
writeNRecords(ctx, t, s, 10)
replicas := getMembership(t, s, ctx, 2)
require.Equal(t, 3, len(replicas))
rd := s.GetRange(2)
require.Equal(t, 3, len(rd.GetReplicas()))
}

s := getStoreWithRangeLease(t, ctx, stores, 2)
r, err := s.GetReplica(2)
require.NoError(t, err)
desiredAppliedIndex, err := r.LastAppliedIndex()
require.NoError(t, err)

s4 := sf.NewStore(t)
// Stop store 3
s3.Stop()

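// Record the node host IDs of the dead store (s3) and its replacement (s4).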
nhid3 := s3.NodeHost().ID()
nhid4 := s4.NodeHost().ID()

// Advance the clock past cache.raft.dead_store_timeout so s3 is considered dead.
clock.Advance(5*time.Minute + 1*time.Second)
for {
// Advance the clock to trigger the replica scan
clock.Advance(61 * time.Second)
// Wait some time to let the driver queue execute
time.Sleep(100 * time.Millisecond)
list, err := s4.ListReplicas(ctx, &rfpb.ListReplicasRequest{})
require.NoError(t, err)
if len(list.GetReplicas()) < 2 {
// s4 should eventually have replicas for both ranges
continue
}

if !includeReplicaWithNHID(s1.GetRange(1), nhid3) &&
!includeReplicaWithNHID(s1.GetRange(2), nhid3) &&
!includeReplicaWithNHID(s2.GetRange(1), nhid3) &&
!includeReplicaWithNHID(s2.GetRange(2), nhid3) {
// nhid4 should be added to range 1 and range 2, and nhid3 removed
require.True(t, includeReplicaWithNHID(s1.GetRange(1), nhid4))
require.True(t, includeReplicaWithNHID(s1.GetRange(2), nhid4))
require.True(t, includeReplicaWithNHID(s2.GetRange(1), nhid4))
require.True(t, includeReplicaWithNHID(s2.GetRange(2), nhid4))
break
}
}
r2 := getReplica(t, s4, 2)
waitForReplicaToCatchUp(t, ctx, r2, desiredAppliedIndex)
}
33 changes: 19 additions & 14 deletions enterprise/server/raft/testutil/testutil.go
@@ -39,15 +39,6 @@ func localAddr(t *testing.T) string {
return fmt.Sprintf("127.0.0.1:%d", testport.FindFree(t))
}

func newGossipManager(t testing.TB, nodeAddr string, seeds []string) *gossip.GossipManager {
node, err := gossip.New("name-"+nodeAddr, nodeAddr, seeds)
require.NoError(t, err)
t.Cleanup(func() {
node.Shutdown()
})
return node
}

type StoreFactory struct {
rootDir string
fileDir string
@@ -85,10 +76,12 @@ func (sf *StoreFactory) Registry() registry.NodeRegistry {

func (sf *StoreFactory) NewStore(t *testing.T) *TestingStore {
nodeAddr := localAddr(t)
gm := newGossipManager(t, nodeAddr, sf.gossipAddrs)
gm, err := gossip.New("name-"+nodeAddr, nodeAddr, sf.gossipAddrs)
require.NoError(t, err)
sf.gossipAddrs = append(sf.gossipAddrs, nodeAddr)

ts := &TestingStore{
gm: gm,
RaftAddress: localAddr(t),
GRPCAddress: localAddr(t),
RootDir: filepath.Join(sf.rootDir, fmt.Sprintf("store-%d", len(sf.gossipAddrs))),
@@ -139,10 +132,7 @@ func (sf *StoreFactory) NewStore(t *testing.T) *TestingStore {
ts.Store = store

t.Cleanup(func() {
ctx := context.Background()
ctx, cancelFn := context.WithTimeout(ctx, 3*time.Second)
defer cancelFn()
store.Stop(ctx)
ts.Stop()
})
return ts
}
@@ -160,9 +150,11 @@ type TestingStore struct {

db pebble.IPebbleDB

gm *gossip.GossipManager
RootDir string
RaftAddress string
GRPCAddress string
closed bool
}

func (ts *TestingStore) DB() pebble.IPebbleDB {
@@ -174,6 +166,19 @@ func (ts *TestingStore) NewReplica(shardID, replicaID uint64) *replica.Replica {
return sm.(*replica.Replica)
}

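// Stop shuts down the store and leaves the gossip network. It is a no-op if
// the store has already been stopped.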
func (ts *TestingStore) Stop() {
if ts.closed {
return
}
ctx := context.Background()
ctx, cancelFn := context.WithTimeout(ctx, 3*time.Second)
defer cancelFn()
ts.Store.Stop(ctx)
ts.gm.Leave()
ts.gm.Shutdown()
ts.closed = true
}

func (sf *StoreFactory) StartShard(t *testing.T, ctx context.Context, stores ...*TestingStore) {
require.Greater(t, len(stores), 0)
err := bringup.SendStartShardRequests(ctx, client.NewSessionWithClock(sf.clock), stores[0].NodeHost(), stores[0].APIClient(), MakeNodeGRPCAddressesMap(stores...))
