diff --git a/cmd/server/cadence/fx.go b/cmd/server/cadence/fx.go
index 81617a1ee2a..a28af7c9b7c 100644
--- a/cmd/server/cadence/fx.go
+++ b/cmd/server/cadence/fx.go
@@ -74,8 +74,7 @@ func Module(serviceName string) fx.Option {
 			return z.With(zap.String("service", service.ShardDistributor)), l.WithTags(tag.Service(service.ShardDistributor))
 		}),
-		fx.Provide(etcd.NewStore),
-		fx.Provide(etcd.NewLeaderStore),
+		etcd.Module,
 		rpcfx.Module,
 		sharddistributorfx.Module)
diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go
index 30e62c8d15a..b2d2ec2a431 100644
--- a/common/log/tag/tags.go
+++ b/common/log/tag/tags.go
@@ -1181,6 +1181,10 @@ func ShardExecutors(executorIDs []string) Tag {
 	return newStringsTag("shard-executors", executorIDs)
 }
 
+func ShardKey(shardKey string) Tag {
+	return newStringTag("shard-key", shardKey)
+}
+
 func ElectionDelay(t time.Duration) Tag {
 	return newDurationTag("election-delay", t)
 }
diff --git a/service/sharddistributor/executorclient/clientimpl.go b/service/sharddistributor/executorclient/clientimpl.go
index 12584215420..8a2f0495cfa 100644
--- a/service/sharddistributor/executorclient/clientimpl.go
+++ b/service/sharddistributor/executorclient/clientimpl.go
@@ -3,6 +3,7 @@ package executorclient
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -26,6 +27,10 @@ const (
 	processorStateStopping
 )
 
+const (
+	heartbeatJitterMax = 100 * time.Millisecond
+)
+
 type managedProcessor[SP ShardProcessor] struct {
 	processor SP
 	state     atomic.Int32
@@ -89,8 +94,8 @@ func (e *executorImpl[SP]) GetShardProcess(shardID string) (SP, error) {
 }
 
 func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
-	heartBeatTicker := e.timeSource.NewTicker(e.heartBeatInterval)
-	defer heartBeatTicker.Stop()
+	heartBeatTimer := e.timeSource.NewTimer(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
+	defer heartBeatTimer.Stop()
 
 	for {
 		select {
@@ -102,7 +107,8 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 			e.logger.Info("shard distributorexecutor stopped")
 			e.stopShardProcessors()
 			return
-		case <-heartBeatTicker.Chan():
+		case <-heartBeatTimer.Chan():
+			heartBeatTimer.Reset(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
 			shardAssignment, err := e.heartbeat(ctx)
 			if err != nil {
 				e.logger.Error("failed to heartbeat", tag.Error(err))
@@ -231,3 +237,10 @@ func (e *executorImpl[SP]) stopShardProcessors() {
 
 	wg.Wait()
 }
+
+func getJitteredHeartbeatDuration(interval time.Duration, jitterMax time.Duration) time.Duration {
+	jitterMaxNanos := int64(jitterMax)
+	randomJitterNanos := rand.Int63n(jitterMaxNanos)
+	jitter := time.Duration(randomJitterNanos)
+	return interval - jitter
+}
diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go
index 45e77ca5832..67db4222695 100644
--- a/service/sharddistributor/leader/process/processor.go
+++ b/service/sharddistributor/leader/process/processor.go
@@ -282,6 +282,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	}
 
 	if namespaceState.GlobalRevision <= p.lastAppliedRevision {
+		p.logger.Debug("No changes detected. 
Skipping rebalance.") return nil } p.lastAppliedRevision = namespaceState.GlobalRevision @@ -303,6 +304,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo distributionChanged = distributionChanged || p.updateAssignments(shardsToReassign, activeExecutors, currentAssignments) if !distributionChanged { + p.logger.Debug("No changes to distribution detected. Skipping rebalance.") return nil } @@ -311,8 +313,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo p.logger.Info("Applying new shard distribution.") // Use the leader guard for the assign operation. err = p.shardStore.AssignShards(ctx, p.namespaceCfg.Name, store.AssignShardsRequest{ - NewState: namespaceState, - ShardsToDelete: deletedShards, + NewState: namespaceState, }, p.election.Guard()) if err != nil { return fmt.Errorf("assign shards: %w", err) @@ -329,7 +330,6 @@ func (p *namespaceProcessor) findDeletedShards(namespaceState *store.NamespaceSt if shardState.Status == types.ShardStatusDONE { deletedShards[shardID] = store.ShardState{ ExecutorID: executorID, - Revision: namespaceState.Shards[shardID].Revision, } } } @@ -385,23 +385,21 @@ func (*namespaceProcessor) updateAssignments(shardsToReassign []string, activeEx } func (p *namespaceProcessor) addAssignmentsToNamespaceState(namespaceState *store.NamespaceState, currentAssignments map[string][]string) { - if namespaceState.Shards == nil { - namespaceState.Shards = make(map[string]store.ShardState) - } - newState := make(map[string]store.AssignedState) for executorID, shards := range currentAssignments { assignedShardsMap := make(map[string]*types.ShardAssignment) for _, shardID := range shards { assignedShardsMap[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY} - namespaceState.Shards[shardID] = store.ShardState{ - ExecutorID: executorID, - Revision: namespaceState.Shards[shardID].Revision, - } } + modRevision := int64(0) // Should be 0 if we have not seen it yet + if namespaceAssignments, ok := namespaceState.ShardAssignments[executorID]; ok { + modRevision = namespaceAssignments.ModRevision + } + newState[executorID] = store.AssignedState{ AssignedShards: assignedShardsMap, LastUpdated: p.timeSource.Now().Unix(), + ModRevision: modRevision, } } @@ -477,12 +475,14 @@ func getShards(cfg config.Namespace, namespaceState *store.NamespaceState, delet return makeShards(cfg.ShardNum) } else if cfg.Type == config.NamespaceTypeEphemeral { shards := make([]string, 0) - for shardID := range namespaceState.Shards { - // If the shard is deleted, we don't include it in the shards. 
- if _, ok := deletedShards[shardID]; !ok { - shards = append(shards, shardID) + for _, state := range namespaceState.ShardAssignments { + for shardID := range state.AssignedShards { + if _, ok := deletedShards[shardID]; !ok { + shards = append(shards, shardID) + } } } + return shards } return nil diff --git a/service/sharddistributor/leader/process/processor_test.go b/service/sharddistributor/leader/process/processor_test.go index 5f66cc2458d..fa351a067bd 100644 --- a/service/sharddistributor/leader/process/processor_test.go +++ b/service/sharddistributor/leader/process/processor_test.go @@ -125,7 +125,6 @@ func TestRebalanceShards_ExecutorRemoved(t *testing.T) { } mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{ Executors: heartbeats, - Shards: nil, ShardAssignments: assignments, GlobalRevision: 1, }, nil) @@ -142,63 +141,6 @@ func TestRebalanceShards_ExecutorRemoved(t *testing.T) { require.NoError(t, err) } -func TestRebalanceShards_UpdatesShardStateOnReassign(t *testing.T) { - mocks := setupProcessorTest(t, config.NamespaceTypeFixed) - defer mocks.ctrl.Finish() - processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - - // Initial state: exec-2 is draining, exec-1 is active. - // Shards "0" and "1" are initially owned by exec-2. - heartbeats := map[string]store.HeartbeatState{ - "exec-1": {Status: types.ExecutorStatusACTIVE}, - "exec-2": {Status: types.ExecutorStatusDRAINING}, - } - assignments := map[string]store.AssignedState{ - "exec-2": { - AssignedShards: map[string]*types.ShardAssignment{ - "0": {Status: types.AssignmentStatusREADY}, - "1": {Status: types.AssignmentStatusREADY}, - }, - }, - } - // This is the crucial part for the test: the initial state of the Shards map. - shards := map[string]store.ShardState{ - "0": {ExecutorID: "exec-2", Revision: 101}, - "1": {ExecutorID: "exec-2", Revision: 102}, - } - - mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{ - Executors: heartbeats, - Shards: shards, // Provide the initial shard state. - ShardAssignments: assignments, - GlobalRevision: 2, - }, nil) - - mocks.election.EXPECT().Guard().Return(store.NopGuard()) - - // We expect AssignShards to be called with the updated state. - mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error { - // Assert that the assignments were moved to exec-1. - assert.Len(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, 2) - assert.Len(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, 0) - - // **Assert that the Shards map is correctly updated.** - // The ExecutorID should be updated to the new owner. - assert.Equal(t, "exec-1", request.NewState.Shards["0"].ExecutorID) - assert.Equal(t, "exec-1", request.NewState.Shards["1"].ExecutorID) - - // The Revision should be preserved from the original state. 
- assert.Equal(t, int64(101), request.NewState.Shards["0"].Revision) - assert.Equal(t, int64(102), request.NewState.Shards["1"].Revision) - return nil - }, - ) - - err := processor.rebalanceShards(context.Background()) - require.NoError(t, err) -} - func TestRebalanceShards_NoActiveExecutors(t *testing.T) { mocks := setupProcessorTest(t, config.NamespaceTypeFixed) defer mocks.ctrl.Finish() @@ -343,7 +285,6 @@ func TestRebalanceShards_NoShardsToReassign(t *testing.T) { } mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{ Executors: heartbeats, - Shards: nil, ShardAssignments: assignments, GlobalRevision: 2, }, nil) @@ -386,72 +327,6 @@ func TestRebalanceShards_WithUnassignedShards(t *testing.T) { require.NoError(t, err) } -func TestRebalanceShards_WithDeletedShards(t *testing.T) { - mocks := setupProcessorTest(t, config.NamespaceTypeEphemeral) - defer mocks.ctrl.Finish() - processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - - heartbeats := map[string]store.HeartbeatState{ - "exec-1": { - Status: types.ExecutorStatusACTIVE, - ReportedShards: map[string]*types.ShardStatusReport{ - "0": {Status: types.ShardStatusDONE}, - "1": {Status: types.ShardStatusDONE}, - "2": {Status: types.ShardStatusREADY}, - }, - }, - "exec-2": { - Status: types.ExecutorStatusACTIVE, - ReportedShards: map[string]*types.ShardStatusReport{}, - }, - } - assignments := map[string]store.AssignedState{ - "exec-1": { - AssignedShards: map[string]*types.ShardAssignment{ - "1": {Status: types.AssignmentStatusREADY}, - "2": {Status: types.AssignmentStatusREADY}, - }, - }, - // One of the deleted shards were reassigned to exec-2, it should still be deleted - "exec-2": { - AssignedShards: map[string]*types.ShardAssignment{ - "0": {Status: types.AssignmentStatusREADY}, - "3": {Status: types.AssignmentStatusREADY}, - "4": {Status: types.AssignmentStatusREADY}, - }, - }, - } - - shards := map[string]store.ShardState{ - "0": {ExecutorID: "exec-2"}, - "1": {ExecutorID: "exec-1"}, - "2": {ExecutorID: "exec-1"}, - "3": {ExecutorID: "exec-2"}, - "4": {ExecutorID: "exec-2"}, - } - mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{ - Executors: heartbeats, - ShardAssignments: assignments, - GlobalRevision: 3, - Shards: shards, - }, nil) - mocks.election.EXPECT().Guard().Return(store.NopGuard()) - mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error { - assert.Contains(t, request.ShardsToDelete, "0") - assert.Contains(t, request.ShardsToDelete, "1") - - assert.Contains(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, "2") - assert.Contains(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, "3") - assert.Contains(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, "4") - return nil - }, - ) - - err := processor.rebalanceShards(context.Background()) - require.NoError(t, err) -} - func TestGetShards_Utility(t *testing.T) { t.Run("Fixed type", func(t *testing.T) { cfg := config.Namespace{Type: config.NamespaceTypeFixed, ShardNum: 5} @@ -462,12 +337,20 @@ func TestGetShards_Utility(t *testing.T) { t.Run("Ephemeral type", func(t *testing.T) { cfg := config.Namespace{Type: config.NamespaceTypeEphemeral} nsState := &store.NamespaceState{ - Shards: map[string]store.ShardState{ - "s0": {ExecutorID: "exec-1"}, - "s1": {ExecutorID: 
"exec-1"}, - "s2": {ExecutorID: "exec-1"}, - "s3": {ExecutorID: "exec-1"}, - "s4": {ExecutorID: "exec-1"}, + ShardAssignments: map[string]store.AssignedState{ + "executor1": { + AssignedShards: map[string]*types.ShardAssignment{ + "s0": {Status: types.AssignmentStatusREADY}, + "s1": {Status: types.AssignmentStatusREADY}, + "s2": {Status: types.AssignmentStatusREADY}, + }, + }, + "executor2": { + AssignedShards: map[string]*types.ShardAssignment{ + "s3": {Status: types.AssignmentStatusREADY}, + "s4": {Status: types.AssignmentStatusREADY}, + }, + }, }, } shards := getShards(cfg, nsState, nil) @@ -478,12 +361,20 @@ func TestGetShards_Utility(t *testing.T) { t.Run("Ephemeral type with deleted shards", func(t *testing.T) { cfg := config.Namespace{Type: config.NamespaceTypeEphemeral} nsState := &store.NamespaceState{ - Shards: map[string]store.ShardState{ - "s0": {ExecutorID: "exec-1"}, - "s1": {ExecutorID: "exec-1"}, - "s2": {ExecutorID: "exec-1"}, - "s3": {ExecutorID: "exec-1"}, - "s4": {ExecutorID: "exec-1"}, + ShardAssignments: map[string]store.AssignedState{ + "executor1": { + AssignedShards: map[string]*types.ShardAssignment{ + "s0": {Status: types.AssignmentStatusREADY}, + "s1": {Status: types.AssignmentStatusREADY}, + "s2": {Status: types.AssignmentStatusREADY}, + }, + }, + "executor2": { + AssignedShards: map[string]*types.ShardAssignment{ + "s3": {Status: types.AssignmentStatusREADY}, + "s4": {Status: types.AssignmentStatusREADY}, + }, + }, }, } deletedShards := map[string]store.ShardState{ diff --git a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go new file mode 100644 index 00000000000..aad2638e35e --- /dev/null +++ b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go @@ -0,0 +1,59 @@ +package etcdkeys + +import ( + "fmt" + "strings" +) + +const ( + ExecutorHeartbeatKey = "heartbeat" + ExecutorStatusKey = "status" + ExecutorReportedShardsKey = "reported_shards" + ExecutorAssignedStateKey = "assigned_state" + ShardAssignedKey = "assigned" +) + +var validKeyTypes = []string{ + ExecutorHeartbeatKey, + ExecutorStatusKey, + ExecutorReportedShardsKey, + ExecutorAssignedStateKey, +} + +func isValidKeyType(key string) bool { + for _, validKey := range validKeyTypes { + if key == validKey { + return true + } + } + return false +} + +func BuildNamespacePrefix(prefix string, namespace string) string { + return fmt.Sprintf("%s/%s", prefix, namespace) +} + +func BuildExecutorPrefix(prefix string, namespace string) string { + return fmt.Sprintf("%s/executors/", BuildNamespacePrefix(prefix, namespace)) +} + +func BuildExecutorKey(prefix string, namespace, executorID, keyType string) (string, error) { + // We allow an empty key, to build the full prefix + if !isValidKeyType(keyType) && keyType != "" { + return "", fmt.Errorf("invalid key type: %s", keyType) + } + return fmt.Sprintf("%s%s/%s", BuildExecutorPrefix(prefix, namespace), executorID, keyType), nil +} + +func ParseExecutorKey(prefix string, namespace, key string) (executorID, keyType string, err error) { + prefix = BuildExecutorPrefix(prefix, namespace) + if !strings.HasPrefix(key, prefix) { + return "", "", fmt.Errorf("key '%s' does not have expected prefix '%s'", key, prefix) + } + remainder := strings.TrimPrefix(key, prefix) + parts := strings.Split(remainder, "/") + if len(parts) != 2 { + return "", "", fmt.Errorf("unexpected key format: %s", key) + } + return parts[0], parts[1], nil +} diff --git a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go 
b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go new file mode 100644 index 00000000000..c98a01f2b57 --- /dev/null +++ b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go @@ -0,0 +1,44 @@ +package etcdkeys + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBuildNamespacePrefix(t *testing.T) { + got := BuildNamespacePrefix("/cadence", "test-ns") + assert.Equal(t, "/cadence/test-ns", got) +} + +func TestBuildExecutorPrefix(t *testing.T) { + got := BuildExecutorPrefix("/cadence", "test-ns") + assert.Equal(t, "/cadence/test-ns/executors/", got) +} + +func TestBuildExecutorKey(t *testing.T) { + got, err := BuildExecutorKey("/cadence", "test-ns", "exec-1", "heartbeat") + assert.NoError(t, err) + assert.Equal(t, "/cadence/test-ns/executors/exec-1/heartbeat", got) +} + +func TestBuildExecutorKeyFail(t *testing.T) { + _, err := BuildExecutorKey("/cadence", "test-ns", "exec-1", "invalid") + assert.ErrorContains(t, err, "invalid key type: invalid") +} + +func TestParseExecutorKey(t *testing.T) { + // Valid key + executorID, keyType, err := ParseExecutorKey("/cadence", "test-ns", "/cadence/test-ns/executors/exec-1/heartbeat") + assert.NoError(t, err) + assert.Equal(t, "exec-1", executorID) + assert.Equal(t, "heartbeat", keyType) + + // Prefix missing + _, _, err = ParseExecutorKey("/cadence", "test-ns", "/wrong/prefix") + assert.ErrorContains(t, err, "key '/wrong/prefix' does not have expected prefix '/cadence/test-ns/executors/'") + + // Unexpected key format + _, _, err = ParseExecutorKey("/cadence", "test-ns", "/cadence/test-ns/executors/exec-1/heartbeat/extra") + assert.ErrorContains(t, err, "unexpected key format: /cadence/test-ns/executors/exec-1/heartbeat/extra") +} diff --git a/service/sharddistributor/store/etcd/etcdstore_test.go b/service/sharddistributor/store/etcd/etcdstore_test.go deleted file mode 100644 index ff0fc818176..00000000000 --- a/service/sharddistributor/store/etcd/etcdstore_test.go +++ /dev/null @@ -1,638 +0,0 @@ -package etcd - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "os" - "strconv" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/fx/fxtest" - "gopkg.in/yaml.v2" - - "github.com/uber/cadence/common/config" - "github.com/uber/cadence/common/types" - shardDistributorCfg "github.com/uber/cadence/service/sharddistributor/config" - "github.com/uber/cadence/service/sharddistributor/store" - "github.com/uber/cadence/testflags" -) - -// TestRecordHeartbeat verifies that an executor's heartbeat is correctly stored. 
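For orientation, a minimal, self-contained sketch of how the new etcdkeys helpers compose and parse executor keys; the "/cadence" prefix and "payments" namespace below are illustrative values, not taken from this diff:

package main

import (
	"fmt"

	"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
)

func main() {
	// Build the heartbeat key for one executor under an assumed prefix and namespace.
	key, err := etcdkeys.BuildExecutorKey("/cadence", "payments", "exec-1", etcdkeys.ExecutorHeartbeatKey)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // /cadence/payments/executors/exec-1/heartbeat

	// Parse the key back into its executor ID and key type.
	executorID, keyType, err := etcdkeys.ParseExecutorKey("/cadence", "payments", key)
	if err != nil {
		panic(err)
	}
	fmt.Println(executorID, keyType) // exec-1 heartbeat
}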
-func TestRecordHeartbeat(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - nowTS := time.Now().Unix() - - executorID := "executor-TestRecordHeartbeat" - req := store.HeartbeatState{ - LastHeartbeat: nowTS, - Status: types.ExecutorStatusACTIVE, - ReportedShards: map[string]*types.ShardStatusReport{ - "shard-TestRecordHeartbeat": {Status: types.ShardStatusREADY}, - }, - } - - err := tc.store.RecordHeartbeat(ctx, tc.namespace, executorID, req) - require.NoError(t, err) - - // Verify directly in etcd - heartbeatKey := tc.store.buildExecutorKey(tc.namespace, executorID, executorHeartbeatKey) - stateKey := tc.store.buildExecutorKey(tc.namespace, executorID, executorStatusKey) - reportedShardsKey := tc.store.buildExecutorKey(tc.namespace, executorID, executorReportedShardsKey) - - resp, err := tc.client.Get(ctx, heartbeatKey) - require.NoError(t, err) - assert.Equal(t, int64(1), resp.Count, "Heartbeat key should exist") - assert.Equal(t, strconv.FormatInt(nowTS, 10), string(resp.Kvs[0].Value)) - - resp, err = tc.client.Get(ctx, stateKey) - require.NoError(t, err) - require.Equal(t, int64(1), resp.Count, "State key should exist") - assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(resp.Kvs[0].Value)) - - resp, err = tc.client.Get(ctx, reportedShardsKey) - require.NoError(t, err) - require.Equal(t, int64(1), resp.Count, "Reported shards key should exist") - - var reportedShards map[string]*types.ShardStatusReport - err = json.Unmarshal(resp.Kvs[0].Value, &reportedShards) - require.NoError(t, err) - require.Len(t, reportedShards, 1) - assert.Equal(t, types.ShardStatusREADY, reportedShards["shard-TestRecordHeartbeat"].Status) -} - -func TestGetHeartbeat(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - nowTS := time.Now().Unix() - - executorID := "executor-get" - req := store.HeartbeatState{ - Status: types.ExecutorStatusDRAINING, - LastHeartbeat: nowTS, - } - - // 1. Record a heartbeat - err := tc.store.RecordHeartbeat(ctx, tc.namespace, executorID, req) - require.NoError(t, err) - - // Assign shards to one executor - assignState := map[string]store.AssignedState{ - executorID: { - AssignedShards: map[string]*types.ShardAssignment{ - "shard-1": {Status: types.AssignmentStatusREADY}, - }, - }, - } - require.NoError(t, tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{ - NewState: &store.NamespaceState{ - ShardAssignments: assignState, - }, - }, store.NopGuard())) - - // 2. Get the heartbeat back - hb, assignedFromDB, err := tc.store.GetHeartbeat(ctx, tc.namespace, executorID) - require.NoError(t, err) - require.NotNil(t, hb) - - // 3. Verify the state - assert.Equal(t, types.ExecutorStatusDRAINING, hb.Status) - assert.Equal(t, nowTS, hb.LastHeartbeat) - require.NotNil(t, assignedFromDB.AssignedShards) - assert.Equal(t, assignState[executorID].AssignedShards, assignedFromDB.AssignedShards) - - // 4. Test getting a non-existent executor - _, _, err = tc.store.GetHeartbeat(ctx, tc.namespace, "executor-non-existent") - require.Error(t, err) - assert.ErrorIs(t, err, store.ErrExecutorNotFound) -} - -// TestGetState verifies that the store can accurately retrieve the state of all executors. 
-func TestGetState(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - executorID1 := "exec-TestGetState-1" - executorID2 := "exec-TestGetState-2" - shardID1 := "shard-1" - shardID2 := "shard-2" - - // Setup: Record heartbeats and assign shards. - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID2, store.HeartbeatState{Status: types.ExecutorStatusDRAINING})) - require.NoError(t, tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{ - NewState: &store.NamespaceState{ - ShardAssignments: map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{shardID1: {}}}, - executorID2: {AssignedShards: map[string]*types.ShardAssignment{shardID2: {}}}, - }, - }, - }, store.NopGuard())) - - // Action: Get the state. - namespaceState, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - - // Verification: - // 1. Check Executors - require.Len(t, namespaceState.Executors, 2, "Should retrieve two heartbeat states") - assert.Equal(t, types.ExecutorStatusACTIVE, namespaceState.Executors[executorID1].Status) - assert.Equal(t, types.ExecutorStatusDRAINING, namespaceState.Executors[executorID2].Status) - - // 2. Check Shard ownership and revisions - require.Len(t, namespaceState.Shards, 2, "Should retrieve two shard states") - assert.Equal(t, executorID1, namespaceState.Shards[shardID1].ExecutorID) - assert.Greater(t, namespaceState.Shards[shardID1].Revision, int64(0)) - assert.Equal(t, executorID2, namespaceState.Shards[shardID2].ExecutorID) - assert.Greater(t, namespaceState.Shards[shardID2].Revision, int64(0)) - - // 3. Check ShardAssignments (from executor records) - require.Len(t, namespaceState.ShardAssignments, 2, "Should retrieve two assignment states") - assert.Contains(t, namespaceState.ShardAssignments[executorID1].AssignedShards, shardID1) - assert.Contains(t, namespaceState.ShardAssignments[executorID2].AssignedShards, shardID2) -} - -// TestAssignShards_WithRevisions tests the optimistic locking logic of AssignShards. -func TestAssignShards_WithRevisions(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - executorID1 := "exec-rev-1" - executorID2 := "exec-rev-2" - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID2, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - - t.Run("Success", func(t *testing.T) { - // 1. Get initial state (empty) - initialState, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - - // 2. Define a new state: assign shard1 to exec1 - newState := &store.NamespaceState{ - Shards: initialState.Shards, // Pass the read revisions - ShardAssignments: map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{"shard-1": {}}}, - }, - } - - // 3. Assign - should succeed - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: newState}, store.NopGuard()) - require.NoError(t, err) - - // 4. 
Verify the assignment - owner, err := tc.store.GetShardOwner(ctx, tc.namespace, "shard-1") - require.NoError(t, err) - assert.Equal(t, executorID1, owner) - }) - - t.Run("ConflictOnNewShard", func(t *testing.T) { - // 1. Get initial state - initialState, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - - // 2. Process A defines its desired state: assign shard-new to exec1 - processAState := &store.NamespaceState{ - Shards: initialState.Shards, - ShardAssignments: map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{"shard-new": {}}}, - }, - } - - // 3. Process B defines its desired state: assign shard-new to exec2 - processBState := &store.NamespaceState{ - Shards: initialState.Shards, - ShardAssignments: map[string]store.AssignedState{ - executorID2: {AssignedShards: map[string]*types.ShardAssignment{"shard-new": {}}}, - }, - } - - // 4. Process A succeeds - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: processAState}, store.NopGuard()) - require.NoError(t, err) - - // 5. Process B tries to commit, but its revision check for shard-new (rev=0) will fail. - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: processBState}, store.NopGuard()) - require.Error(t, err) - assert.ErrorIs(t, err, store.ErrVersionConflict) - }) - - t.Run("ConflictOnExistingShard", func(t *testing.T) { - shardID := "shard-to-move" - // 1. Setup: Assign the shard to executor1 - setupState, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - setupState.ShardAssignments = map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{shardID: {}}}, - } - require.NoError(t, tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: setupState}, store.NopGuard())) - - // 2. Process A reads the state, intending to move the shard to executor2 - stateForProcA, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - stateForProcA.ShardAssignments = map[string]store.AssignedState{ - executorID2: {AssignedShards: map[string]*types.ShardAssignment{shardID: {}}}, - } - - // 3. In the meantime, another process makes a different change (e.g., re-assigns to same executor, which changes revision) - intermediateState, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - intermediateState.ShardAssignments = map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{shardID: {}}}, - } - require.NoError(t, tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: intermediateState}, store.NopGuard())) - - // 4. Process A tries to commit its change. It will fail because its stored revision for the shard is now stale. - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: stateForProcA}, store.NopGuard()) - require.Error(t, err) - assert.ErrorIs(t, err, store.ErrVersionConflict) - }) - - t.Run("DeletedShards", func(t *testing.T) { - shardID := "shard-to-delete" - - // 1. Setup: Assign the shard to executor1 - setupState, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - setupState.ShardAssignments = map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{shardID: {}}}, - } - require.NoError(t, tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: setupState}, store.NopGuard())) - - // 2. 
Delete the shard - stateAfterAssign, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - - deletedShards := map[string]store.ShardState{shardID: stateAfterAssign.Shards[shardID]} - stateAfterAssign.ShardAssignments = map[string]store.AssignedState{ - executorID1: {AssignedShards: map[string]*types.ShardAssignment{}}, - } - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: stateAfterAssign, ShardsToDelete: deletedShards}, store.NopGuard()) - require.NoError(t, err) - - // 3. Verify the shard is deleted - owner, err := tc.store.GetShardOwner(ctx, tc.namespace, shardID) - assert.ErrorIs(t, err, store.ErrShardNotFound) - assert.Equal(t, "", owner) - }) - - t.Run("NoChanges", func(t *testing.T) { - // Get the current state - state, err := tc.store.GetState(ctx, tc.namespace) - require.NoError(t, err) - - // Call AssignShards with the same assignments - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: state}, store.NopGuard()) - require.NoError(t, err, "Assigning with no changes should succeed") - }) -} - -// TestGuardedOperations verifies that AssignShards and DeleteExecutors respect the leader guard. -func TestGuardedOperations(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - namespace := "test-guarded-ns" - executorID := "exec-to-delete" - - // 1. Create two potential leaders - // FIX: Use the correct constructor for the leader elector. - elector, err := NewLeaderStore(StoreParams{Client: tc.client, Cfg: tc.leaderCfg, Lifecycle: tc.lifecycle}) - require.NoError(t, err) - election1, err := elector.CreateElection(ctx, namespace) - require.NoError(t, err) - defer election1.Cleanup(ctx) - election2, err := elector.CreateElection(ctx, namespace) - require.NoError(t, err) - defer election2.Cleanup(ctx) - - // 2. First node becomes leader - require.NoError(t, election1.Campaign(ctx, "host-1")) - validGuard := election1.Guard() - - // 3. Use the valid guard to assign shards - should succeed - assignState := map[string]store.AssignedState{"exec-1": {}} - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: &store.NamespaceState{ShardAssignments: assignState}}, validGuard) - require.NoError(t, err, "Assigning shards with a valid leader guard should succeed") - - // 4. First node resigns, second node becomes leader - require.NoError(t, election1.Resign(ctx)) - require.NoError(t, election2.Campaign(ctx, "host-2")) - - // 5. Use the now-invalid guard from the first leader - should fail - err = tc.store.AssignShards(ctx, tc.namespace, store.AssignShardsRequest{NewState: &store.NamespaceState{ShardAssignments: assignState}}, validGuard) - require.Error(t, err, "Assigning shards with a stale leader guard should fail") - - // 6. 
Use the NopGuard to delete an executor - should succeed - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - err = tc.store.DeleteExecutors(ctx, tc.namespace, []string{executorID}, store.NopGuard()) - require.NoError(t, err, "Deleting an executor without a guard should succeed") - - // Verify deletion - newState, err := tc.store.GetState(ctx, namespace) - require.NoError(t, err) - _, ok := newState.ShardAssignments[executorID] - require.False(t, ok, "Executor should have been deleted") -} - -// TestSubscribe verifies that the subscription channel receives notifications for significant changes. -func TestSubscribe(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - executorID := "exec-sub" - - // Start subscription - sub, err := tc.store.Subscribe(ctx, tc.namespace) - require.NoError(t, err) - - // Manually put a heartbeat update, which is an insignificant change - heartbeatKey := tc.store.buildExecutorKey(tc.namespace, executorID, "heartbeat") - _, err = tc.client.Put(ctx, heartbeatKey, "timestamp") - require.NoError(t, err) - - select { - case <-sub: - t.Fatal("Should not receive notification for a heartbeat-only update") - case <-time.After(100 * time.Millisecond): - // Expected behavior - } - - // Now update the reported shards, which IS a significant change - reportedShardsKey := tc.store.buildExecutorKey(tc.namespace, executorID, "reported_shards") - _, err = tc.client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`) - require.NoError(t, err) - - select { - case rev, ok := <-sub: - require.True(t, ok, "Channel should be open") - assert.Greater(t, rev, int64(0), "Should receive a valid revision for reported shards change") - case <-time.After(1 * time.Second): - t.Fatal("Should have received a notification for a reported shards change") - } -} - -func TestDeleteExecutors_Empty(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - err := tc.store.DeleteExecutors(ctx, tc.namespace, []string{}, store.NopGuard()) - require.NoError(t, err) -} - -// TestDeleteExecutors covers various scenarios for the DeleteExecutors method. -func TestDeleteExecutors(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Setup: Create two active executors for the tests. - executorID1 := "executor-to-delete-1" - executorID2 := "executor-to-delete-2" - survivingExecutorID := "executor-survivor" - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, executorID2, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, survivingExecutorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - - t.Run("DoesNotDeleteAssignedShards", func(t *testing.T) { - shardID := "shard-assigned" - require.NoError(t, tc.store.AssignShard(ctx, tc.namespace, shardID, executorID1), "Setup: Assign shard") - - // Action: Delete the executor. - err := tc.store.DeleteExecutors(ctx, tc.namespace, []string{executorID1}, store.NopGuard()) - require.NoError(t, err) - - // Verification: - // 1. Check that the executor is gone. 
- _, _, err = tc.store.GetHeartbeat(ctx, tc.namespace, executorID1) - assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Executor should be deleted") - - // 2. Check that its assigned shard is not deleted. - owner, err := tc.store.GetShardOwner(ctx, tc.namespace, shardID) - assert.NoError(t, err) - assert.Equal(t, executorID1, owner) - }) - - t.Run("SucceedsForNonExistentExecutor", func(t *testing.T) { - // Action: Delete a non-existent executor. - err := tc.store.DeleteExecutors(ctx, tc.namespace, []string{"non-existent-executor"}, store.NopGuard()) - // Verification: Should not return an error. - require.NoError(t, err) - }) - - t.Run("DeletesMultipleExecutors", func(t *testing.T) { - // Setup: Create and assign shards to multiple executors. - execToDelete1 := "multi-delete-1" - execToDelete2 := "multi-delete-2" - execToKeep := "multi-keep-1" - shardOfDeletedExecutor1 := "multi-shard-1" - shardOfDeletedExecutor2 := "multi-shard-2" - shardOfSurvivingExecutor := "multi-shard-keep" - - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, execToDelete1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, execToDelete2, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - require.NoError(t, tc.store.RecordHeartbeat(ctx, tc.namespace, execToKeep, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) - - require.NoError(t, tc.store.AssignShard(ctx, tc.namespace, shardOfDeletedExecutor1, execToDelete1)) - require.NoError(t, tc.store.AssignShard(ctx, tc.namespace, shardOfDeletedExecutor2, execToDelete2)) - require.NoError(t, tc.store.AssignShard(ctx, tc.namespace, shardOfSurvivingExecutor, execToKeep)) - - // Action: Delete two of the three executors in one call. - err := tc.store.DeleteExecutors(ctx, tc.namespace, []string{execToDelete1, execToDelete2}, store.NopGuard()) - require.NoError(t, err) - - // Verification: - // 1. Check deleted executors are gone, but their shards are not. - _, _, err = tc.store.GetHeartbeat(ctx, tc.namespace, execToDelete1) - assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Executor 1 should be gone") - owner, err := tc.store.GetShardOwner(ctx, tc.namespace, shardOfDeletedExecutor1) - assert.NoError(t, err) - assert.Equal(t, execToDelete1, owner) - - _, _, err = tc.store.GetHeartbeat(ctx, tc.namespace, execToDelete2) - assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Executor 2 should be gone") - owner, err = tc.store.GetShardOwner(ctx, tc.namespace, shardOfDeletedExecutor2) - assert.NoError(t, err) - assert.Equal(t, execToDelete2, owner) - - // 2. Check that the surviving executor and its shard remain. 
- _, _, err = tc.store.GetHeartbeat(ctx, tc.namespace, execToKeep) - assert.NoError(t, err, "Surviving executor should still exist") - owner, err = tc.store.GetShardOwner(ctx, tc.namespace, shardOfSurvivingExecutor) - assert.NoError(t, err, "Surviving shard should still exist") - assert.Equal(t, execToKeep, owner, "Surviving shard should retain its owner") - }) -} - -func TestParseExecutorKey_Errors(t *testing.T) { - tc := setupStoreTestCluster(t) - - _, _, err := tc.store.parseExecutorKey(tc.namespace, "/wrong/prefix/exec/heartbeat") - require.Error(t, err) - assert.Contains(t, err.Error(), "does not have expected prefix") - - key := tc.store.buildExecutorPrefix(tc.namespace) + "too/many/parts" - _, _, err = tc.store.parseExecutorKey(tc.namespace, key) - require.Error(t, err) - assert.Contains(t, err.Error(), "unexpected key format") -} - -// TestAssignAndGetShardOwnerRoundtrip verifies the successful assignment and retrieval of a shard owner. -func TestAssignAndGetShardOwnerRoundtrip(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - executorID := "executor-roundtrip" - shardID := "shard-roundtrip" - - // Setup: Create an active executor. - err := tc.store.RecordHeartbeat(ctx, tc.namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE}) - require.NoError(t, err) - - // 1. Assign a shard to the active executor. - err = tc.store.AssignShard(ctx, tc.namespace, shardID, executorID) - require.NoError(t, err, "Should successfully assign shard to an active executor") - - // 2. Get the owner and verify it's the correct executor. - owner, err := tc.store.GetShardOwner(ctx, tc.namespace, shardID) - require.NoError(t, err, "Should successfully get the shard owner after assignment") - assert.Equal(t, executorID, owner, "Owner should be the executor it was assigned to") -} - -// TestAssignShardErrors tests the various error conditions when assigning a shard. -func TestAssignShardErrors(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - activeExecutorID := "executor-active-errors" - drainingExecutorID := "executor-draining-errors" - shardID1 := "shard-err-1" - shardID2 := "shard-err-2" - - // Setup: Create an active and a draining executor, and assign one shard. - err := tc.store.RecordHeartbeat(ctx, tc.namespace, activeExecutorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE}) - require.NoError(t, err) - err = tc.store.RecordHeartbeat(ctx, tc.namespace, drainingExecutorID, store.HeartbeatState{Status: types.ExecutorStatusDRAINING}) - require.NoError(t, err) - err = tc.store.AssignShard(ctx, tc.namespace, shardID1, activeExecutorID) - require.NoError(t, err) - - // Case 1: Assigning an already-assigned shard. - err = tc.store.AssignShard(ctx, tc.namespace, shardID1, activeExecutorID) - require.Error(t, err, "Should fail to assign an already-assigned shard") - assert.ErrorIs(t, err, store.ErrVersionConflict, "Error should be ErrVersionConflict for duplicate assignment") - - // Case 2: Assigning to a non-existent executor. - err = tc.store.AssignShard(ctx, tc.namespace, shardID2, "non-existent-executor") - require.Error(t, err, "Should fail to assign to a non-existent executor") - assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Error should be ErrExecutorNotFound") - - // Case 3: Assigning to a non-active (draining) executor. 
- err = tc.store.AssignShard(ctx, tc.namespace, shardID2, drainingExecutorID) - require.Error(t, err, "Should fail to assign to a draining executor") - assert.ErrorIs(t, err, store.ErrVersionConflict, "Error should be ErrVersionConflict for non-active executor") -} - -// TestGetShardOwnerErrors tests error conditions for getting a shard owner. -func TestGetShardOwnerErrors(t *testing.T) { - tc := setupStoreTestCluster(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Try to get the owner of a shard that has not been assigned. - _, err := tc.store.GetShardOwner(ctx, tc.namespace, "non-existent-shard") - require.Error(t, err, "Should return an error for a non-existent shard") - assert.ErrorIs(t, err, store.ErrShardNotFound, "Error should be ErrShardNotFound") -} - -// --- Test Setup --- - -type storeTestCluster struct { - store *Store // Use concrete type to access buildExecutorKey - namespace string - leaderCfg shardDistributorCfg.ShardDistribution - client *clientv3.Client - lifecycle *fxtest.Lifecycle -} - -func setupStoreTestCluster(t *testing.T) *storeTestCluster { - t.Helper() - flag.Parse() - testflags.RequireEtcd(t) - - namespace := fmt.Sprintf("ns-%s", strings.ToLower(t.Name())) - - endpoints := strings.Split(os.Getenv("ETCD_ENDPOINTS"), ",") - if endpoints == nil || len(endpoints) == 0 || endpoints[0] == "" { - endpoints = []string{"localhost:2379"} - } - t.Logf("ETCD endpoints: %v", endpoints) - - etcdConfigRaw := map[string]interface{}{ - "endpoints": endpoints, - "dialTimeout": "5s", - "prefix": fmt.Sprintf("/test-shard-store/%s", t.Name()), - "electionTTL": "5s", // Needed for leader config part - } - - yamlCfg, err := yaml.Marshal(etcdConfigRaw) - require.NoError(t, err) - var yamlNode *config.YamlNode - err = yaml.Unmarshal(yamlCfg, &yamlNode) - require.NoError(t, err) - - leaderCfg := shardDistributorCfg.ShardDistribution{ - Enabled: true, - Store: shardDistributorCfg.Store{StorageParams: yamlNode}, - LeaderStore: shardDistributorCfg.Store{StorageParams: yamlNode}, - } - - lifecycle := fxtest.NewLifecycle(t) - s, err := NewStore(StoreParams{ - Cfg: leaderCfg, - Lifecycle: lifecycle, - }) - require.NoError(t, err) - - client, err := clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) - require.NoError(t, err) - t.Cleanup(func() { client.Close() }) - - rawStore := s.(*Store) - - _, err = client.Delete(context.Background(), rawStore.buildNamespacePrefix(namespace), clientv3.WithPrefix()) - require.NoError(t, err) - - return &storeTestCluster{ - namespace: namespace, - store: rawStore, - leaderCfg: leaderCfg, - client: client, - lifecycle: lifecycle, - } -} - -func stringStatus(s types.ExecutorStatus) string { - res, err := json.Marshal(s) - if err != nil { - panic(err) - } - return string(res) -} diff --git a/service/sharddistributor/store/etcd/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go similarity index 59% rename from service/sharddistributor/store/etcd/etcdstore.go rename to service/sharddistributor/store/etcd/executorstore/etcdstore.go index 243b949cbd8..e8fc6f50d73 100644 --- a/service/sharddistributor/store/etcd/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -1,50 +1,51 @@ -package etcd +package executorstore + +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination=executorstore_mock.go ExecutorStore import ( + "bytes" "context" "encoding/json" + "errors" "fmt" "strconv" - "strings" "time" clientv3 
"go.etcd.io/etcd/client/v3" "go.uber.org/fx" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" -) - -const ( - executorHeartbeatKey = "heartbeat" - executorStatusKey = "status" - executorReportedShardsKey = "reported_shards" - executorAssignedStateKey = "assigned_state" - shardAssignedKey = "assigned" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/shardcache" ) var ( _executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE) ) -// Store implements the generic store.Store interface using etcd as the backend. -type Store struct { - client *clientv3.Client - prefix string +type executorStoreImpl struct { + client *clientv3.Client + prefix string + logger log.Logger + shardCache *shardcache.ShardToExecutorCache } -// StoreParams defines the dependencies for the etcd store, for use with fx. -type StoreParams struct { +// ExecutorStoreParams defines the dependencies for the etcd store, for use with fx. +type ExecutorStoreParams struct { fx.In Client *clientv3.Client `optional:"true"` Cfg config.ShardDistribution Lifecycle fx.Lifecycle + Logger log.Logger } // NewStore creates a new etcd-backed store and provides it to the fx application. -func NewStore(p StoreParams) (store.Store, error) { +func NewStore(p ExecutorStoreParams) (store.Store, error) { if !p.Cfg.Enabled { return nil, nil } @@ -71,20 +72,44 @@ func NewStore(p StoreParams) (store.Store, error) { } } - p.Lifecycle.Append(fx.StopHook(etcdClient.Close)) + shardCache := shardcache.NewShardToExecutorCache(etcdCfg.Prefix, etcdClient, p.Logger) - return &Store{ - client: etcdClient, - prefix: etcdCfg.Prefix, - }, nil + store := &executorStoreImpl{ + client: etcdClient, + prefix: etcdCfg.Prefix, + logger: p.Logger, + shardCache: shardCache, + } + + p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop)) + + return store, nil +} + +func (s *executorStoreImpl) Start() { + s.shardCache.Start() +} + +func (s *executorStoreImpl) Stop() { + s.shardCache.Stop() + s.client.Close() } // --- HeartbeatStore Implementation --- -func (s *Store) RecordHeartbeat(ctx context.Context, namespace, executorID string, request store.HeartbeatState) error { - heartbeatETCDKey := s.buildExecutorKey(namespace, executorID, executorHeartbeatKey) - stateETCDKey := s.buildExecutorKey(namespace, executorID, executorStatusKey) - reportedShardsETCDKey := s.buildExecutorKey(namespace, executorID, executorReportedShardsKey) +func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, executorID string, request store.HeartbeatState) error { + heartbeatETCDKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorHeartbeatKey) + if err != nil { + return fmt.Errorf("build executor heartbeat key: %w", err) + } + stateETCDKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) + if err != nil { + return fmt.Errorf("build executor status key: %w", err) + } + reportedShardsETCDKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorReportedShardsKey) + if err != nil { + return fmt.Errorf("build executor reported shards key: %w", err) + } reportedShardsData, err := json.Marshal(request.ReportedShards) if err != nil { @@ -110,9 +135,12 @@ func (s *Store) 
RecordHeartbeat(ctx context.Context, namespace, executorID strin } // GetHeartbeat retrieves the last known heartbeat state for a single executor. -func (s *Store) GetHeartbeat(ctx context.Context, namespace string, executorID string) (*store.HeartbeatState, *store.AssignedState, error) { +func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, executorID string) (*store.HeartbeatState, *store.AssignedState, error) { // The prefix for all keys related to a single executor. - executorPrefix := s.buildExecutorKey(namespace, executorID, "") + executorPrefix, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, "") + if err != nil { + return nil, nil, fmt.Errorf("build executor prefix: %w", err) + } resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { return nil, nil, fmt.Errorf("etcd get failed for executor %s: %w", executorID, err) @@ -129,30 +157,30 @@ func (s *Store) GetHeartbeat(ctx context.Context, namespace string, executorID s for _, kv := range resp.Kvs { key := string(kv.Key) value := string(kv.Value) - _, keyType, keyErr := s.parseExecutorKey(namespace, key) + _, keyType, keyErr := etcdkeys.ParseExecutorKey(s.prefix, namespace, key) if keyErr != nil { continue // Ignore unexpected keys } found = true // We found at least one valid key part for the executor. switch keyType { - case executorHeartbeatKey: + case etcdkeys.ExecutorHeartbeatKey: timestamp, err := strconv.ParseInt(value, 10, 64) if err != nil { return nil, nil, fmt.Errorf("parse heartbeat timestamp: %w", err) } heartbeatState.LastHeartbeat = timestamp - case executorStatusKey: + case etcdkeys.ExecutorStatusKey: err := json.Unmarshal([]byte(value), &heartbeatState.Status) if err != nil { return nil, nil, fmt.Errorf("parse heartbeat state: %w, value %s", err, value) } - case executorReportedShardsKey: + case etcdkeys.ExecutorReportedShardsKey: err = json.Unmarshal(kv.Value, &heartbeatState.ReportedShards) if err != nil { return nil, nil, fmt.Errorf("unmarshal reported shards: %w", err) } - case executorAssignedStateKey: + case etcdkeys.ExecutorAssignedStateKey: err = json.Unmarshal(kv.Value, &assignedState) if err != nil { return nil, nil, fmt.Errorf("unmarshal assigned shards: %w", err) @@ -170,11 +198,11 @@ func (s *Store) GetHeartbeat(ctx context.Context, namespace string, executorID s // --- ShardStore Implementation --- -func (s *Store) GetState(ctx context.Context, namespace string) (*store.NamespaceState, error) { +func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*store.NamespaceState, error) { heartbeatStates := make(map[string]store.HeartbeatState) assignedStates := make(map[string]store.AssignedState) - executorPrefix := s.buildExecutorPrefix(namespace) + executorPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace) resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { return nil, fmt.Errorf("get executor data: %w", err) @@ -183,85 +211,69 @@ func (s *Store) GetState(ctx context.Context, namespace string) (*store.Namespac for _, kv := range resp.Kvs { key := string(kv.Key) value := string(kv.Value) - executorID, keyType, keyErr := s.parseExecutorKey(namespace, key) + executorID, keyType, keyErr := etcdkeys.ParseExecutorKey(s.prefix, namespace, key) if keyErr != nil { continue } heartbeat := heartbeatStates[executorID] assigned := assignedStates[executorID] switch keyType { - case executorHeartbeatKey: + case etcdkeys.ExecutorHeartbeatKey: timestamp, _ := strconv.ParseInt(value, 10, 
64) heartbeat.LastHeartbeat = timestamp - case executorStatusKey: + case etcdkeys.ExecutorStatusKey: err := json.Unmarshal([]byte(value), &heartbeat.Status) if err != nil { return nil, fmt.Errorf("parse heartbeat state: %w, value %s", err, value) } - case executorReportedShardsKey: + case etcdkeys.ExecutorReportedShardsKey: err = json.Unmarshal(kv.Value, &heartbeat.ReportedShards) if err != nil { return nil, fmt.Errorf("unmarshal reported shards: %w", err) } - case executorAssignedStateKey: + case etcdkeys.ExecutorAssignedStateKey: err = json.Unmarshal(kv.Value, &assigned) if err != nil { return nil, fmt.Errorf("unmarshal assigned shards: %w, %s", err, value) } + assigned.ModRevision = kv.ModRevision } heartbeatStates[executorID] = heartbeat assignedStates[executorID] = assigned } - shardStates := make(map[string]store.ShardState) - shardsPrefix := s.buildShardsPrefix(namespace) - shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix()) - if err != nil { - return nil, fmt.Errorf("get shard ownership data: %w", err) - } - for _, kv := range shardResp.Kvs { - key := string(kv.Key) - remainder := strings.TrimPrefix(key, shardsPrefix) - parts := strings.Split(remainder, "/") - if len(parts) < 2 || parts[1] != shardAssignedKey { - continue - } - shardID := parts[0] - shardStates[shardID] = store.ShardState{ - ExecutorID: string(kv.Value), - Revision: kv.ModRevision, - } - } - return &store.NamespaceState{ Executors: heartbeatStates, - Shards: shardStates, ShardAssignments: assignedStates, GlobalRevision: resp.Header.Revision, }, nil } -func (s *Store) Subscribe(ctx context.Context, namespace string) (<-chan int64, error) { +func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-chan int64, error) { revisionChan := make(chan int64, 1) - watchPrefix := s.buildExecutorPrefix(namespace) + watchPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace) go func() { defer close(revisionChan) - watchChan := s.client.Watch(ctx, watchPrefix, clientv3.WithPrefix()) + watchChan := s.client.Watch(ctx, watchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV()) for watchResp := range watchChan { if err := watchResp.Err(); err != nil { return } isSignificantChange := false for _, event := range watchResp.Events { + if event.IsModify() && bytes.Equal(event.Kv.Value, event.PrevKv.Value) { + continue // Value is unchanged, ignore this event. + } + if !event.IsCreate() && !event.IsModify() { isSignificantChange = true break } - _, keyType, err := s.parseExecutorKey(namespace, string(event.Kv.Key)) + _, keyType, err := etcdkeys.ParseExecutorKey(s.prefix, namespace, string(event.Kv.Key)) if err != nil { continue } - if keyType != executorHeartbeatKey && keyType != executorAssignedStateKey { + if keyType != etcdkeys.ExecutorHeartbeatKey && keyType != etcdkeys.ExecutorAssignedStateKey { isSignificantChange = true break } @@ -278,7 +290,7 @@ func (s *Store) Subscribe(ctx context.Context, namespace string) (<-chan int64, return revisionChan, nil } -func (s *Store) AssignShards(ctx context.Context, namespace string, request store.AssignShardsRequest, guard store.GuardFunc) error { +func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, request store.AssignShardsRequest, guard store.GuardFunc) error { var ops []clientv3.Op var comparisons []clientv3.Cmp @@ -286,36 +298,16 @@ func (s *Store) AssignShards(ctx context.Context, namespace string, request stor // and comparisons to check for concurrent modifications. 
for executorID, state := range request.NewState.ShardAssignments { // Update the executor's assigned_state key. - executorStateKey := s.buildExecutorKey(namespace, executorID, executorAssignedStateKey) + executorStateKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) + if err != nil { + return fmt.Errorf("build executor assigned state key: %w", err) + } value, err := json.Marshal(state) if err != nil { return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err) } ops = append(ops, clientv3.OpPut(executorStateKey, string(value))) - - // For each shard in the new assignment, add a Put operation and a revision check. - for shardID := range state.AssignedShards { - shardOwnerKey := s.buildShardKey(namespace, shardID, shardAssignedKey) - ops = append(ops, clientv3.OpPut(shardOwnerKey, executorID)) - - // Check the revision of the shard from the state we read in GetState. - previousShardState, ok := request.NewState.Shards[shardID] - if ok { - // The shard existed before. Check that its revision has not changed. - // This handles moves and re-assignments to the same executor. - comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(shardOwnerKey), "=", previousShardState.Revision)) - } else { - // The shard is new. Check that it has not been created by another process. - // A non-existent key has a ModRevision of 0. - comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(shardOwnerKey), "=", 0)) - } - } - } - - for shardID, shardState := range request.ShardsToDelete { - shardOwnerKey := s.buildShardKey(namespace, shardID, shardAssignedKey) - ops = append(ops, clientv3.OpDelete(shardOwnerKey)) - comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(shardOwnerKey), "=", shardState.Revision)) + comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision)) } if len(ops) == 0 { @@ -368,22 +360,15 @@ func (s *Store) AssignShards(ctx context.Context, namespace string, request stor return nil } -func (s *Store) GetShardOwner(ctx context.Context, namespace, shardID string) (string, error) { - key := s.buildShardKey(namespace, shardID, shardAssignedKey) - resp, err := s.client.Get(ctx, key) +func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, executorID string) error { + assignedState, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) if err != nil { - return "", fmt.Errorf("etcd get for shard %s: %w", shardID, err) + return fmt.Errorf("build executor assigned state key: %w", err) } - if resp.Count == 0 { - return "", store.ErrShardNotFound + statusKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) + if err != nil { + return fmt.Errorf("build executor status key: %w", err) } - return string(resp.Kvs[0].Value), nil -} - -func (s *Store) AssignShard(ctx context.Context, namespace, shardID, executorID string) error { - assignedState := s.buildExecutorKey(namespace, executorID, executorAssignedStateKey) - statusKey := s.buildExecutorKey(namespace, executorID, executorStatusKey) - shardOwnerKey := s.buildShardKey(namespace, shardID, shardAssignedKey) // Use a read-modify-write loop to handle concurrent updates safely. 
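As an aside, the read-modify-write pattern used below amounts to: read the key and remember its ModRevision, compute the new value, then commit inside a transaction that only succeeds if the ModRevision is unchanged, retrying otherwise. A generic sketch, assuming the surrounding file's imports (context, fmt, clientv3); cli, key, and updateFn are hypothetical names, not part of this diff:

// casUpdate keeps retrying until the value produced by updateFn is committed
// without a concurrent writer having modified key in between. Sketch only.
func casUpdate(ctx context.Context, cli *clientv3.Client, key string, updateFn func(old string) string) error {
	for {
		// Read the current value and its ModRevision (0 if the key does not exist yet).
		getResp, err := cli.Get(ctx, key)
		if err != nil {
			return fmt.Errorf("get %q: %w", key, err)
		}
		var current string
		var rev int64
		if len(getResp.Kvs) > 0 {
			current = string(getResp.Kvs[0].Value)
			rev = getResp.Kvs[0].ModRevision
		}

		// Commit only if nobody has touched the key since the read above.
		txnResp, err := cli.Txn(ctx).
			If(clientv3.Compare(clientv3.ModRevision(key), "=", rev)).
			Then(clientv3.OpPut(key, updateFn(current))).
			Commit()
		if err != nil {
			return fmt.Errorf("txn %q: %w", key, err)
		}
		if txnResp.Succeeded {
			return nil
		}
		// Lost the race; loop and retry with a fresh read.
	}
}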
for { @@ -418,20 +403,32 @@ func (s *Store) AssignShard(ctx context.Context, namespace, shardID, executorID return fmt.Errorf("marshal new assigned state: %w", err) } + var comparisons []clientv3.Cmp + // 3. Prepare and commit the transaction with three atomic checks. - // a) Check that the shard is not already assigned (its key revision must be 0). - cmpShardUnassigned := clientv3.Compare(clientv3.ModRevision(shardOwnerKey), "=", 0) - // b) Check that the executor's status is ACTIVE. - cmpStatus := clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSON) - // c) Check that the assigned_state key hasn't been changed by another process. - cmpAssignedState := clientv3.Compare(clientv3.ModRevision(assignedState), "=", modRevision) + // a) Check that the executor's status is ACTIVE. + comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSON)) + // b) Check that the assigned_state key hasn't been changed by another process. + comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(assignedState), "=", modRevision)) + // c) Check that the cache is up to date. + cmp, err := s.shardCache.GetExecutorModRevisionCmp(namespace) + if err != nil { + return fmt.Errorf("get executor mod revision cmp: %w", err) + } + comparisons = append(comparisons, cmp...) - opUpdateExecutorState := clientv3.OpPut(assignedState, string(newStateValue)) - opUpdateShardOwner := clientv3.OpPut(shardOwnerKey, executorID) + // We check the shard cache to see if the shard is already assigned to an executor. + owner, err := s.shardCache.GetShardOwner(ctx, namespace, shardID) + if err != nil && !errors.Is(err, store.ErrShardNotFound) { + return fmt.Errorf("checking shard owner: %w", err) + } + if err == nil { + return &store.ErrShardAlreadyAssigned{ShardID: shardID, AssignedTo: owner} + } txnResp, err := s.client.Txn(ctx). - If(cmpShardUnassigned, cmpStatus, cmpAssignedState). // All conditions must be true. - Then(opUpdateExecutorState, opUpdateShardOwner). + If(comparisons...). + Then(clientv3.OpPut(assignedState, string(newStateValue))). Commit() if err != nil { @@ -442,17 +439,6 @@ func (s *Store) AssignShard(ctx context.Context, namespace, shardID, executorID return nil } - // If the transaction failed, diagnose the reason to return a specific error. - // Check for the new failure condition first. - shardResp, err := s.client.Get(ctx, shardOwnerKey) - if err != nil { - return fmt.Errorf("check shard owner after failed transaction: %w", err) - } - if len(shardResp.Kvs) > 0 { - // The shard key exists, meaning it's already assigned. This was the reason for failure. - return fmt.Errorf("%w: shard is already owned by %s", store.ErrVersionConflict, shardResp.Kvs[0].Value) - } - // If the transaction failed, another process interfered. // Provide a specific error if the status check failed. currentStatusResp, err := s.client.Get(ctx, statusKey) @@ -463,20 +449,24 @@ func (s *Store) AssignShard(ctx context.Context, namespace, shardID, executorID return fmt.Errorf(`%w: executor status is %s"`, store.ErrVersionConflict, currentStatusResp.Kvs[0].Value) } + s.logger.Info("Assign shard transaction failed due to a conflict. Retrying...", tag.ShardNamespace(namespace), tag.ShardKey(shardID), tag.ShardExecutor(executorID)) // Otherwise, it was a revision mismatch. Loop to retry the operation. } } // DeleteExecutors deletes the given executors from the store. 
It does not delete the shards owned by the executors, this // should be handled by the namespace processor loop as we want to reassign, not delete the shards. -func (s *Store) DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard store.GuardFunc) error { +func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard store.GuardFunc) error { if len(executorIDs) == 0 { return nil } var ops []clientv3.Op for _, executorID := range executorIDs { - executorPrefix := s.buildExecutorKey(namespace, executorID, "") + executorPrefix, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, "") + if err != nil { + return fmt.Errorf("build executor prefix: %w", err) + } ops = append(ops, clientv3.OpDelete(executorPrefix, clientv3.WithPrefix())) } @@ -505,37 +495,6 @@ func (s *Store) DeleteExecutors(ctx context.Context, namespace string, executorI return nil } -// --- Key Management Utilities --- - -func (s *Store) buildNamespacePrefix(namespace string) string { - return fmt.Sprintf("%s/%s", s.prefix, namespace) -} - -func (s *Store) buildShardsPrefix(namespace string) string { - return fmt.Sprintf("%s/shards/", s.buildNamespacePrefix(namespace)) -} - -func (s *Store) buildShardKey(namespace, shardID string, keyType string) string { - return fmt.Sprintf("%s%s/%s", s.buildShardsPrefix(namespace), shardID, keyType) -} - -func (s *Store) buildExecutorPrefix(namespace string) string { - return fmt.Sprintf("%s/executors/", s.buildNamespacePrefix(namespace)) -} - -func (s *Store) buildExecutorKey(namespace, executorID, keyType string) string { - return fmt.Sprintf("%s%s/%s", s.buildExecutorPrefix(namespace), executorID, keyType) -} - -func (s *Store) parseExecutorKey(namespace, key string) (executorID, keyType string, err error) { - prefix := s.buildExecutorPrefix(namespace) - if !strings.HasPrefix(key, prefix) { - return "", "", fmt.Errorf("key '%s' does not have expected prefix '%s'", key, prefix) - } - remainder := strings.TrimPrefix(key, prefix) - parts := strings.Split(remainder, "/") - if len(parts) != 2 { - return "", "", fmt.Errorf("unexpected key format: %s", key) - } - return parts[0], parts[1], nil +func (s *executorStoreImpl) GetShardOwner(ctx context.Context, namespace, shardID string) (string, error) { + return s.shardCache.GetShardOwner(ctx, namespace, shardID) } diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go new file mode 100644 index 00000000000..ff3dc815e36 --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -0,0 +1,536 @@ +package executorstore + +import ( + "context" + "encoding/json" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/fx/fxtest" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/store" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/leaderstore" + "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" +) + +// TestRecordHeartbeat verifies that an executor's heartbeat is correctly stored. 
+func TestRecordHeartbeat(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + nowTS := time.Now().Unix() + + executorID := "executor-TestRecordHeartbeat" + req := store.HeartbeatState{ + LastHeartbeat: nowTS, + Status: types.ExecutorStatusACTIVE, + ReportedShards: map[string]*types.ShardStatusReport{ + "shard-TestRecordHeartbeat": {Status: types.ShardStatusREADY}, + }, + } + + err := executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, req) + require.NoError(t, err) + + // Verify directly in etcd + heartbeatKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorHeartbeatKey) + require.NoError(t, err) + stateKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorStatusKey) + require.NoError(t, err) + reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorReportedShardsKey) + require.NoError(t, err) + + resp, err := tc.Client.Get(ctx, heartbeatKey) + require.NoError(t, err) + assert.Equal(t, int64(1), resp.Count, "Heartbeat key should exist") + assert.Equal(t, strconv.FormatInt(nowTS, 10), string(resp.Kvs[0].Value)) + + resp, err = tc.Client.Get(ctx, stateKey) + require.NoError(t, err) + require.Equal(t, int64(1), resp.Count, "State key should exist") + assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(resp.Kvs[0].Value)) + + resp, err = tc.Client.Get(ctx, reportedShardsKey) + require.NoError(t, err) + require.Equal(t, int64(1), resp.Count, "Reported shards key should exist") + + var reportedShards map[string]*types.ShardStatusReport + err = json.Unmarshal(resp.Kvs[0].Value, &reportedShards) + require.NoError(t, err) + require.Len(t, reportedShards, 1) + assert.Equal(t, types.ShardStatusREADY, reportedShards["shard-TestRecordHeartbeat"].Status) +} + +func TestGetHeartbeat(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + nowTS := time.Now().Unix() + + executorID := "executor-get" + req := store.HeartbeatState{ + Status: types.ExecutorStatusDRAINING, + LastHeartbeat: nowTS, + } + + // 1. Record a heartbeat + err := executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, req) + require.NoError(t, err) + + // Assign shards to one executor + assignState := map[string]store.AssignedState{ + executorID: { + AssignedShards: map[string]*types.ShardAssignment{ + "shard-1": {Status: types.AssignmentStatusREADY}, + }, + }, + } + require.NoError(t, executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{ + NewState: &store.NamespaceState{ + ShardAssignments: assignState, + }, + }, store.NopGuard())) + + // 2. Get the heartbeat back + hb, assignedFromDB, err := executorStore.GetHeartbeat(ctx, tc.Namespace, executorID) + require.NoError(t, err) + require.NotNil(t, hb) + + // 3. Verify the state + assert.Equal(t, types.ExecutorStatusDRAINING, hb.Status) + assert.Equal(t, nowTS, hb.LastHeartbeat) + require.NotNil(t, assignedFromDB.AssignedShards) + assert.Equal(t, assignState[executorID].AssignedShards, assignedFromDB.AssignedShards) + + // 4. 
Test getting a non-existent executor + _, _, err = executorStore.GetHeartbeat(ctx, tc.Namespace, "executor-non-existent") + require.Error(t, err) + assert.ErrorIs(t, err, store.ErrExecutorNotFound) +} + +// TestGetState verifies that the store can accurately retrieve the state of all executors. +func TestGetState(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + executorID1 := "exec-TestGetState-1" + executorID2 := "exec-TestGetState-2" + shardID1 := "shard-1" + shardID2 := "shard-2" + + // Setup: Record heartbeats and assign shards. + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID2, store.HeartbeatState{Status: types.ExecutorStatusDRAINING})) + require.NoError(t, executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{ + NewState: &store.NamespaceState{ + ShardAssignments: map[string]store.AssignedState{ + executorID1: {AssignedShards: map[string]*types.ShardAssignment{shardID1: {}}}, + executorID2: {AssignedShards: map[string]*types.ShardAssignment{shardID2: {}}}, + }, + }, + }, store.NopGuard())) + + // Action: Get the state. + namespaceState, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + + // Verification: + // Check Executors + require.Len(t, namespaceState.Executors, 2, "Should retrieve two heartbeat states") + assert.Equal(t, types.ExecutorStatusACTIVE, namespaceState.Executors[executorID1].Status) + assert.Equal(t, types.ExecutorStatusDRAINING, namespaceState.Executors[executorID2].Status) + + // Check ShardAssignments (from executor records) + require.Len(t, namespaceState.ShardAssignments, 2, "Should retrieve two assignment states") + assert.Contains(t, namespaceState.ShardAssignments[executorID1].AssignedShards, shardID1) + assert.Contains(t, namespaceState.ShardAssignments[executorID2].AssignedShards, shardID2) +} + +// TestAssignShards_WithRevisions tests the optimistic locking logic of AssignShards. 
+func TestAssignShards_WithRevisions(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + executorID1 := "exec-rev-1" + executorID2 := "exec-rev-2" + + t.Run("Success", func(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + recordHeartbeats(ctx, t, executorStore, tc.Namespace, executorID1, executorID2) + + // Define a new state: assign shard1 to exec1 + newState := &store.NamespaceState{ + ShardAssignments: map[string]store.AssignedState{ + executorID1: {AssignedShards: map[string]*types.ShardAssignment{"shard-1": {}}}, + }, + } + + // Assign - should succeed + err := executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: newState}, store.NopGuard()) + require.NoError(t, err) + + // Verify the assignment + state, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + assert.Contains(t, state.ShardAssignments[executorID1].AssignedShards, "shard-1") + }) + + t.Run("ConflictOnNewShard", func(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + recordHeartbeats(ctx, t, executorStore, tc.Namespace, executorID1, executorID2) + + // Process A defines its desired state: assign shard-new to exec1 + processAState := &store.NamespaceState{ + ShardAssignments: map[string]store.AssignedState{ + executorID1: {AssignedShards: map[string]*types.ShardAssignment{"shard-new": {}}}, + executorID2: {}, + }, + } + + // Process B defines its desired state: assign shard-new to exec2 + processBState := &store.NamespaceState{ + ShardAssignments: map[string]store.AssignedState{ + executorID1: {}, + executorID2: {AssignedShards: map[string]*types.ShardAssignment{"shard-new": {}}}, + }, + } + + // Process A succeeds + err := executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: processAState}, store.NopGuard()) + require.NoError(t, err) + + // Process B tries to commit, but its revision check for shard-new (rev=0) will fail. + err = executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: processBState}, store.NopGuard()) + require.Error(t, err) + assert.ErrorIs(t, err, store.ErrVersionConflict) + }) + + t.Run("ConflictOnExistingShard", func(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + recordHeartbeats(ctx, t, executorStore, tc.Namespace, executorID1, executorID2) + + shardID := "shard-to-move" + // 1. Setup: Assign the shard to executor1 + setupState, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + setupState.ShardAssignments = map[string]store.AssignedState{ + executorID1: {AssignedShards: map[string]*types.ShardAssignment{shardID: {}}}, + } + require.NoError(t, executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: setupState}, store.NopGuard())) + + // 2. Process A reads the state, intending to move the shard to executor2 + stateForProcA, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + stateForProcA.ShardAssignments = map[string]store.AssignedState{ + executorID1: {ModRevision: stateForProcA.ShardAssignments[executorID1].ModRevision}, + executorID2: {AssignedShards: map[string]*types.ShardAssignment{shardID: {}}, ModRevision: 0}, + } + + // 3. 
In the meantime, another process makes a different change (e.g., re-assigns to same executor, which changes revision) + intermediateState, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + intermediateState.ShardAssignments = map[string]store.AssignedState{ + executorID1: { + AssignedShards: map[string]*types.ShardAssignment{shardID: {}}, + ModRevision: intermediateState.ShardAssignments[executorID1].ModRevision, + }, + } + require.NoError(t, executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: intermediateState}, store.NopGuard())) + + // 4. Process A tries to commit its change. It will fail because its stored revision for the shard is now stale. + err = executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: stateForProcA}, store.NopGuard()) + require.Error(t, err) + assert.ErrorIs(t, err, store.ErrVersionConflict) + }) + + t.Run("NoChanges", func(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + recordHeartbeats(ctx, t, executorStore, tc.Namespace, executorID1, executorID2) + + // Get the current state + state, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + + // Call AssignShards with the same assignments + err = executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: state}, store.NopGuard()) + require.NoError(t, err, "Assigning with no changes should succeed") + }) +} + +// TestGuardedOperations verifies that AssignShards and DeleteExecutors respect the leader guard. +func TestGuardedOperations(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + namespace := "test-guarded-ns" + executorID := "exec-to-delete" + + // 1. Create two potential leaders + // FIX: Use the correct constructor for the leader elector. + elector, err := leaderstore.NewLeaderStore(leaderstore.StoreParams{Client: tc.Client, Cfg: tc.LeaderCfg, Lifecycle: fxtest.NewLifecycle(t)}) + require.NoError(t, err) + election1, err := elector.CreateElection(ctx, namespace) + require.NoError(t, err) + defer election1.Cleanup(ctx) + election2, err := elector.CreateElection(ctx, namespace) + require.NoError(t, err) + defer election2.Cleanup(ctx) + + // 2. First node becomes leader + require.NoError(t, election1.Campaign(ctx, "host-1")) + validGuard := election1.Guard() + + // 3. Use the valid guard to assign shards - should succeed + assignState := map[string]store.AssignedState{"exec-1": {}} + err = executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: &store.NamespaceState{ShardAssignments: assignState}}, validGuard) + require.NoError(t, err, "Assigning shards with a valid leader guard should succeed") + + // 4. First node resigns, second node becomes leader + require.NoError(t, election1.Resign(ctx)) + require.NoError(t, election2.Campaign(ctx, "host-2")) + + // 5. Use the now-invalid guard from the first leader - should fail + err = executorStore.AssignShards(ctx, tc.Namespace, store.AssignShardsRequest{NewState: &store.NamespaceState{ShardAssignments: assignState}}, validGuard) + require.Error(t, err, "Assigning shards with a stale leader guard should fail") + + // 6. 
Use the NopGuard to delete an executor - should succeed + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + err = executorStore.DeleteExecutors(ctx, tc.Namespace, []string{executorID}, store.NopGuard()) + require.NoError(t, err, "Deleting an executor without a guard should succeed") + + // Verify deletion + newState, err := executorStore.GetState(ctx, namespace) + require.NoError(t, err) + _, ok := newState.ShardAssignments[executorID] + require.False(t, ok, "Executor should have been deleted") +} + +// TestSubscribe verifies that the subscription channel receives notifications for significant changes. +func TestSubscribe(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + executorID := "exec-sub" + + // Start subscription + sub, err := executorStore.Subscribe(ctx, tc.Namespace) + require.NoError(t, err) + + // Manually put a heartbeat update, which is an insignificant change + heartbeatKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "heartbeat") + require.NoError(t, err) + _, err = tc.Client.Put(ctx, heartbeatKey, "timestamp") + require.NoError(t, err) + + select { + case <-sub: + t.Fatal("Should not receive notification for a heartbeat-only update") + case <-time.After(100 * time.Millisecond): + // Expected behavior + } + + // Now update the reported shards, which IS a significant change + reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards") + require.NoError(t, err) + _, err = tc.Client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`) + require.NoError(t, err) + + select { + case rev, ok := <-sub: + require.True(t, ok, "Channel should be open") + assert.Greater(t, rev, int64(0), "Should receive a valid revision for reported shards change") + case <-time.After(1 * time.Second): + t.Fatal("Should have received a notification for a reported shards change") + } +} + +func TestDeleteExecutors_Empty(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := executorStore.DeleteExecutors(ctx, tc.Namespace, []string{}, store.NopGuard()) + require.NoError(t, err) +} + +// TestDeleteExecutors covers various scenarios for the DeleteExecutors method. +func TestDeleteExecutors(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Setup: Create two active executors for the tests. + executorID1 := "executor-to-delete-1" + executorID2 := "executor-to-delete-2" + survivingExecutorID := "executor-survivor" + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID2, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, survivingExecutorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + + t.Run("SucceedsForNonExistentExecutor", func(t *testing.T) { + // Action: Delete a non-existent executor. 
+ err := executorStore.DeleteExecutors(ctx, tc.Namespace, []string{"non-existent-executor"}, store.NopGuard()) + // Verification: Should not return an error. + require.NoError(t, err) + }) + + t.Run("DeletesMultipleExecutors", func(t *testing.T) { + // Setup: Create and assign shards to multiple executors. + execToDelete1 := "multi-delete-1" + execToDelete2 := "multi-delete-2" + execToKeep := "multi-keep-1" + shardOfDeletedExecutor1 := "multi-shard-1" + shardOfDeletedExecutor2 := "multi-shard-2" + shardOfSurvivingExecutor := "multi-shard-keep" + + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, execToDelete1, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, execToDelete2, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, execToKeep, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + + require.NoError(t, executorStore.AssignShard(ctx, tc.Namespace, shardOfDeletedExecutor1, execToDelete1)) + require.NoError(t, executorStore.AssignShard(ctx, tc.Namespace, shardOfDeletedExecutor2, execToDelete2)) + require.NoError(t, executorStore.AssignShard(ctx, tc.Namespace, shardOfSurvivingExecutor, execToKeep)) + + // Action: Delete two of the three executors in one call. + err := executorStore.DeleteExecutors(ctx, tc.Namespace, []string{execToDelete1, execToDelete2}, store.NopGuard()) + require.NoError(t, err) + + // Verification: + // 1. Check deleted executors are gone. + _, _, err = executorStore.GetHeartbeat(ctx, tc.Namespace, execToDelete1) + assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Executor 1 should be gone") + + _, _, err = executorStore.GetHeartbeat(ctx, tc.Namespace, execToDelete2) + assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Executor 2 should be gone") + + // 2. Check that the surviving executor remain. + _, _, err = executorStore.GetHeartbeat(ctx, tc.Namespace, execToKeep) + assert.NoError(t, err, "Surviving executor should still exist") + }) +} + +func TestParseExecutorKey_Errors(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + + _, _, err := etcdkeys.ParseExecutorKey(tc.EtcdPrefix, tc.Namespace, "/wrong/prefix/exec/heartbeat") + require.Error(t, err) + assert.Contains(t, err.Error(), "does not have expected prefix") + + key := etcdkeys.BuildExecutorPrefix(tc.EtcdPrefix, tc.Namespace) + "too/many/parts" + _, _, err = etcdkeys.ParseExecutorKey(tc.EtcdPrefix, tc.Namespace, key) + require.Error(t, err) + assert.Contains(t, err.Error(), "unexpected key format") +} + +// TestAssignAndGetShardOwnerRoundtrip verifies the successful assignment and retrieval of a shard owner. +func TestAssignAndGetShardOwnerRoundtrip(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + executorID := "executor-roundtrip" + shardID := "shard-roundtrip" + + // Setup: Create an active executor. + err := executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE}) + require.NoError(t, err) + + // 1. Assign a shard to the active executor. + err = executorStore.AssignShard(ctx, tc.Namespace, shardID, executorID) + require.NoError(t, err, "Should successfully assign shard to an active executor") + + // 2. Get the owner and verify it's the correct executor. 
+ state, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + assert.Contains(t, state.ShardAssignments[executorID].AssignedShards, shardID) +} + +// TestAssignShardErrors tests the various error conditions when assigning a shard. +func TestAssignShardErrors(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + activeExecutorID := "executor-active-errors" + drainingExecutorID := "executor-draining-errors" + shardID1 := "shard-err-1" + shardID2 := "shard-err-2" + + // Setup: Create an active and a draining executor, and assign one shard. + err := executorStore.RecordHeartbeat(ctx, tc.Namespace, activeExecutorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE}) + require.NoError(t, err) + err = executorStore.RecordHeartbeat(ctx, tc.Namespace, drainingExecutorID, store.HeartbeatState{Status: types.ExecutorStatusDRAINING}) + require.NoError(t, err) + err = executorStore.AssignShard(ctx, tc.Namespace, shardID1, activeExecutorID) + require.NoError(t, err) + + // Case 1: Assigning an already-assigned shard. + err = executorStore.AssignShard(ctx, tc.Namespace, shardID1, activeExecutorID) + require.Error(t, err, "Should fail to assign an already-assigned shard") + assert.ErrorAs(t, err, new(*store.ErrShardAlreadyAssigned)) + + // Case 2: Assigning to a non-existent executor. + err = executorStore.AssignShard(ctx, tc.Namespace, shardID2, "non-existent-executor") + require.Error(t, err, "Should fail to assign to a non-existent executor") + assert.ErrorIs(t, err, store.ErrExecutorNotFound, "Error should be ErrExecutorNotFound") + + // Case 3: Assigning to a non-active (draining) executor. + err = executorStore.AssignShard(ctx, tc.Namespace, shardID2, drainingExecutorID) + require.Error(t, err, "Should fail to assign to a draining executor") + assert.ErrorIs(t, err, store.ErrVersionConflict, "Error should be ErrVersionConflict for non-active executor") +} + +// --- Test Setup --- + +func stringStatus(s types.ExecutorStatus) string { + res, err := json.Marshal(s) + if err != nil { + panic(err) + } + return string(res) +} + +func recordHeartbeats(ctx context.Context, t *testing.T, executorStore store.Store, namespace string, executorIDs ...string) { + t.Helper() + + for _, executorID := range executorIDs { + require.NoError(t, executorStore.RecordHeartbeat(ctx, namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) + } +} + +func createStore(t *testing.T, tc *testhelper.StoreTestCluster) store.Store { + t.Helper() + store, err := NewStore(ExecutorStoreParams{ + Client: tc.Client, + Cfg: tc.LeaderCfg, + Lifecycle: fxtest.NewLifecycle(t), + Logger: testlogger.New(t), + }) + require.NoError(t, err) + return store +} diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go new file mode 100644 index 00000000000..31ae6dfdfaa --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go @@ -0,0 +1,154 @@ +package shardcache + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/service/sharddistributor/store" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" +) + +type 
namespaceShardToExecutor struct { + sync.RWMutex + + shardToExecutor map[string]string + executorRevision map[string]int64 + namespace string + etcdPrefix string + changeUpdateChannel clientv3.WatchChan + stopCh chan struct{} + logger log.Logger + client *clientv3.Client +} + +func newNamespaceShardToExecutor(etcdPrefix, namespace string, client *clientv3.Client, stopCh chan struct{}, logger log.Logger) (*namespaceShardToExecutor, error) { + // Start listening + watchPrefix := etcdkeys.BuildExecutorPrefix(etcdPrefix, namespace) + watchChan := client.Watch(context.Background(), watchPrefix, clientv3.WithPrefix()) + + return &namespaceShardToExecutor{ + shardToExecutor: make(map[string]string), + executorRevision: make(map[string]int64), + namespace: namespace, + etcdPrefix: etcdPrefix, + changeUpdateChannel: watchChan, + stopCh: stopCh, + logger: logger, + client: client, + }, nil +} + +func (n *namespaceShardToExecutor) Start(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + n.nameSpaceRefreashLoop() + }() +} + +func (n *namespaceShardToExecutor) GetShardOwner(ctx context.Context, shardID string) (string, error) { + n.RLock() + owner, ok := n.shardToExecutor[shardID] + n.RUnlock() + + if ok { + return owner, nil + } + + // Force refresh the cache + err := n.refresh(ctx) + if err != nil { + return "", fmt.Errorf("refresh for namespace %s: %w", n.namespace, err) + } + + // Check the cache again after refresh + n.RLock() + owner, ok = n.shardToExecutor[shardID] + n.RUnlock() + if ok { + return owner, nil + } + + return "", store.ErrShardNotFound +} + +func (n *namespaceShardToExecutor) GetExecutorModRevisionCmp() ([]clientv3.Cmp, error) { + n.RLock() + defer n.RUnlock() + comparisons := []clientv3.Cmp{} + for executor, revision := range n.executorRevision { + executorAssignedStateKey, err := etcdkeys.BuildExecutorKey(n.etcdPrefix, n.namespace, executor, etcdkeys.ExecutorAssignedStateKey) + if err != nil { + return nil, fmt.Errorf("build executor assigned state key: %w", err) + } + comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorAssignedStateKey), "=", revision)) + } + + return comparisons, nil +} + +func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() { + for { + select { + case <-n.stopCh: + return + case watchResp := <-n.changeUpdateChannel: + shouldRefresh := false + for _, event := range watchResp.Events { + _, keyType, keyErr := etcdkeys.ParseExecutorKey(n.etcdPrefix, n.namespace, string(event.Kv.Key)) + if keyErr == nil && keyType == etcdkeys.ExecutorAssignedStateKey { + shouldRefresh = true + break + } + } + if shouldRefresh { + err := n.refresh(context.Background()) + if err != nil { + n.logger.Error("failed to refresh namespace shard to executor", tag.ShardNamespace(n.namespace), tag.Error(err)) + } + } + + } + } +} + +func (n *namespaceShardToExecutor) refresh(ctx context.Context) error { + + executorPrefix := etcdkeys.BuildExecutorPrefix(n.etcdPrefix, n.namespace) + + resp, err := n.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) + if err != nil { + return fmt.Errorf("get executor prefix for namespace %s: %w", n.namespace, err) + } + + n.Lock() + defer n.Unlock() + // Clear the cache, so we don't have any stale data + n.shardToExecutor = make(map[string]string) + n.executorRevision = make(map[string]int64) + + for _, kv := range resp.Kvs { + executorID, keyType, keyErr := etcdkeys.ParseExecutorKey(n.etcdPrefix, n.namespace, string(kv.Key)) + if keyErr != nil || keyType != etcdkeys.ExecutorAssignedStateKey { + continue + 
} + + var assignedState store.AssignedState + err = json.Unmarshal(kv.Value, &assignedState) + if err != nil { + return fmt.Errorf("unmarshal assigned state: %w", err) + } + for shardID := range assignedState.AssignedShards { + n.shardToExecutor[shardID] = executorID + n.executorRevision[executorID] = kv.ModRevision + } + } + + return nil +} diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go new file mode 100644 index 00000000000..3ab0e6c2224 --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go @@ -0,0 +1,74 @@ +package shardcache + +import ( + "context" + "encoding/json" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/store" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" +) + +func TestNamespaceShardToExecutor_Lifecycle(t *testing.T) { + testCluster := testhelper.SetupStoreTestCluster(t) + + logger := testlogger.New(t) + + stopCh := make(chan struct{}) + + // Setup: Create an assigned state for the executor + assignedState := &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard-1": {Status: types.AssignmentStatusREADY}, + }, + } + assignedStateJSON, err := json.Marshal(assignedState) + require.NoError(t, err) + + executor1AssignedStateKey, err := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, "test-ns", "executor-1", etcdkeys.ExecutorAssignedStateKey) + require.NoError(t, err) + testCluster.Client.Put(context.Background(), executor1AssignedStateKey, string(assignedStateJSON)) + + // First call should get the state and return the owner as executor-1 + namespaceShardToExecutor, err := newNamespaceShardToExecutor(testCluster.EtcdPrefix, "test-ns", testCluster.Client, stopCh, logger) + assert.NoError(t, err) + namespaceShardToExecutor.Start(&sync.WaitGroup{}) + + owner, err := namespaceShardToExecutor.GetShardOwner(context.Background(), "shard-1") + assert.NoError(t, err) + assert.Equal(t, "executor-1", owner) + + // Check the cache is populated + _, ok := namespaceShardToExecutor.executorRevision["executor-1"] + assert.True(t, ok) + assert.Equal(t, "executor-1", namespaceShardToExecutor.shardToExecutor["shard-1"]) + + // Send a message on the channel to trigger a refresh + // Change the owner to executor-2 + delete(assignedState.AssignedShards, "shard-1") + assignedState.AssignedShards["shard-2"] = &types.ShardAssignment{Status: types.AssignmentStatusREADY} + assignedStateJSON, err = json.Marshal(assignedState) + require.NoError(t, err) + + executor2AssignedStateKey, err := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, "test-ns", "executor-2", etcdkeys.ExecutorAssignedStateKey) + require.NoError(t, err) + testCluster.Client.Put(context.Background(), executor2AssignedStateKey, string(assignedStateJSON)) + + // Sleep a bit to allow the cache to refresh + time.Sleep(100 * time.Millisecond) + + // Check that executor-2 and shard-2 is in the cache + _, ok = namespaceShardToExecutor.executorRevision["executor-2"] + assert.True(t, ok) + assert.Equal(t, "executor-2", namespaceShardToExecutor.shardToExecutor["shard-2"]) + + close(stopCh) +} diff --git 
a/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go new file mode 100644 index 00000000000..c10e98b007a --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache.go @@ -0,0 +1,84 @@ +package shardcache + +import ( + "context" + "fmt" + "sync" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/uber/cadence/common/log" +) + +type NamespaceToShards map[string]*namespaceShardToExecutor +type ShardToExecutorCache struct { + sync.RWMutex + namespaceToShards NamespaceToShards + client *clientv3.Client + stopC chan struct{} + logger log.Logger + prefix string + wg sync.WaitGroup +} + +func NewShardToExecutorCache( + prefix string, + client *clientv3.Client, + logger log.Logger, +) *ShardToExecutorCache { + shardCache := &ShardToExecutorCache{ + namespaceToShards: make(NamespaceToShards), + stopC: make(chan struct{}), + logger: logger, + prefix: prefix, + client: client, + wg: sync.WaitGroup{}, + } + + return shardCache +} + +func (s *ShardToExecutorCache) Start() {} + +func (s *ShardToExecutorCache) Stop() { + close(s.stopC) + s.wg.Wait() +} + +func (s *ShardToExecutorCache) GetShardOwner(ctx context.Context, namespace, shardID string) (string, error) { + namespaceShardToExecutor, err := s.getNamespaceShardToExecutor(namespace) + if err != nil { + return "", fmt.Errorf("get namespace shard to executor: %w", err) + } + return namespaceShardToExecutor.GetShardOwner(ctx, shardID) +} + +func (s *ShardToExecutorCache) GetExecutorModRevisionCmp(namespace string) ([]clientv3.Cmp, error) { + namespaceShardToExecutor, err := s.getNamespaceShardToExecutor(namespace) + if err != nil { + return nil, fmt.Errorf("get namespace shard to executor: %w", err) + } + return namespaceShardToExecutor.GetExecutorModRevisionCmp() +} + +func (s *ShardToExecutorCache) getNamespaceShardToExecutor(namespace string) (*namespaceShardToExecutor, error) { + s.RLock() + namespaceShardToExecutor, ok := s.namespaceToShards[namespace] + s.RUnlock() + + if ok { + return namespaceShardToExecutor, nil + } + + s.Lock() + defer s.Unlock() + + namespaceShardToExecutor, err := newNamespaceShardToExecutor(s.prefix, namespace, s.client, s.stopC, s.logger) + if err != nil { + return nil, fmt.Errorf("new namespace shard to executor: %w", err) + } + namespaceShardToExecutor.Start(&s.wg) + + s.namespaceToShards[namespace] = namespaceShardToExecutor + return namespaceShardToExecutor, nil +} diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go new file mode 100644 index 00000000000..4db6d43e2b2 --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/shardcache_test.go @@ -0,0 +1,66 @@ +package shardcache + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/store" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" +) + +func TestNewShardToExecutorCache(t *testing.T) { + logger := testlogger.New(t) + + client := &clientv3.Client{} + cache := NewShardToExecutorCache("some-prefix", client, logger) + + 
assert.NotNil(t, cache) + + assert.NotNil(t, cache.namespaceToShards) + assert.NotNil(t, cache.stopC) + + assert.Equal(t, logger, cache.logger) + assert.Equal(t, "some-prefix", cache.prefix) + assert.Equal(t, client, cache.client) +} + +func TestShardExecutorCache(t *testing.T) { + testCluster := testhelper.SetupStoreTestCluster(t) + + logger := testlogger.New(t) + + // Setup: Create an assigned state for the executor + assignedState := &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard-1": {Status: types.AssignmentStatusREADY}, + }, + } + assignedStateJSON, err := json.Marshal(assignedState) + require.NoError(t, err) + + executorAssignedStateKey, err := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, "test-ns", "executor-1", etcdkeys.ExecutorAssignedStateKey) + require.NoError(t, err) + testCluster.Client.Put(context.Background(), executorAssignedStateKey, string(assignedStateJSON)) + + cache := NewShardToExecutorCache(testCluster.EtcdPrefix, testCluster.Client, logger) + + cache.Start() + defer cache.Stop() + + // This will read the namespace from the store as the cache is empty + owner, err := cache.GetShardOwner(context.Background(), "test-ns", "shard-1") + assert.NoError(t, err) + assert.Equal(t, "executor-1", owner) + + // Check the cache is populated + assert.Greater(t, cache.namespaceToShards["test-ns"].executorRevision["executor-1"], int64(0)) + assert.Equal(t, "executor-1", cache.namespaceToShards["test-ns"].shardToExecutor["shard-1"]) +} diff --git a/service/sharddistributor/store/etcd/go.mod b/service/sharddistributor/store/etcd/go.mod index fd0fb24493d..3893a026804 100644 --- a/service/sharddistributor/store/etcd/go.mod +++ b/service/sharddistributor/store/etcd/go.mod @@ -11,6 +11,8 @@ require ( github.com/uber/cadence v0.0.0-00010101000000-000000000000 go.etcd.io/etcd/client/v3 v3.5.5 go.uber.org/fx v1.23.0 + go.uber.org/goleak v1.2.0 + go.uber.org/mock v0.5.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -34,6 +36,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.0 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gofuzz v1.0.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect @@ -66,7 +69,6 @@ require ( go.uber.org/cadence v0.19.0 // indirect go.uber.org/config v1.4.0 // indirect go.uber.org/dig v1.18.0 // indirect - go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.10.0 // indirect go.uber.org/net/metrics v1.3.0 // indirect go.uber.org/thriftrw v1.29.2 // indirect diff --git a/service/sharddistributor/store/etcd/etcdleaderstore.go b/service/sharddistributor/store/etcd/leaderstore/etcdleaderstore.go similarity index 94% rename from service/sharddistributor/store/etcd/etcdleaderstore.go rename to service/sharddistributor/store/etcd/leaderstore/etcdleaderstore.go index 63b93ef9bd4..12211929591 100644 --- a/service/sharddistributor/store/etcd/etcdleaderstore.go +++ b/service/sharddistributor/store/etcd/leaderstore/etcdleaderstore.go @@ -1,4 +1,4 @@ -package etcd +package leaderstore import ( "context" @@ -9,6 +9,7 @@ import ( "go.etcd.io/etcd/client/v3/concurrency" "go.uber.org/fx" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" ) @@ -18,15 +19,6 @@ type LeaderStore struct { electionConfig etcdCfg } -type LeaderStoreParams struct { - fx.In - - // Client could be provided 
externally. - Client *clientv3.Client `optional:"true"` - Cfg config.ShardDistribution - Lifecycle fx.Lifecycle -} - type etcdCfg struct { Endpoints []string `yaml:"endpoints"` DialTimeout time.Duration `yaml:"dialTimeout"` @@ -34,6 +26,16 @@ type etcdCfg struct { ElectionTTL time.Duration `yaml:"electionTTL"` } +// StoreParams defines the dependencies for the etcd store, for use with fx. +type StoreParams struct { + fx.In + + Client *clientv3.Client `optional:"true"` + Cfg config.ShardDistribution + Lifecycle fx.Lifecycle + Logger log.Logger +} + // NewLeaderStore creates a new leaderstore backed by ETCD. func NewLeaderStore(p StoreParams) (store.Elector, error) { if !p.Cfg.Enabled { diff --git a/service/sharddistributor/store/etcd/etcdleaderstore_test.go b/service/sharddistributor/store/etcd/leaderstore/etcdleaderstore_test.go similarity index 86% rename from service/sharddistributor/store/etcd/etcdleaderstore_test.go rename to service/sharddistributor/store/etcd/leaderstore/etcdleaderstore_test.go index d2628de91d6..aca73713ee3 100644 --- a/service/sharddistributor/store/etcd/etcdleaderstore_test.go +++ b/service/sharddistributor/store/etcd/leaderstore/etcdleaderstore_test.go @@ -1,26 +1,4 @@ -// The MIT License (MIT) - -// Copyright (c) 2017-2020 Uber Technologies Inc. - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. 
- -package etcd +package leaderstore import ( "context" diff --git a/service/sharddistributor/store/etcd/module.go b/service/sharddistributor/store/etcd/module.go new file mode 100644 index 00000000000..c5b1e147aab --- /dev/null +++ b/service/sharddistributor/store/etcd/module.go @@ -0,0 +1,13 @@ +package etcd + +import ( + "go.uber.org/fx" + + "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore" + "github.com/uber/cadence/service/sharddistributor/store/etcd/leaderstore" +) + +var Module = fx.Module("etcd", + fx.Provide(executorstore.NewStore), + fx.Provide(leaderstore.NewLeaderStore), +) diff --git a/service/sharddistributor/store/etcd/testhelper/testhelper.go b/service/sharddistributor/store/etcd/testhelper/testhelper.go new file mode 100644 index 00000000000..4c03a4850f3 --- /dev/null +++ b/service/sharddistributor/store/etcd/testhelper/testhelper.go @@ -0,0 +1,75 @@ +package testhelper + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "gopkg.in/yaml.v2" + + "github.com/uber/cadence/common/config" + shardDistributorCfg "github.com/uber/cadence/service/sharddistributor/config" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/testflags" +) + +type StoreTestCluster struct { + EtcdPrefix string + Namespace string + LeaderCfg shardDistributorCfg.ShardDistribution + Client *clientv3.Client +} + +func SetupStoreTestCluster(t *testing.T) *StoreTestCluster { + t.Helper() + flag.Parse() + testflags.RequireEtcd(t) + + namespace := fmt.Sprintf("ns-%s", strings.ToLower(t.Name())) + + endpoints := strings.Split(os.Getenv("ETCD_ENDPOINTS"), ",") + if len(endpoints) == 0 || endpoints[0] == "" { + endpoints = []string{"localhost:2379"} + } + t.Logf("ETCD endpoints: %v", endpoints) + + etcdPrefix := fmt.Sprintf("/test-shard-store/%s", t.Name()) + etcdConfigRaw := map[string]interface{}{ + "endpoints": endpoints, + "dialTimeout": "5s", + "prefix": etcdPrefix, + "electionTTL": "5s", // Needed for leader config part + } + + yamlCfg, err := yaml.Marshal(etcdConfigRaw) + require.NoError(t, err) + var yamlNode *config.YamlNode + err = yaml.Unmarshal(yamlCfg, &yamlNode) + require.NoError(t, err) + + leaderCfg := shardDistributorCfg.ShardDistribution{ + Enabled: true, + Store: shardDistributorCfg.Store{StorageParams: yamlNode}, + LeaderStore: shardDistributorCfg.Store{StorageParams: yamlNode}, + } + + client, err := clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) + require.NoError(t, err) + t.Cleanup(func() { client.Close() }) + + _, err = client.Delete(context.Background(), etcdkeys.BuildNamespacePrefix(etcdPrefix, namespace), clientv3.WithPrefix()) + require.NoError(t, err) + + return &StoreTestCluster{ + Namespace: namespace, + EtcdPrefix: etcdPrefix, + LeaderCfg: leaderCfg, + Client: client, + } +} diff --git a/service/sharddistributor/store/state.go b/service/sharddistributor/store/state.go index 132c66f4eca..2020df0988d 100644 --- a/service/sharddistributor/store/state.go +++ b/service/sharddistributor/store/state.go @@ -13,16 +13,15 @@ type HeartbeatState struct { type AssignedState struct { AssignedShards map[string]*types.ShardAssignment `json:"assigned_shards"` // What we assigned LastUpdated int64 `json:"last_updated"` + ModRevision int64 `json:"mod_revision"` } type NamespaceState struct { Executors map[string]HeartbeatState - Shards map[string]ShardState ShardAssignments 
map[string]AssignedState GlobalRevision int64 } type ShardState struct { ExecutorID string - Revision int64 } diff --git a/service/sharddistributor/store/store.go b/service/sharddistributor/store/store.go index d14fcb90d84..31adffa58a1 100644 --- a/service/sharddistributor/store/store.go +++ b/service/sharddistributor/store/store.go @@ -22,6 +22,15 @@ var ( ErrExecutorNotRunning = fmt.Errorf("executor not running") ) +type ErrShardAlreadyAssigned struct { + ShardID string + AssignedTo string +} + +func (e *ErrShardAlreadyAssigned) Error() string { + return fmt.Sprintf("shard %s is already assigned to %s", e.ShardID, e.AssignedTo) +} + // Txn represents a generic, backend-agnostic transaction. // It is used as a vehicle for the GuardFunc to operate on. type Txn interface{} @@ -43,10 +52,6 @@ func NopGuard() GuardFunc { type AssignShardsRequest struct { // NewState is the new state of the namespace, containing the new assignments of shards to executors. NewState *NamespaceState - - // ShardsToDelete is a map of shards to delete. These shards are not present in the NewState, as they - // should be deleted, so we need to pass them explicitly. - ShardsToDelete map[string]ShardState } // Store is a composite interface that combines all storage capabilities.
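Because ErrShardAlreadyAssigned is a struct error that carries the current owner, callers match it with errors.As rather than errors.Is, as the tests above do with assert.ErrorAs. A hedged usage sketch (handleAssign and the example package are illustrative; AssignShard, ErrShardAlreadyAssigned, and ErrExecutorNotFound are taken from this change):

package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/uber/cadence/service/sharddistributor/store"
)

// handleAssign reacts to the error shapes AssignShard can now return:
// a typed "already assigned" error carrying the current owner, the
// ErrExecutorNotFound sentinel, or any other failure.
func handleAssign(ctx context.Context, s store.Store, namespace, shardID, executorID string) error {
	err := s.AssignShard(ctx, namespace, shardID, executorID)
	if err == nil {
		return nil
	}
	var alreadyAssigned *store.ErrShardAlreadyAssigned
	switch {
	case errors.As(err, &alreadyAssigned):
		// The shard already has an owner; callers can treat this as a no-op.
		fmt.Printf("shard %s is already owned by %s\n", alreadyAssigned.ShardID, alreadyAssigned.AssignedTo)
		return nil
	case errors.Is(err, store.ErrExecutorNotFound):
		return fmt.Errorf("executor %s is not registered: %w", executorID, err)
	default:
		return fmt.Errorf("assign shard %s: %w", shardID, err)
	}
}

Carrying the owner in the error, instead of only wrapping ErrVersionConflict as the removed code did, lets callers log or redirect to the current owner without an extra GetShardOwner round trip.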