Only start active raft ranges (#7395)
tylerwilliams authored Sep 9, 2024
1 parent 73174e9 commit 19688e8
Showing 2 changed files with 113 additions and 10 deletions.
82 changes: 82 additions & 0 deletions enterprise/server/raft/sender/sender.go
@@ -2,6 +2,7 @@ package sender
 
 import (
 	"context"
+	"fmt"
 	"strings"
 
 	"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/client"
@@ -79,6 +80,55 @@ func lookupRangeDescriptor(ctx context.Context, c rfspb.ApiClient, h *rfpb.Heade
 	return nil, status.UnavailableErrorf("Error finding range descriptor for %q", key)
 }
 
+func lookupActiveReplicas(ctx context.Context, c rfspb.ApiClient, h *rfpb.Header, replicas []*rfpb.ReplicaDescriptor) ([]*rfpb.ReplicaDescriptor, error) {
+	replicaKey := func(r *rfpb.ReplicaDescriptor) string {
+		return fmt.Sprintf("%d-%d", r.GetRangeId(), r.GetReplicaId())
+	}
+	candidateReplicas := make(map[string]*rfpb.ReplicaDescriptor, len(replicas))
+	for _, r := range replicas {
+		candidateReplicas[replicaKey(r)] = r
+	}
+
+	batchReq, err := rbuilder.NewBatchBuilder().Add(&rfpb.ScanRequest{
+		Start:    constants.MetaRangePrefix,
+		End:      constants.SystemPrefix,
+		ScanType: rfpb.ScanRequest_SEEKGT_SCAN_TYPE,
+	}).ToProto()
+	if err != nil {
+		return nil, err
+	}
+	rsp, err := c.SyncRead(ctx, &rfpb.SyncReadRequest{
+		Header: h,
+		Batch:  batchReq,
+	})
+	if err != nil {
+		return nil, err
+	}
+	scanRsp, err := rbuilder.NewBatchResponseFromProto(rsp.GetBatch()).ScanResponse(0)
+	if err != nil {
+		log.Errorf("Error reading scan response: %s", err)
+		return nil, err
+	}
+
+	matchedReplicas := make([]*rfpb.ReplicaDescriptor, 0, len(replicas))
+	rd := &rfpb.RangeDescriptor{}
+	for _, kv := range scanRsp.GetKvs() {
+		if err := proto.Unmarshal(kv.GetValue(), rd); err != nil {
+			log.Errorf("scan returned unparsable kv: %s", err)
+			continue
+		}
+		for _, r2 := range rd.GetReplicas() {
+			if r, present := candidateReplicas[replicaKey(r2)]; present {
+				matchedReplicas = append(matchedReplicas, r)
+				// delete this candidate so it cannot possibly
+				// show up more than once in matchedReplicas.
+				delete(candidateReplicas, replicaKey(r2))
+			}
+		}
+	}
+	return matchedReplicas, nil
+}
+
 func (s *Sender) GetMetaRangeDescriptor() *rfpb.RangeDescriptor {
 	return s.rangeCache.Get(constants.MetaRangePrefix)
 }
@@ -132,6 +182,38 @@ func (s *Sender) LookupRangeDescriptor(ctx context.Context, key []byte, skipCach
 	return rangeDescriptor, nil
 }
 
+func (s *Sender) LookupActiveReplicas(ctx context.Context, candidates []*rfpb.ReplicaDescriptor) ([]*rfpb.ReplicaDescriptor, error) {
+	if len(candidates) == 0 {
+		return nil, nil
+	}
+	retrier := retry.DefaultWithContext(ctx)
+
+	var activeReplicas []*rfpb.ReplicaDescriptor
+	fn := func(c rfspb.ApiClient, h *rfpb.Header) error {
+		replicas, err := lookupActiveReplicas(ctx, c, h, candidates)
+		if err != nil {
+			return err
+		}
+		activeReplicas = replicas
+		return nil
+	}
+	for retrier.Next() {
+		metaRangeDescriptor := s.rangeCache.Get(constants.MetaRangePrefix)
+		if metaRangeDescriptor == nil {
+			log.Warning("RangeCache did not have meta range yet")
+			continue
+		}
+		_, err := s.tryReplicas(ctx, metaRangeDescriptor, fn, rfpb.Header_LINEARIZABLE)
+		if err == nil {
+			return activeReplicas, nil
+		}
+		if !status.IsOutOfRangeError(err) {
+			return nil, err
+		}
+	}
+	return nil, status.UnavailableError("Error finding active replicas")
+}
+
 func (s *Sender) UpdateRange(rangeDescriptor *rfpb.RangeDescriptor) error {
 	return s.rangeCache.UpdateRange(rangeDescriptor)
 }
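
The filtering in lookupActiveReplicas is a set intersection keyed on (range ID, replica ID): every candidate goes into a map, each scanned range descriptor's replicas are checked against it, and matched keys are deleted so a candidate can be returned at most once. Below is a minimal, self-contained sketch of that pattern; ReplicaDescriptor and RangeDescriptor here are plain stand-in structs rather than the rfpb protos, and filterActive is an illustrative name, not anything in the BuildBuddy codebase.

package main

import "fmt"

// Stand-ins for the rfpb proto messages, for illustration only.
type ReplicaDescriptor struct {
	RangeID   uint64
	ReplicaID uint64
}

type RangeDescriptor struct {
	Replicas []ReplicaDescriptor
}

// filterActive returns the subset of candidates that appears in any of the
// scanned range descriptors, each candidate at most once.
func filterActive(candidates []ReplicaDescriptor, scanned []RangeDescriptor) []ReplicaDescriptor {
	key := func(r ReplicaDescriptor) string {
		return fmt.Sprintf("%d-%d", r.RangeID, r.ReplicaID)
	}
	remaining := make(map[string]ReplicaDescriptor, len(candidates))
	for _, c := range candidates {
		remaining[key(c)] = c
	}
	matched := make([]ReplicaDescriptor, 0, len(candidates))
	for _, rd := range scanned {
		for _, r := range rd.Replicas {
			if c, ok := remaining[key(r)]; ok {
				matched = append(matched, c)
				// Delete the candidate so it cannot match a second time.
				delete(remaining, key(r))
			}
		}
	}
	return matched
}

func main() {
	candidates := []ReplicaDescriptor{{RangeID: 2, ReplicaID: 1}, {RangeID: 3, ReplicaID: 1}}
	scanned := []RangeDescriptor{{Replicas: []ReplicaDescriptor{{RangeID: 2, ReplicaID: 1}}}}
	// Prints [{2 1}]: range 3 is not listed in any scanned descriptor, so it is dropped.
	fmt.Println(filterActive(candidates, scanned))
}

Note that the real function appends the caller's own candidate (r, looked up from the map) rather than the scanned copy (r2), so callers get back exactly the *rfpb.ReplicaDescriptor values they passed in.
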
41 changes: 31 additions & 10 deletions enterprise/server/raft/store/store.go
@@ -265,20 +265,41 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH
 
 	s.updateTagsWorker.Start()
 
 	// rejoin configured clusters
 	nodeHostInfo := nodeHost.GetNodeHostInfo(dragonboat.NodeHostInfoOption{})
 
-	logSize := len(nodeHostInfo.LogInfo)
-	for i, logInfo := range nodeHostInfo.LogInfo {
-		if nodeHost.HasNodeInfo(logInfo.ShardID, logInfo.ReplicaID) {
-			s.log.Infof("Had info for c%dn%d. (%d/%d)", logInfo.ShardID, logInfo.ReplicaID, i+1, logSize)
+	previouslyStartedReplicas := make([]*rfpb.ReplicaDescriptor, 0, len(nodeHostInfo.LogInfo))
+	for _, logInfo := range nodeHostInfo.LogInfo {
+		if !nodeHost.HasNodeInfo(logInfo.ShardID, logInfo.ReplicaID) {
+			// Skip nodes not on this machine.
+			continue
+		}
+		if logInfo.ShardID == constants.MetaRangeID {
+			s.log.Infof("Starting metarange replica: %+v", logInfo)
 			s.replicaInitStatusWaiter.MarkStarted(logInfo.ShardID)
-			r := raftConfig.GetRaftConfig(logInfo.ShardID, logInfo.ReplicaID)
-			if err := nodeHost.StartOnDiskReplica(nil, false /*=join*/, s.ReplicaFactoryFn, r); err != nil {
+			rc := raftConfig.GetRaftConfig(logInfo.ShardID, logInfo.ReplicaID)
+			if err := nodeHost.StartOnDiskReplica(nil, false /*=join*/, s.ReplicaFactoryFn, rc); err != nil {
 				return nil, status.InternalErrorf("failed to start c%dn%d: %s", logInfo.ShardID, logInfo.ReplicaID, err)
 			}
 			s.configuredClusters++
 			s.log.Infof("Recreated c%dn%d.", logInfo.ShardID, logInfo.ReplicaID)
+		} else {
+			replicaDescriptor := &rfpb.ReplicaDescriptor{RangeId: logInfo.ShardID, ReplicaId: logInfo.ReplicaID}
+			previouslyStartedReplicas = append(previouslyStartedReplicas, replicaDescriptor)
 		}
 	}
 
+	ctx := context.Background()
+
+	// Scan the metarange and start any clusters we own that have not been
+	// removed. If previouslyStartedReplicas is an empty list, then
+	// LookupActiveReplicas will return nil, nil, and the following loop
+	// will be a no-op.
+	activeReplicas, err := s.sender.LookupActiveReplicas(ctx, previouslyStartedReplicas)
+	if err != nil {
+		return nil, err
+	}
+	for _, r := range activeReplicas {
+		s.replicaInitStatusWaiter.MarkStarted(r.GetRangeId())
+		rc := raftConfig.GetRaftConfig(r.GetRangeId(), r.GetReplicaId())
+		if err := nodeHost.StartOnDiskReplica(nil, false /*=join*/, s.ReplicaFactoryFn, rc); err != nil {
+			return nil, status.InternalErrorf("failed to start c%dn%d: %s", r.GetRangeId(), r.GetReplicaId(), err)
+		}
+	}
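
Taken together, the store change splits startup into two phases: replicas recorded in the local Raft log are no longer all restarted blindly; only the metarange replica is started immediately, and the rest are first checked against the metarange so that replicas of ranges that have since been removed stay stopped. A condensed sketch of that control flow follows; NodeHost, the sender, and the raft config are replaced with hypothetical function parameters (lookupActive, start) and a stand-in metaRangeID constant, so this is an illustration under those assumptions, not the store's actual API.

package main

import "fmt"

// replicaID is a hypothetical stand-in for the (ShardID, ReplicaID) pairs the
// store reads out of dragonboat's node host info.
type replicaID struct {
	RangeID, ReplicaID uint64
}

// Assumed constant, playing the role of constants.MetaRangeID.
const metaRangeID = 1

// startReplicas mirrors the two-phase startup: metarange first, then only the
// previously started replicas that the metarange still lists as active.
func startReplicas(local []replicaID, lookupActive func([]replicaID) []replicaID, start func(replicaID) error) error {
	var previouslyStarted []replicaID
	for _, r := range local {
		if r.RangeID == metaRangeID {
			// Phase 1: start the metarange right away so it can be queried.
			if err := start(r); err != nil {
				return err
			}
			continue
		}
		previouslyStarted = append(previouslyStarted, r)
	}
	// Phase 2: start only ranges the metarange still considers active;
	// replicas of removed ranges are left stopped.
	for _, r := range lookupActive(previouslyStarted) {
		if err := start(r); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	local := []replicaID{{1, 1}, {2, 1}, {3, 1}}
	active := map[replicaID]bool{{RangeID: 2, ReplicaID: 1}: true} // pretend range 3 was removed from the cluster
	lookup := func(rs []replicaID) []replicaID {
		var out []replicaID
		for _, r := range rs {
			if active[r] {
				out = append(out, r)
			}
		}
		return out
	}
	start := func(r replicaID) error {
		fmt.Printf("starting c%dn%d\n", r.RangeID, r.ReplicaID)
		return nil
	}
	// Prints "starting c1n1" and "starting c2n1"; c3n1 is skipped because its range is no longer active.
	_ = startReplicas(local, lookup, start)
}

The payoff is visible in the sketch's main: a range that was removed while the node was down (range 3 here) never gets its replica restarted, which is the behavior the commit title describes.
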
