Skip to content

Commit

Permalink
[raft] use seperate sessions for eviction and startShard (#7710)
Browse files Browse the repository at this point in the history
<!-- Optional: Provide additional context (beyond the PR title). -->

<!-- Optional: link a GitHub issue.
Example: "Fixes #123" will auto-close #123 when the PR is merged. -->

**Related issues**: N/A
  • Loading branch information
luluz66 authored Oct 9, 2024
1 parent 8dc6b25 commit 7cd9c98
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,19 @@ type Store struct {
grpcServer *grpc.Server
apiClient *client.APIClient
liveness *nodeliveness.Liveness
session *client.Session
// use a seperate session for operations like split, add, remove replica.
// This session is used by most of the SyncPropose traffic
session *client.Session
// The following sessions are created so that we can seperate background
// traffic such as eviction, startShard, splitRange from the main write
// traffic.
// session for transactions; used by split, add and remove replica.
txnSession *client.Session
log log.Logger
// session for eviction
evictionSession *client.Session
// session for StartShard
shardStarterSession *client.Session

log log.Logger

db pebble.IPebbleDB
leaser pebble.Leaser
Expand Down Expand Up @@ -202,22 +211,26 @@ func NewWithArgs(env environment.Env, rootDir string, nodeHost *dragonboat.NodeH
clock := env.GetClock()
session := client.NewSessionWithClock(clock)
txnSession := client.NewSessionWithClock(clock)
evictionSession := client.NewSessionWithClock(clock)
shardStarterSession := client.NewSessionWithClock(clock)
lkSession := client.NewSessionWithClock(clock)

s := &Store{
env: env,
rootDir: rootDir,
grpcAddr: grpcAddress,
nodeHost: nodeHost,
partitions: partitions,
gossipManager: gossipManager,
sender: sender,
registry: registry,
apiClient: apiClient,
liveness: nodeLiveness,
session: session,
txnSession: txnSession,
log: nhLog,
env: env,
rootDir: rootDir,
grpcAddr: grpcAddress,
nodeHost: nodeHost,
partitions: partitions,
gossipManager: gossipManager,
sender: sender,
registry: registry,
apiClient: apiClient,
liveness: nodeLiveness,
session: session,
txnSession: txnSession,
evictionSession: evictionSession,
shardStarterSession: shardStarterSession,
log: nhLog,

rangeMu: sync.RWMutex{},
openRanges: make(map[uint64]*rfpb.RangeDescriptor),
Expand Down Expand Up @@ -962,7 +975,7 @@ func (s *Store) StartShard(ctx context.Context, req *rfpb.StartShardRequest) (*r
}
sort.Slice(replicaIDs, func(i, j int) bool { return replicaIDs[i] < replicaIDs[j] })
if req.GetReplicaId() == replicaIDs[len(replicaIDs)-1] {
batchResponse, err := s.session.SyncProposeLocal(ctx, s.nodeHost, req.GetRangeId(), req.GetBatch())
batchResponse, err := s.shardStarterSession.SyncProposeLocal(ctx, s.nodeHost, req.GetRangeId(), req.GetBatch())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1063,6 +1076,12 @@ func (s *Store) SyncPropose(ctx context.Context, req *rfpb.SyncProposeRequest) (
session := s.session
if len(req.GetBatch().GetTransactionId()) > 0 {
session = s.txnSession
} else {
// use eviction session for delete requests
unions := req.GetBatch().GetUnion()
if len(unions) > 0 && unions[0].GetDelete() != nil {
session = s.evictionSession
}
}
batchResponse, err := session.SyncProposeLocal(ctx, s.nodeHost, rangeID, req.GetBatch())
if err != nil {
Expand Down

0 comments on commit 7cd9c98

Please sign in to comment.