3 changes: 1 addition & 2 deletions cmd/server/cadence/fx.go
@@ -74,8 +74,7 @@ func Module(serviceName string) fx.Option {
 			return z.With(zap.String("service", service.ShardDistributor)), l.WithTags(tag.Service(service.ShardDistributor))
 		}),
 
-		fx.Provide(etcd.NewStore),
-		fx.Provide(etcd.NewLeaderStore),
+		etcd.Module,
 
 		rpcfx.Module,
 		sharddistributorfx.Module)
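
The two removed providers are presumably bundled inside the new etcd.Module. A minimal sketch of that pattern with go.uber.org/fx, assuming a plain fx.Options grouping; the real contents of etcd.Module live in the etcd package and are not shown in this diff:

package etcd

import "go.uber.org/fx"

// Module exposes the etcd-backed stores as a single fx.Option, so servers
// wire one symbol instead of listing each constructor. Sketch only: the
// actual Module may provide additional constructors or lifecycle hooks.
var Module = fx.Options(
	fx.Provide(NewStore),
	fx.Provide(NewLeaderStore),
)

This keeps the server wiring stable as the etcd package evolves: callers keep referencing etcd.Module while the module's contents change.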
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
@@ -1181,6 +1181,10 @@ func ShardExecutors(executorIDs []string) Tag {
 	return newStringsTag("shard-executors", executorIDs)
 }
 
+func ShardKey(shardKey string) Tag {
+	return newStringTag("shard-key", shardKey)
+}
+
 func ElectionDelay(t time.Duration) Tag {
 	return newDurationTag("election-delay", t)
 }
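
ShardKey follows the existing newStringTag pattern used throughout this file. A hypothetical call site, where logger, shardID, and err are stand-ins for values available wherever a shard is being logged:

// Attach the shard key to a structured log line alongside an error.
logger.Error("failed to process shard",
	tag.ShardKey(shardID),
	tag.Error(err),
)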
19 changes: 16 additions & 3 deletions service/sharddistributor/executorclient/clientimpl.go
@@ -3,6 +3,7 @@ package executorclient
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -26,6 +27,10 @@ const (
 	processorStateStopping
 )
 
+const (
+	heartbeatJitterMax = 100 * time.Millisecond
+)
+
 type managedProcessor[SP ShardProcessor] struct {
 	processor SP
 	state     atomic.Int32
@@ -89,8 +94,8 @@ func (e *executorImpl[SP]) GetShardProcess(shardID string) (SP, error) {
 }
 
 func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
-	heartBeatTicker := e.timeSource.NewTicker(e.heartBeatInterval)
-	defer heartBeatTicker.Stop()
+	heartBeatTimer := e.timeSource.NewTimer(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
+	defer heartBeatTimer.Stop()
 
 	for {
 		select {
@@ -102,7 +107,8 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 			e.logger.Info("shard distributor executor stopped")
 			e.stopShardProcessors()
 			return
-		case <-heartBeatTicker.Chan():
+		case <-heartBeatTimer.Chan():
+			heartBeatTimer.Reset(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
 			shardAssignment, err := e.heartbeat(ctx)
 			if err != nil {
 				e.logger.Error("failed to heartbeat", tag.Error(err))
@@ -231,3 +237,10 @@ func (e *executorImpl[SP]) stopShardProcessors() {
 
 	wg.Wait()
 }
+
+func getJitteredHeartbeatDuration(interval time.Duration, jitterMax time.Duration) time.Duration {
+	jitterMaxNanos := int64(jitterMax)
+	randomJitterNanos := rand.Int63n(jitterMaxNanos)
+	jitter := time.Duration(randomJitterNanos)
+	return interval - jitter
+}
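
The jitter is subtractive: each wait is the base interval minus a uniform random value in [0, heartbeatJitterMax), so a heartbeat fires up to 100ms early and executors that started at the same moment drift apart instead of heartbeating in lockstep. A standalone sketch of the same computation (the package main wrapper is added here only for illustration):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jittered mirrors getJitteredHeartbeatDuration above: the base interval
// minus a random jitter in [0, jitterMax), so never more than jitterMax early.
func jittered(interval, jitterMax time.Duration) time.Duration {
	return interval - time.Duration(rand.Int63n(int64(jitterMax)))
}

func main() {
	// Three consecutive waits for a 1s interval, e.g. 938ms, 994ms, 912ms.
	for i := 0; i < 3; i++ {
		fmt.Println(jittered(time.Second, 100*time.Millisecond))
	}
}

Resetting a timer on each tick, rather than using a fixed ticker, is what lets every period draw a fresh jitter.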
30 changes: 15 additions & 15 deletions service/sharddistributor/leader/process/processor.go
@@ -282,6 +282,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	}
 
 	if namespaceState.GlobalRevision <= p.lastAppliedRevision {
+		p.logger.Debug("No changes detected. Skipping rebalance.")
 		return nil
 	}
 	p.lastAppliedRevision = namespaceState.GlobalRevision
@@ -303,6 +304,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	distributionChanged = distributionChanged || p.updateAssignments(shardsToReassign, activeExecutors, currentAssignments)
 
 	if !distributionChanged {
+		p.logger.Debug("No changes to distribution detected. Skipping rebalance.")
 		return nil
 	}
 
@@ -311,8 +313,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	p.logger.Info("Applying new shard distribution.")
 	// Use the leader guard for the assign operation.
 	err = p.shardStore.AssignShards(ctx, p.namespaceCfg.Name, store.AssignShardsRequest{
-		NewState:       namespaceState,
-		ShardsToDelete: deletedShards,
+		NewState: namespaceState,
 	}, p.election.Guard())
 	if err != nil {
 		return fmt.Errorf("assign shards: %w", err)
@@ -329,7 +330,6 @@ func (p *namespaceProcessor) findDeletedShards(namespaceState *store.NamespaceSt
 		if shardState.Status == types.ShardStatusDONE {
 			deletedShards[shardID] = store.ShardState{
 				ExecutorID: executorID,
-				Revision:   namespaceState.Shards[shardID].Revision,
 			}
 		}
 	}
@@ -385,23 +385,21 @@ func (*namespaceProcessor) updateAssignments(shardsToReassign []string, activeEx
 }
 
 func (p *namespaceProcessor) addAssignmentsToNamespaceState(namespaceState *store.NamespaceState, currentAssignments map[string][]string) {
-	if namespaceState.Shards == nil {
-		namespaceState.Shards = make(map[string]store.ShardState)
-	}
 
 	newState := make(map[string]store.AssignedState)
 	for executorID, shards := range currentAssignments {
 		assignedShardsMap := make(map[string]*types.ShardAssignment)
 		for _, shardID := range shards {
 			assignedShardsMap[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
-			namespaceState.Shards[shardID] = store.ShardState{
-				ExecutorID: executorID,
-				Revision:   namespaceState.Shards[shardID].Revision,
-			}
 		}
+		modRevision := int64(0) // Should be 0 if we have not seen it yet
+		if namespaceAssignments, ok := namespaceState.ShardAssignments[executorID]; ok {
+			modRevision = namespaceAssignments.ModRevision
+		}
 
 		newState[executorID] = store.AssignedState{
 			AssignedShards: assignedShardsMap,
 			LastUpdated:    p.timeSource.Now().Unix(),
+			ModRevision:    modRevision,
 		}
 	}
 
@@ -477,12 +475,14 @@ func getShards(cfg config.Namespace, namespaceState *store.NamespaceState, delet
 		return makeShards(cfg.ShardNum)
 	} else if cfg.Type == config.NamespaceTypeEphemeral {
 		shards := make([]string, 0)
-		for shardID := range namespaceState.Shards {
-			// If the shard is deleted, we don't include it in the shards.
-			if _, ok := deletedShards[shardID]; !ok {
-				shards = append(shards, shardID)
+		for _, state := range namespaceState.ShardAssignments {
+			for shardID := range state.AssignedShards {
+				if _, ok := deletedShards[shardID]; !ok {
+					shards = append(shards, shardID)
+				}
 			}
 		}
 
 		return shards
 	}
 	return nil
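
Carrying ModRevision into AssignedState suggests an optimistic-concurrency scheme: the leader records the revision at which it last read each executor's assignment record, and the store can refuse the write if the record changed underneath it. A sketch of that pattern against etcd (go.etcd.io/etcd/client/v3); the key layout and helper are hypothetical, not this PR's actual store code. In an etcd transaction, a comparison against mod revision 0 matches a key that does not exist yet, which lines up with the "Should be 0 if we have not seen it yet" comment above:

package store

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// casPutAssignment writes an executor's assignment only if the key is still
// at the mod revision the leader read, failing on concurrent modification.
func casPutAssignment(ctx context.Context, cli *clientv3.Client, key, value string, modRevision int64) error {
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)).
		Then(clientv3.OpPut(key, value)).
		Commit()
	if err != nil {
		return err
	}
	if !resp.Succeeded {
		return fmt.Errorf("assignment at %q was modified concurrently", key)
	}
	return nil
}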
165 changes: 28 additions & 137 deletions service/sharddistributor/leader/process/processor_test.go
@@ -125,7 +125,6 @@ func TestRebalanceShards_ExecutorRemoved(t *testing.T) {
 	}
 	mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{
 		Executors:        heartbeats,
-		Shards:           nil,
 		ShardAssignments: assignments,
 		GlobalRevision:   1,
 	}, nil)
@@ -142,63 +141,6 @@
 	require.NoError(t, err)
 }
 
-func TestRebalanceShards_UpdatesShardStateOnReassign(t *testing.T) {
-	mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
-	defer mocks.ctrl.Finish()
-	processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
-
-	// Initial state: exec-2 is draining, exec-1 is active.
-	// Shards "0" and "1" are initially owned by exec-2.
-	heartbeats := map[string]store.HeartbeatState{
-		"exec-1": {Status: types.ExecutorStatusACTIVE},
-		"exec-2": {Status: types.ExecutorStatusDRAINING},
-	}
-	assignments := map[string]store.AssignedState{
-		"exec-2": {
-			AssignedShards: map[string]*types.ShardAssignment{
-				"0": {Status: types.AssignmentStatusREADY},
-				"1": {Status: types.AssignmentStatusREADY},
-			},
-		},
-	}
-	// This is the crucial part for the test: the initial state of the Shards map.
-	shards := map[string]store.ShardState{
-		"0": {ExecutorID: "exec-2", Revision: 101},
-		"1": {ExecutorID: "exec-2", Revision: 102},
-	}
-
-	mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{
-		Executors:        heartbeats,
-		Shards:           shards, // Provide the initial shard state.
-		ShardAssignments: assignments,
-		GlobalRevision:   2,
-	}, nil)
-
-	mocks.election.EXPECT().Guard().Return(store.NopGuard())
-
-	// We expect AssignShards to be called with the updated state.
-	mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn(
-		func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error {
-			// Assert that the assignments were moved to exec-1.
-			assert.Len(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, 2)
-			assert.Len(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, 0)
-
-			// **Assert that the Shards map is correctly updated.**
-			// The ExecutorID should be updated to the new owner.
-			assert.Equal(t, "exec-1", request.NewState.Shards["0"].ExecutorID)
-			assert.Equal(t, "exec-1", request.NewState.Shards["1"].ExecutorID)
-
-			// The Revision should be preserved from the original state.
-			assert.Equal(t, int64(101), request.NewState.Shards["0"].Revision)
-			assert.Equal(t, int64(102), request.NewState.Shards["1"].Revision)
-			return nil
-		},
-	)
-
-	err := processor.rebalanceShards(context.Background())
-	require.NoError(t, err)
-}
-
 func TestRebalanceShards_NoActiveExecutors(t *testing.T) {
 	mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
 	defer mocks.ctrl.Finish()
@@ -343,7 +285,6 @@ func TestRebalanceShards_NoShardsToReassign(t *testing.T) {
 	}
 	mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{
 		Executors:        heartbeats,
-		Shards:           nil,
 		ShardAssignments: assignments,
 		GlobalRevision:   2,
 	}, nil)
@@ -386,72 +327,6 @@ func TestRebalanceShards_WithUnassignedShards(t *testing.T) {
 	require.NoError(t, err)
 }
 
-func TestRebalanceShards_WithDeletedShards(t *testing.T) {
-	mocks := setupProcessorTest(t, config.NamespaceTypeEphemeral)
-	defer mocks.ctrl.Finish()
-	processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
-
-	heartbeats := map[string]store.HeartbeatState{
-		"exec-1": {
-			Status: types.ExecutorStatusACTIVE,
-			ReportedShards: map[string]*types.ShardStatusReport{
-				"0": {Status: types.ShardStatusDONE},
-				"1": {Status: types.ShardStatusDONE},
-				"2": {Status: types.ShardStatusREADY},
-			},
-		},
-		"exec-2": {
-			Status:         types.ExecutorStatusACTIVE,
-			ReportedShards: map[string]*types.ShardStatusReport{},
-		},
-	}
-	assignments := map[string]store.AssignedState{
-		"exec-1": {
-			AssignedShards: map[string]*types.ShardAssignment{
-				"1": {Status: types.AssignmentStatusREADY},
-				"2": {Status: types.AssignmentStatusREADY},
-			},
-		},
-		// One of the deleted shards were reassigned to exec-2, it should still be deleted
-		"exec-2": {
-			AssignedShards: map[string]*types.ShardAssignment{
-				"0": {Status: types.AssignmentStatusREADY},
-				"3": {Status: types.AssignmentStatusREADY},
-				"4": {Status: types.AssignmentStatusREADY},
-			},
-		},
-	}
-
-	shards := map[string]store.ShardState{
-		"0": {ExecutorID: "exec-2"},
-		"1": {ExecutorID: "exec-1"},
-		"2": {ExecutorID: "exec-1"},
-		"3": {ExecutorID: "exec-2"},
-		"4": {ExecutorID: "exec-2"},
-	}
-	mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{
-		Executors:        heartbeats,
-		ShardAssignments: assignments,
-		GlobalRevision:   3,
-		Shards:           shards,
-	}, nil)
-	mocks.election.EXPECT().Guard().Return(store.NopGuard())
-	mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn(
-		func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error {
-			assert.Contains(t, request.ShardsToDelete, "0")
-			assert.Contains(t, request.ShardsToDelete, "1")
-
-			assert.Contains(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, "2")
-			assert.Contains(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, "3")
-			assert.Contains(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, "4")
-			return nil
-		},
-	)
-
-	err := processor.rebalanceShards(context.Background())
-	require.NoError(t, err)
-}
-
 func TestGetShards_Utility(t *testing.T) {
 	t.Run("Fixed type", func(t *testing.T) {
 		cfg := config.Namespace{Type: config.NamespaceTypeFixed, ShardNum: 5}
@@ -462,12 +337,20 @@ func TestGetShards_Utility(t *testing.T) {
 	t.Run("Ephemeral type", func(t *testing.T) {
 		cfg := config.Namespace{Type: config.NamespaceTypeEphemeral}
 		nsState := &store.NamespaceState{
-			Shards: map[string]store.ShardState{
-				"s0": {ExecutorID: "exec-1"},
-				"s1": {ExecutorID: "exec-1"},
-				"s2": {ExecutorID: "exec-1"},
-				"s3": {ExecutorID: "exec-1"},
-				"s4": {ExecutorID: "exec-1"},
+			ShardAssignments: map[string]store.AssignedState{
+				"executor1": {
+					AssignedShards: map[string]*types.ShardAssignment{
+						"s0": {Status: types.AssignmentStatusREADY},
+						"s1": {Status: types.AssignmentStatusREADY},
+						"s2": {Status: types.AssignmentStatusREADY},
+					},
+				},
+				"executor2": {
+					AssignedShards: map[string]*types.ShardAssignment{
+						"s3": {Status: types.AssignmentStatusREADY},
+						"s4": {Status: types.AssignmentStatusREADY},
+					},
+				},
 			},
 		}
 		shards := getShards(cfg, nsState, nil)
@@ -478,12 +361,20 @@
 	t.Run("Ephemeral type with deleted shards", func(t *testing.T) {
 		cfg := config.Namespace{Type: config.NamespaceTypeEphemeral}
 		nsState := &store.NamespaceState{
-			Shards: map[string]store.ShardState{
-				"s0": {ExecutorID: "exec-1"},
-				"s1": {ExecutorID: "exec-1"},
-				"s2": {ExecutorID: "exec-1"},
-				"s3": {ExecutorID: "exec-1"},
-				"s4": {ExecutorID: "exec-1"},
+			ShardAssignments: map[string]store.AssignedState{
+				"executor1": {
+					AssignedShards: map[string]*types.ShardAssignment{
+						"s0": {Status: types.AssignmentStatusREADY},
+						"s1": {Status: types.AssignmentStatusREADY},
+						"s2": {Status: types.AssignmentStatusREADY},
+					},
+				},
+				"executor2": {
+					AssignedShards: map[string]*types.ShardAssignment{
+						"s3": {Status: types.AssignmentStatusREADY},
+						"s4": {Status: types.AssignmentStatusREADY},
+					},
+				},
 			},
 		}
 		deletedShards := map[string]store.ShardState{