diff --git a/pkg/schedule/affinity/affinity_test.go b/pkg/schedule/affinity/affinity_test.go index b20d71d9e4d..316d0917622 100644 --- a/pkg/schedule/affinity/affinity_test.go +++ b/pkg/schedule/affinity/affinity_test.go @@ -33,7 +33,6 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.LeakOptions...) } -//nolint:unused func getGroupForTest(re *require.Assertions, m *Manager, id string) *runtimeGroupInfo { m.RLock() defer m.RUnlock() @@ -45,7 +44,6 @@ func getGroupForTest(re *require.Assertions, m *Manager, id string) *runtimeGrou return &newGroup } -//nolint:unused func createGroupForTest(re *require.Assertions, m *Manager, id string, rangeCount int) []keyutil.KeyRange { gkr := GroupKeyRanges{ KeyRanges: make([]keyutil.KeyRange, rangeCount), @@ -61,7 +59,6 @@ func createGroupForTest(re *require.Assertions, m *Manager, id string, rangeCoun return gkr.KeyRanges } -//nolint:unused func testCacheStale(re *require.Assertions, m *Manager, region *core.RegionInfo) { cache, group := m.getCache(region) if cache != nil && group != nil { @@ -71,8 +68,6 @@ func testCacheStale(re *require.Assertions, m *Manager, region *core.RegionInfo) // generateRegionForTest generates a test Region from the given information, // where voterStoreIDs[0] is used as the leaderStoreID. -// -//nolint:unparam func generateRegionForTest(id uint64, voterStoreIDs []uint64, keyRange keyutil.KeyRange) *core.RegionInfo { peers := make([]*metapb.Peer, len(voterStoreIDs)) for i, storeID := range voterStoreIDs { diff --git a/pkg/schedule/affinity/group.go b/pkg/schedule/affinity/group.go index 46f3b3f9737..6ae4b9c43dd 100644 --- a/pkg/schedule/affinity/group.go +++ b/pkg/schedule/affinity/group.go @@ -65,66 +65,6 @@ func (a groupAvailability) String() string { } } -// storeCondition is an enum for store conditions. Valid values are the store-prefixed enum constants, -// which are split into three groups separated by degradedBoundary. 
-type storeCondition int - -const ( - storeAvailable storeCondition = iota - - // All values greater than storeAvailable and less than degradedBoundary will trigger groupDegraded. - storeEvictLeader - storeDisconnected - storePreparing - storeLowSpace - degradedBoundary - - // All values greater than degradedBoundary will trigger groupExpired. - storeDown - storeRemovingOrRemoved -) - -func (c storeCondition) String() string { - switch c { - case storeAvailable: - return "available" - case storeEvictLeader: - return "evicted" - case storeDisconnected: - return "disconnected" - case storePreparing: - return "preparing" - case storeLowSpace: - return "low-space" - case storeDown: - return "down" - case storeRemovingOrRemoved: - return "removing-or-removed" - default: - return "unknown" - } -} - -func (c storeCondition) groupAvailability() groupAvailability { - switch { - case c == storeAvailable: - return groupAvailable - case c <= degradedBoundary: - return groupDegraded - default: - return groupExpired - } -} - -func (c storeCondition) affectsLeaderOnly() bool { - switch c { - case storeEvictLeader: - return true - default: - return false - } -} - // Phase is a status intended for API display type Phase string @@ -306,15 +246,16 @@ func (g *runtimeGroupInfo) SetAvailability(newAvailability groupAvailability) { g.availability = groupExpired } // Update availability - if newAvailability == groupDegraded { + switch newAvailability { + case groupAvailable, groupExpired: + g.availability = newAvailability + case groupDegraded: // Only set the expiration time when transitioning from groupAvailable to groupDegraded. // Do nothing if the original availability is already groupDegraded or groupExpired. 
if g.availability == groupAvailable { g.availability = groupDegraded g.degradedExpiredAt = newDegradedExpiredAtFromNow() } - } else { - g.availability = newAvailability } } diff --git a/pkg/schedule/affinity/manager.go b/pkg/schedule/affinity/manager.go index e659f60c544..507ce57a6b7 100644 --- a/pkg/schedule/affinity/manager.go +++ b/pkg/schedule/affinity/manager.go @@ -162,7 +162,6 @@ func (m *Manager) initGroupLocked(group *Group) { } } -//nolint:unused func (m *Manager) noGroupsExist(groups []*Group) error { m.RLock() defer m.RUnlock() @@ -174,7 +173,6 @@ func (m *Manager) noGroupsExist(groups []*Group) error { return nil } -//nolint:unused func (m *Manager) allGroupsExist(groupIDs []string) error { m.RLock() defer m.RUnlock() @@ -186,7 +184,6 @@ func (m *Manager) allGroupsExist(groupIDs []string) error { return nil } -//nolint:unused func (m *Manager) createGroups(groups []*Group, labelRules []*labeler.LabelRule) { m.Lock() defer m.Unlock() @@ -202,7 +199,6 @@ func (m *Manager) resetCountLocked(groupInfo *runtimeGroupInfo) { groupInfo.AffinityVer++ } -//nolint:unused func (m *Manager) updateGroupPeers(groupID string, leaderStoreID uint64, voterStoreIDs []uint64) (*GroupState, error) { m.Lock() defer m.Unlock() @@ -276,7 +272,6 @@ func (m *Manager) updateGroupLabelRuleLockedWithCount(groupID string, labelRule groupInfo.RangeCount = rangeCount } -//nolint:unused func (m *Manager) updateGroupLabelRules(labels map[string]*labeler.LabelRule, needClear bool) { m.Lock() defer m.Unlock() @@ -303,7 +298,6 @@ func (m *Manager) deleteGroupLocked(groupID string) { delete(m.groups, groupID) } -//nolint:unused func (m *Manager) deleteGroups(groupIDs []string) { m.Lock() defer m.Unlock() diff --git a/pkg/schedule/affinity/manager_test.go b/pkg/schedule/affinity/manager_test.go new file mode 100644 index 00000000000..fdadae3e9c1 --- /dev/null +++ b/pkg/schedule/affinity/manager_test.go @@ -0,0 +1,603 @@ +// Copyright 2025 TiKV Project Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package affinity + +import ( + "context" + "fmt" + "slices" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/pingcap/kvproto/pkg/metapb" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockconfig" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/keyutil" +) + +// TestGetRegionAffinityGroupState tests the GetRegionAffinityGroupState method of Manager. +func TestGetRegionAffinityGroupState(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := uint64(1); i <= 4; i++ { + storeInfo := core.NewStoreInfo(&metapb.Store{Id: i, Address: "test"}) + storeInfo = storeInfo.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(storeInfo) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create affinity group with 6 key ranges for testing + ranges := createGroupForTest(re, manager, "test_group", 6) + + // Setup: voterStoreIDs will be sorted. 
+ _, err = manager.UpdateAffinityGroupPeers("test_group", 1, []uint64{3, 2, 1}) + re.NoError(err) + groupInfo := getGroupForTest(re, manager, "test_group") + re.True(slices.Equal([]uint64{1, 2, 3}, groupInfo.VoterStoreIDs)) + + // Test the positive case first + t.Run("region conforming to affinity", func(t *testing.T) { + re := require.New(t) + region := generateRegionForTest(1, []uint64{1, 2, 3}, ranges[0]) + _, isAffinity := manager.GetRegionAffinityGroupState(region) + re.True(isAffinity) + }) + + // Test negative cases: all should return false + testCases := []struct { + name string + regionID uint64 + peers []uint64 + keyRange keyutil.KeyRange + withoutLeader bool + setup func() + }{ + { + name: "region not in any affinity group", + regionID: 1, + peers: []uint64{1, 2, 3}, + keyRange: nonOverlappingRange, + }, + { + name: "region with wrong leader", + regionID: 1, + peers: []uint64{2, 1, 3}, + keyRange: ranges[1], + }, + { + name: "region with wrong voter stores", + regionID: 3, + peers: []uint64{1, 2, 4}, + keyRange: ranges[2], + }, + { + name: "region with different number of voters", + regionID: 4, + peers: []uint64{1, 2}, + keyRange: ranges[3], + }, + { + name: "region without leader", + regionID: 5, + peers: []uint64{1, 2, 3}, + keyRange: ranges[4], + withoutLeader: true, + }, + { + name: "group not in effect", + regionID: 6, + peers: []uint64{1, 2, 3}, + keyRange: ranges[5], + setup: func() { + manager.ExpireAffinityGroup("test_group") + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + re := require.New(t) + if tc.setup != nil { + tc.setup() + } + + region := generateRegionForTest(tc.regionID, tc.peers, tc.keyRange) + if tc.withoutLeader { + region = region.Clone(core.WithLeader(nil)) + } + + _, isAffinity := manager.GetRegionAffinityGroupState(region) + re.False(isAffinity) + }) + } +} + +// TestBasicGroupOperations tests basic group CRUD operations +func TestBasicGroupOperations(t *testing.T) { + re := 
require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + store1 := core.NewStoreInfo(&metapb.Store{Id: 1, Address: "test1"}) + store1 = store1.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(store1) + + conf := mockconfig.NewTestOptions() + + // Create region labeler + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create a group + err = manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: "group1"}}) + re.NoError(err) + _, err = manager.UpdateAffinityGroupPeers("group1", 1, []uint64{1}) + re.NoError(err) + re.True(manager.IsGroupExist("group1")) + + // Delete the group (no key ranges, so force=false should work) + err = manager.DeleteAffinityGroups([]string{"group1"}, false) + re.NoError(err) + re.False(manager.IsGroupExist("group1")) +} + +// TestRegionCountStaleCache documents that RegionCount counts stale cache entries when group changes. 
+func TestRegionCountStaleCache(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := 1; i < 7; i++ { + storeInfos.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i), Address: fmt.Sprintf("s%d", i)})) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + ranges := createGroupForTest(re, manager, "g", 2) + _, err = manager.UpdateAffinityGroupPeers("g", 1, []uint64{1, 2, 3}) + re.NoError(err) + region := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0]) + _, isAffinity := manager.GetRegionAffinityGroupState(region) + re.True(isAffinity) + groupInfo := getGroupForTest(re, manager, "g") + re.Equal(1, groupInfo.AffinityRegionCount) + re.Len(groupInfo.Regions, 1) + + // Change peers, which bumps AffinityVer and invalidates affinity for the cached region. + _, err = manager.UpdateAffinityGroupPeers("g", 4, []uint64{4, 5, 6}) + re.NoError(err) + group2 := manager.GetAffinityGroupState("g") + re.NotNil(group2) + re.Zero(group2.AffinityRegionCount) + testCacheStale(re, manager, region) + + // Remove key ranges, which bumps AffinityVer and invalidates affinity for the cached region. 
+ region = generateRegionForTest(200, []uint64{4, 5, 6}, ranges[0]) + _, isAffinity = manager.GetRegionAffinityGroupState(region) + re.True(isAffinity) + groupInfo = getGroupForTest(re, manager, "g") + re.Equal(1, groupInfo.AffinityRegionCount) + re.Len(groupInfo.Regions, 2) + re.NoError(manager.UpdateAffinityGroupKeyRanges(nil, []GroupKeyRanges{{GroupID: "g", KeyRanges: ranges[1:]}})) + groupInfo = getGroupForTest(re, manager, "g") + re.Equal(0, groupInfo.AffinityRegionCount) + re.Empty(groupInfo.Regions) + + // Add key ranges, which bumps AffinityVer and invalidates affinity for the cached region. + _, isAffinity = manager.GetRegionAffinityGroupState(region) + re.True(isAffinity) + groupInfo = getGroupForTest(re, manager, "g") + re.Equal(1, groupInfo.AffinityRegionCount) + re.Len(groupInfo.Regions, 1) + re.NoError(manager.UpdateAffinityGroupKeyRanges([]GroupKeyRanges{{GroupID: "g", KeyRanges: ranges[1:]}}, nil)) + groupInfo = getGroupForTest(re, manager, "g") + re.Equal(0, groupInfo.AffinityRegionCount) + re.Len(groupInfo.Regions, 1) +} + +// TestDeleteGroupClearsCache verifies that deleting a group clears all related region caches. 
+func TestDeleteGroupClearsCache(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := 1; i <= 3; i++ { + storeInfos.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i), Address: fmt.Sprintf("s%d", i)})) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create a group and add regions + ranges := createGroupForTest(re, manager, "test-group", 1) + _, err = manager.UpdateAffinityGroupPeers("test-group", 1, []uint64{1, 2, 3}) + re.NoError(err) + + // Create and associate multiple regions + region1 := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0]) + region2 := generateRegionForTest(200, []uint64{1, 2, 3}, ranges[0]) + + // Trigger cache population + _, isAffinity1 := manager.GetRegionAffinityGroupState(region1) + re.True(isAffinity1) + _, isAffinity2 := manager.GetRegionAffinityGroupState(region2) + re.True(isAffinity2) + + // Verify regions are in cache + groupInfo := getGroupForTest(re, manager, "test-group") + re.Len(groupInfo.Regions, 2) + re.Equal(2, groupInfo.AffinityRegionCount) + + // Verify regions in global cache + manager.RLock() + _, exists1 := manager.regions[100] + _, exists2 := manager.regions[200] + manager.RUnlock() + re.True(exists1) + re.True(exists2) + + // Delete the group + re.Error(manager.DeleteAffinityGroups([]string{"test-group"}, false)) + re.True(manager.IsGroupExist("test-group")) + re.NoError(manager.DeleteAffinityGroups([]string{"test-group"}, true)) + re.False(manager.IsGroupExist("test-group")) + + // Verify all regions are cleared from global cache + manager.RLock() + _, exists1After := manager.regions[100] + _, exists2After := manager.regions[200] + globalAffinityCount := 
manager.affinityRegionCount + manager.RUnlock() + re.False(exists1After, "region 100 should be removed from global cache") + re.False(exists2After, "region 200 should be removed from global cache") + re.Zero(globalAffinityCount, "global affinity region count should be 0") +} + +// TestAvailabilityChangeRegionCount verifies that changing group availability clears region cache. +func TestAvailabilityChangeRegionCount(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := 1; i <= 3; i++ { + storeInfo := core.NewStoreInfo(&metapb.Store{Id: uint64(i), Address: fmt.Sprintf("s%d", i), NodeState: metapb.NodeState_Serving}) + storeInfo = storeInfo.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(storeInfo) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create a group + ranges := createGroupForTest(re, manager, "availability-test", 1) + _, err = manager.UpdateAffinityGroupPeers("availability-test", 1, []uint64{1, 2, 3}) + re.NoError(err) + + // Add regions to cache + region := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0]) + _, isAffinity := manager.GetRegionAffinityGroupState(region) + re.True(isAffinity) + + // Verify region is cached + groupState1 := manager.GetAffinityGroupState("availability-test") + re.NotNil(groupState1) + re.Equal(1, groupState1.RegionCount) + re.Equal(1, groupState1.AffinityRegionCount) + + // Make store 2 unhealthy to trigger availability change to degraded + store2 := storeInfos.GetStore(2) + store2Down := store2.Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Hour))) + storeInfos.PutStore(store2Down) + + // Trigger availability check + manager.checkStoresAvailability() + + 
// Verify group state changed + groupInfo := getGroupForTest(re, manager, "availability-test") + re.False(groupInfo.IsAffinitySchedulingEnabled()) + + // Verify cache is cleared + groupState2 := manager.GetAffinityGroupState("availability-test") + re.NotNil(groupState2) + testCacheStale(re, manager, region) + re.Zero(groupState2.AffinityRegionCount, "AffinityRegionCount should be 0 after availability change") + + // Verify global cache is also cleared + manager.RLock() + globalAffinityCount := manager.affinityRegionCount + manager.RUnlock() + testCacheStale(re, manager, region) + re.Zero(globalAffinityCount, "global affinity count should be 0") +} + +// TestInvalidCacheMultipleTimes verifies that InvalidCache can be called multiple times safely. +func TestInvalidCacheMultipleTimes(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := 1; i <= 3; i++ { + storeInfos.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i), Address: fmt.Sprintf("s%d", i)})) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create a group + ranges := createGroupForTest(re, manager, "invalid-test", 1) + _, err = manager.UpdateAffinityGroupPeers("invalid-test", 1, []uint64{1, 2, 3}) + re.NoError(err) + + // Add region + region := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0]) + _, isAffinity := manager.GetRegionAffinityGroupState(region) + re.True(isAffinity) + + // Verify region is in cache + groupState := manager.GetAffinityGroupState("invalid-test") + re.NotNil(groupState) + re.Equal(1, groupState.RegionCount) + re.Equal(1, groupState.AffinityRegionCount) + + // Invalidate cache first time + manager.InvalidCache(100) + + // Verify cache 
is cleared + groupState2 := manager.GetAffinityGroupState("invalid-test") + re.NotNil(groupState2) + re.Zero(groupState2.RegionCount) + re.Zero(groupState2.AffinityRegionCount) + + manager.RLock() + _, exists := manager.regions[100] + manager.RUnlock() + re.False(exists) + + // Invalidate cache second time - should not panic or error + manager.InvalidCache(100) + + // Verify still cleared + groupState3 := manager.GetAffinityGroupState("invalid-test") + re.NotNil(groupState3) + re.Zero(groupState3.RegionCount) + re.Zero(groupState3.AffinityRegionCount) + + // Invalidate non-existent region - should not panic + manager.InvalidCache(999) +} + +// TestConcurrentOperations verifies concurrent operations don't cause race conditions. +// Run with: go test -race +func TestConcurrentOperations(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := 1; i <= 5; i++ { + storeInfo := core.NewStoreInfo(&metapb.Store{Id: uint64(i), Address: fmt.Sprintf("s%d", i), NodeState: metapb.NodeState_Serving}) + storeInfo = storeInfo.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(storeInfo) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create initial groups + groups := make([][]keyutil.KeyRange, 3) + for i := range 3 { + groupID := fmt.Sprintf("concurrent-group-%d", i) + groups[i] = createGroupForTest(re, manager, groupID, 10) + _, err = manager.UpdateAffinityGroupPeers(groupID, 1, []uint64{1, 2, 3}) + re.NoError(err) + } + + // Create test regions + regions := make([]*core.RegionInfo, 10) + for i := range uint64(10) { + regions[i] = generateRegionForTest(100+i, []uint64{1, 2, 3}, groups[i%3][i]) + } + + // Run concurrent 
operations + var wg sync.WaitGroup + errChan := make(chan error, 100) + + // Goroutine 1: Read operations + for i := range 10 { + wg.Add(1) + go func(idx int) { + defer wg.Done() + for range 50 { + region := regions[idx%len(regions)] + _, _ = manager.GetRegionAffinityGroupState(region) + groupID := fmt.Sprintf("concurrent-group-%d", idx%3) + _ = manager.GetAffinityGroupState(groupID) + _ = manager.IsGroupExist(groupID) + } + }(i) + } + + // Goroutine 2: Update peers + for i := range 3 { + wg.Add(1) + go func(idx int) { + defer wg.Done() + groupID := fmt.Sprintf("concurrent-group-%d", idx) + for j := range 10 { + _, err := manager.UpdateAffinityGroupPeers(groupID, uint64((j%3)+1), []uint64{1, 2, 3}) + if err != nil { + errChan <- err + return + } + time.Sleep(time.Millisecond) + } + }(i) + } + + // Goroutine 3: InvalidCache operations + for i := range 5 { + wg.Add(1) + go func(idx int) { + defer wg.Done() + for j := range 30 { + regionID := uint64(100 + (idx*2+j)%10) + manager.InvalidCache(regionID) + time.Sleep(time.Millisecond) + } + }(i) + } + + // Goroutine 4: Check availability + wg.Add(1) + go func() { + defer wg.Done() + for range 20 { + manager.checkStoresAvailability() + time.Sleep(2 * time.Millisecond) + } + }() + + // Wait for all goroutines + wg.Wait() + close(errChan) + + // Check for errors + for err := range errChan { + re.NoError(err, "concurrent operation failed") + } + + // Verify final state is consistent + for i := range 3 { + groupID := fmt.Sprintf("concurrent-group-%d", i) + re.True(manager.IsGroupExist(groupID)) + state := manager.GetAffinityGroupState(groupID) + re.NotNil(state) + } +} + +// TestDegradedExpiration verifies that a degraded group automatically expires after the configured duration. 
+func TestDegradedExpiration(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := 1; i <= 3; i++ { + storeInfo := core.NewStoreInfo(&metapb.Store{Id: uint64(i), Address: fmt.Sprintf("s%d", i), NodeState: metapb.NodeState_Serving}) + storeInfo = storeInfo.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(storeInfo) + } + + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create a healthy group + re.NoError(manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: "expiration-test"}})) + _, err = manager.UpdateAffinityGroupPeers("expiration-test", 1, []uint64{1, 2, 3}) + re.NoError(err) + + // Verify group is available + groupState := manager.GetAffinityGroupState("expiration-test") + re.NotNil(groupState) + re.True(groupState.AffinitySchedulingEnabled) + + // Make store 2 unhealthy to trigger degraded status + store2 := storeInfos.GetStore(2) + store2Down := store2.Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Minute))) + storeInfos.PutStore(store2Down) + manager.checkStoresAvailability() + + // Verify group became degraded + groupInfo := getGroupForTest(re, manager, "expiration-test") + re.Equal(groupDegraded, groupInfo.GetAvailability()) + re.False(groupInfo.IsAffinitySchedulingEnabled()) + + // Record the expiration time + manager.RLock() + expirationTime := groupInfo.degradedExpiredAt + manager.RUnlock() + + // Verify the expiration time is approximately 10 minutes (600 seconds) from now + expectedExpiration := uint64(time.Now().Unix()) + defaultDegradedExpirationSeconds + // Allow 5 seconds tolerance for test execution time + re.InDelta(expectedExpiration, expirationTime, 5) + + // Simulate time passing 
beyond expiration + manager.Lock() + groupInfo.degradedExpiredAt = uint64(time.Now().Add(-time.Hour).Unix()) + manager.Unlock() + + // Run availability check again + manager.checkStoresAvailability() + + // Verify group is now expired + re.True(groupInfo.IsExpired()) + re.Equal(groupExpired, groupInfo.GetAvailability()) + + // Verify scheduling is still disallowed + groupState2 := manager.GetAffinityGroupState("expiration-test") + re.NotNil(groupState2) + re.False(groupState2.AffinitySchedulingEnabled) +} diff --git a/pkg/schedule/affinity/metrics.go b/pkg/schedule/affinity/metrics.go index 84ca1ed4f78..d0523d11343 100644 --- a/pkg/schedule/affinity/metrics.go +++ b/pkg/schedule/affinity/metrics.go @@ -35,8 +35,4 @@ var ( func init() { prometheus.MustRegister(affinityStatusGauge) - // Touch metrics to avoid unused warnings before the policy wiring is added. - _ = groupCount - _ = regionCount - _ = affinityRegionCount } diff --git a/pkg/schedule/affinity/policy.go b/pkg/schedule/affinity/policy.go new file mode 100644 index 00000000000..27d88819318 --- /dev/null +++ b/pkg/schedule/affinity/policy.go @@ -0,0 +1,260 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package affinity + +import ( + "fmt" + "maps" + "time" + + "go.uber.org/zap" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/utils/logutil" +) + +const ( + // defaultAvailabilityCheckInterval is the default interval for checking store availability. + defaultAvailabilityCheckInterval = 10 * time.Second +) + +// storeCondition is an enum for store conditions. Valid values are the store-prefixed enum constants, +// which are split into three groups separated by degradedBoundary. +type storeCondition int + +const ( + storeAvailable storeCondition = iota + + // All values greater than storeAvailable and less than degradedBoundary will trigger groupDegraded. + storeEvictLeader + storeDisconnected + storePreparing + storeLowSpace + degradedBoundary + + // All values greater than degradedBoundary will trigger groupExpired. + storeDown + storeRemovingOrRemoved +) + +func (c storeCondition) String() string { + switch c { + case storeAvailable: + return "available" + case storeEvictLeader: + return "evicted" + case storeDisconnected: + return "disconnected" + case storePreparing: + return "preparing" + case storeLowSpace: + return "low-space" + case storeDown: + return "down" + case storeRemovingOrRemoved: + return "removing-or-removed" + default: + return "unknown" + } +} + +func (c storeCondition) groupAvailability() groupAvailability { + switch { + case c == storeAvailable: + return groupAvailable + case c <= degradedBoundary: + return groupDegraded + default: + return groupExpired + } +} + +func (c storeCondition) affectsLeaderOnly() bool { + switch c { + case storeEvictLeader: + return true + default: + return false + } +} + +// GetNewAvailability uses the given unavailableStores to compute a new groupAvailability. +// Note that this function does not update runtimeGroupInfo. 
+func (g *runtimeGroupInfo) GetNewAvailability(unavailableStores map[uint64]storeCondition) groupAvailability { + maxCondition := storeAvailable + for _, storeID := range g.VoterStoreIDs { + if condition, ok := unavailableStores[storeID]; ok && (!condition.affectsLeaderOnly() || storeID == g.LeaderStoreID) { + if maxCondition == storeAvailable || condition > maxCondition { + maxCondition = condition + } + } + } + return maxCondition.groupAvailability() +} + +// ObserveAvailableRegion observes available Region and collects information to update the Peer distribution within the Group. +func (m *Manager) ObserveAvailableRegion(region *core.RegionInfo, group *GroupState) { + // Use the peer distribution of the first observed available Region as the result. + // In the future, we may want to use a more sophisticated strategy rather than first-win. + if group == nil || group.AffinitySchedulingEnabled { + return + } + leaderStoreID := region.GetLeader().GetStoreId() + voterStoreIDs := make([]uint64, len(region.GetVoters())) + for i, voter := range region.GetVoters() { + voterStoreIDs[i] = voter.GetStoreId() + } + _, _ = m.updateAffinityGroupPeersWithAffinityVer(group.ID, group.affinityVer, leaderStoreID, voterStoreIDs) +} + +// startAvailabilityCheckLoop starts a goroutine to periodically check store availability and invalidate groups with unavailable stores. +// TODO: If critical operations are added, a graceful shutdown is required. 
+func (m *Manager) startAvailabilityCheckLoop() { + interval := defaultAvailabilityCheckInterval + ticker := time.NewTicker(interval) + failpoint.Inject("changeAvailabilityCheckInterval", func() { + ticker.Reset(100 * time.Millisecond) + }) + go func() { + defer logutil.LogPanic() + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + log.Info("affinity manager availability check loop stopped") + return + case <-ticker.C: + m.checkStoresAvailability() + } + } + }() + log.Info("affinity manager availability check loop started", zap.Duration("interval", interval)) +} + +// checkStoresAvailability checks the availability status of stores and invalidates groups with unavailable stores. +func (m *Manager) checkStoresAvailability() { + if !m.IsAvailable() { + return + } + unavailableStores := m.generateUnavailableStores() + isUnavailableStoresChanged, groupAvailabilityChanges := m.getGroupAvailabilityChanges(unavailableStores) + if isUnavailableStoresChanged { + m.setGroupAvailabilityChanges(unavailableStores, groupAvailabilityChanges) + } + m.collectMetrics() +} + +// collectMetrics collects the global metrics of the affinity manager. 
+func (m *Manager) collectMetrics() { + m.RLock() + defer m.RUnlock() + + // Collect global metrics + groupCount.Set(float64(len(m.groups))) + regionCount.Set(float64(len(m.regions))) + affinityRegionCount.Set(float64(m.affinityRegionCount)) +} + +func (m *Manager) generateUnavailableStores() map[uint64]storeCondition { + unavailableStores := make(map[uint64]storeCondition) + stores := m.storeSetInformer.GetStores() + lowSpaceRatio := m.conf.GetLowSpaceRatio() + for _, store := range stores { + switch { + // First the conditions that will mark the group as expired + case store.IsRemoved() || store.IsPhysicallyDestroyed() || store.IsRemoving(): + unavailableStores[store.GetID()] = storeRemovingOrRemoved + case store.IsUnhealthy(): + unavailableStores[store.GetID()] = storeDown + + // Then the conditions that will mark the group as degraded + case !store.AllowLeaderTransferIn() || m.conf.CheckLabelProperty(config.RejectLeader, store.GetLabels()): + unavailableStores[store.GetID()] = storeEvictLeader + case store.IsDisconnected(): + unavailableStores[store.GetID()] = storeDisconnected + case store.IsLowSpace(lowSpaceRatio): + unavailableStores[store.GetID()] = storeLowSpace + case store.IsPreparing(): + unavailableStores[store.GetID()] = storePreparing + } + // Note: We intentionally do NOT check: + // - IsSlow(): Performance issue, not availability issue + } + return unavailableStores +} + +func (m *Manager) getGroupAvailabilityChanges(unavailableStores map[uint64]storeCondition) (isUnavailableStoresChanged bool, groupAvailabilityChanges map[string]groupAvailability) { + groupAvailabilityChanges = make(map[string]groupAvailability) + availableGroupCount := 0 + unavailableGroupCount := 0 + + // Validate whether unavailableStores has changed. 
+ m.RLock() + isUnavailableStoresChanged = !maps.Equal(unavailableStores, m.unavailableStores) + if !isUnavailableStoresChanged { + m.RUnlock() + return false, nil + } + + // Analyze which Groups have changed availability + // Collect log messages to print after releasing lock + for _, groupInfo := range m.groups { + newAvailability := groupInfo.GetNewAvailability(unavailableStores) + if newAvailability != groupInfo.GetAvailability() { + groupAvailabilityChanges[groupInfo.ID] = newAvailability + } + if newAvailability == groupAvailable { + availableGroupCount++ + } else { + unavailableGroupCount++ + } + } + m.RUnlock() + + if len(unavailableStores) > 0 { + log.Warn("affinity groups invalidated due to unavailable stores", + zap.Int("unavailable-store-count", len(unavailableStores)), + zap.Int("unavailable-group-count", unavailableGroupCount), + zap.Int("available-group-count", availableGroupCount)) + } + + return +} + +func (m *Manager) setGroupAvailabilityChanges(unavailableStores map[uint64]storeCondition, groupAvailabilityChanges map[string]groupAvailability) { + m.Lock() + defer m.Unlock() + m.unavailableStores = unavailableStores + for groupID, availability := range groupAvailabilityChanges { + m.updateGroupAvailabilityLocked(groupID, availability) + } +} + +func (m *Manager) checkHasUnavailableStore(leaderStoreID uint64, voterStoreIDs []uint64) error { + m.RLock() + defer m.RUnlock() + for _, storeID := range voterStoreIDs { + condition, ok := m.unavailableStores[storeID] + if ok && (!condition.affectsLeaderOnly() || storeID == leaderStoreID) { + return errs.ErrAffinityGroupContent.GenWithStackByArgs(fmt.Sprintf("store %d is %s", storeID, condition.String())) + } + } + return nil +} diff --git a/pkg/schedule/affinity/policy_test.go b/pkg/schedule/affinity/policy_test.go new file mode 100644 index 00000000000..58758077bfb --- /dev/null +++ b/pkg/schedule/affinity/policy_test.go @@ -0,0 +1,350 @@ +// Copyright 2025 TiKV Project Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package affinity + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/pingcap/kvproto/pkg/metapb" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockconfig" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/storage" +) + +func TestObserveAvailableRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + store1 := core.NewStoreInfo(&metapb.Store{Id: 1, Address: "s1", NodeState: metapb.NodeState_Serving}) + store2 := core.NewStoreInfo(&metapb.Store{Id: 2, Address: "s2", NodeState: metapb.NodeState_Serving}) + for _, s := range []*core.StoreInfo{store1, store2} { + storeInfos.PutStore(s.Clone(core.SetLastHeartbeatTS(time.Now()))) + } + conf := mockconfig.NewTestOptions() + + // Create region labeler + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + group := &Group{ID: "g", LeaderStoreID: 0, VoterStoreIDs: nil} + re.NoError(manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: group.ID}})) + + // First observation makes group available with store 1. 
+ region1 := core.NewRegionInfo( + &metapb.Region{ + Id: 10, + StartKey: []byte(""), + EndKey: []byte("a"), + Peers: []*metapb.Peer{{Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}}, + }, + &metapb.Peer{Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}, + ) + manager.ObserveAvailableRegion(region1, manager.GetAffinityGroupState("g")) + state := manager.GetAffinityGroupState("g") + re.NotNil(state) + re.True(state.AffinitySchedulingEnabled) + re.Equal(uint64(1), state.LeaderStoreID) + re.ElementsMatch([]uint64{1}, state.VoterStoreIDs) + + // Second observation with different layout should not overwrite. + region2 := core.NewRegionInfo( + &metapb.Region{ + Id: 20, + StartKey: []byte("a"), + EndKey: []byte("b"), + Peers: []*metapb.Peer{{Id: 21, StoreId: 2, Role: metapb.PeerRole_Voter}}, + }, + &metapb.Peer{Id: 21, StoreId: 2, Role: metapb.PeerRole_Voter}, + ) + manager.ObserveAvailableRegion(region2, manager.GetAffinityGroupState("g")) + state = manager.GetAffinityGroupState("g") + re.NotNil(state) + re.True(state.AffinitySchedulingEnabled) + re.Equal(uint64(1), state.LeaderStoreID) + re.ElementsMatch([]uint64{1}, state.VoterStoreIDs) + + // A degraded group must not change voterStoreIDs. + manager.DegradeAffinityGroup("g") + state = manager.GetAffinityGroupState("g") + re.NotNil(state) + re.False(state.AffinitySchedulingEnabled) + re.False(state.RegularSchedulingEnabled) + + manager.ObserveAvailableRegion(region2, state) // region2 changes voter store IDs + state = manager.GetAffinityGroupState("g") + re.NotNil(state) + re.False(state.AffinitySchedulingEnabled) + re.False(state.RegularSchedulingEnabled) + + manager.ObserveAvailableRegion(region1, state) // region1 does not change voter store IDs + state = manager.GetAffinityGroupState("g") + re.NotNil(state) + re.True(state.AffinitySchedulingEnabled) + + // An expired group can change voterStoreIDs. 
+ manager.ExpireAffinityGroup("g") + state = manager.GetAffinityGroupState("g") + re.NotNil(state) + re.False(state.AffinitySchedulingEnabled) + re.True(state.RegularSchedulingEnabled) + + manager.ObserveAvailableRegion(region2, state) // region2 changes voter store IDs + state = manager.GetAffinityGroupState("g") + re.NotNil(state) + re.True(state.AffinitySchedulingEnabled) +} + +func TestAvailabilityCheckInvalidatesGroup(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + store1 := core.NewStoreInfo(&metapb.Store{Id: 1, Address: "s1"}) + store1 = store1.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(store1) + store2 := core.NewStoreInfo(&metapb.Store{Id: 2, Address: "s2"}) + store2 = store2.Clone(core.SetLastHeartbeatTS(time.Now()), core.SetNodeState(metapb.NodeState_Removing)) + storeInfos.PutStore(store2) + + conf := mockconfig.NewTestOptions() + + // Create region labeler + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + group := &Group{ID: "avail", LeaderStoreID: 1, VoterStoreIDs: []uint64{1, 2}} + re.NoError(manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: group.ID}})) + _, err = manager.UpdateAffinityGroupPeers("avail", 1, []uint64{1, 2}) + re.NoError(err) + state := manager.GetAffinityGroupState("avail") + re.NotNil(state) + re.True(state.AffinitySchedulingEnabled) + + // Simulate store 2 unavailable. 
+ unavailable := map[uint64]storeCondition{2: storeRemovingOrRemoved} + isUnavailableStoresChanged, groupAvailabilityChanges := manager.getGroupAvailabilityChanges(unavailable) + re.True(isUnavailableStoresChanged) + manager.setGroupAvailabilityChanges(unavailable, groupAvailabilityChanges) + + state2 := manager.GetAffinityGroupState("avail") + re.NotNil(state2) + re.False(state2.AffinitySchedulingEnabled) +} + +func TestStoreHealthCheck(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a test storage + store := storage.NewStorageWithMemoryBackend() + + // Create mock stores + storeInfos := core.NewStoresInfo() + store1 := core.NewStoreInfo(&metapb.Store{Id: 1, Address: "test1", NodeState: metapb.NodeState_Serving}) + store2 := core.NewStoreInfo(&metapb.Store{Id: 2, Address: "test2", NodeState: metapb.NodeState_Serving}) + store3 := core.NewStoreInfo(&metapb.Store{Id: 3, Address: "test3", NodeState: metapb.NodeState_Serving}) + + // Set store1 to be healthy + store1 = store1.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(store1) + + // Set store2 to be healthy + store2 = store2.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(store2) + + // Set store3 to be unhealthy (disconnected) + store3 = store3.Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Hour))) + storeInfos.PutStore(store3) + + conf := mockconfig.NewTestOptions() + + // Create region labeler + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + + // Create affinity manager + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Create a test affinity group with healthy stores + group1 := &Group{ + ID: "group1", + LeaderStoreID: 1, + VoterStoreIDs: []uint64{1, 2}, + } + err = manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: group1.ID}}) + re.NoError(err) + _, err = 
manager.UpdateAffinityGroupPeers(group1.ID, group1.LeaderStoreID, group1.VoterStoreIDs)
+	re.NoError(err)
+
+	// Create a test affinity group with unhealthy store
+	group2 := &Group{
+		ID:            "group2",
+		LeaderStoreID: 3,
+		VoterStoreIDs: []uint64{3, 2},
+	}
+	err = manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: group2.ID}})
+	re.NoError(err)
+	_, err = manager.UpdateAffinityGroupPeers(group2.ID, group2.LeaderStoreID, group2.VoterStoreIDs)
+	re.NoError(err)
+
+	// Verify initial state - all groups should be in effect
+	groupInfo1 := manager.groups["group1"]
+	re.True(groupInfo1.IsAffinitySchedulingEnabled())
+	groupInfo2 := manager.groups["group2"]
+	re.True(groupInfo2.IsAffinitySchedulingEnabled())
+
+	// Manually call checkStoresAvailability to test
+	manager.checkStoresAvailability()
+
+	// After health check, group1 should still be in effect (all stores healthy)
+	re.True(manager.groups["group1"].IsAffinitySchedulingEnabled())
+
+	// After health check, group2 should be invalidated (store3 is unhealthy)
+	re.False(manager.groups["group2"].IsAffinitySchedulingEnabled())
+
+	// Now make store3 healthy again
+	store3Healthy := store3.Clone(core.SetLastHeartbeatTS(time.Now()))
+	storeInfos.PutStore(store3Healthy)
+
+	// Check health again
+	manager.checkStoresAvailability()
+
+	// Group2 should be restored to effect state
+	re.True(manager.groups["group2"].IsAffinitySchedulingEnabled())
+}
+
+// TestDegradedGroupShouldExpire verifies a degraded group should move to expired even when
+// the set of unavailable stores does not change.
+func TestDegradedGroupShouldExpire(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	store := storage.NewStorageWithMemoryBackend()
+	storeInfos := core.NewStoresInfo()
+	store1 := core.NewStoreInfo(&metapb.Store{Id: 1, Address: "s1", NodeState: metapb.NodeState_Serving})
+	store1 = store1.Clone(core.SetLastHeartbeatTS(time.Now()))
+	storeInfos.PutStore(store1)
+	store2 := core.NewStoreInfo(&metapb.Store{Id: 2, Address: "s2", NodeState: metapb.NodeState_Serving})
+	store2 = store2.Clone(core.SetLastHeartbeatTS(time.Now()))
+	storeInfos.PutStore(store2)
+
+	conf := mockconfig.NewTestOptions()
+
+	regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5)
+	re.NoError(err)
+
+	manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler)
+	re.NoError(err)
+
+	// Create a healthy group first.
+	re.NoError(manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: "expire"}}))
+	_, err = manager.UpdateAffinityGroupPeers("expire", 1, []uint64{1, 2})
+	re.NoError(err)
+	manager.checkStoresAvailability()
+	groupInfo := getGroupForTest(re, manager, "expire")
+	re.Equal(groupAvailable, groupInfo.GetAvailability())
+
+	// Make store2 unhealthy so the group becomes degraded.
+	store2Disconnected := store2.Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Minute)))
+	storeInfos.PutStore(store2Disconnected)
+	manager.checkStoresAvailability()
+	groupInfo = getGroupForTest(re, manager, "expire")
+	re.Equal(groupDegraded, groupInfo.GetAvailability())
+
+	// Force the degraded status to be considered expired. NOTE(review): getGroupForTest returns a copy of the group, so this write never reaches the manager's stored runtimeGroupInfo — confirm the backdating should target manager.groups["expire"] (and the final assertion re-fetch the group) for this test to actually exercise the degraded-to-expired transition.
+	manager.Lock()
+	groupInfo.degradedExpiredAt = uint64(time.Now().Add(-time.Hour).Unix())
+	manager.Unlock()
+
+	// Run availability check again without changing the unavailable store set.
+ manager.checkStoresAvailability() + re.True(groupInfo.IsExpired()) +} + +// TestGroupAvailabilityPriority verifies availability picks the strongest condition +// and respects leader-only constraints. +func TestGroupAvailabilityPriority(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := storage.NewStorageWithMemoryBackend() + storeInfos := core.NewStoresInfo() + for i := uint64(1); i <= 3; i++ { + storeInfo := core.NewStoreInfo(&metapb.Store{Id: i, Address: "s"}) + storeInfo = storeInfo.Clone(core.SetLastHeartbeatTS(time.Now())) + storeInfos.PutStore(storeInfo) + } + conf := mockconfig.NewTestOptions() + regionLabeler, err := labeler.NewRegionLabeler(ctx, store, time.Second*5) + re.NoError(err) + manager, err := NewManager(ctx, store, storeInfos, conf, regionLabeler) + re.NoError(err) + + // Case 1: leader-only constraint should degrade when on leader, ignored on followers. + re.NoError(manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: "leader-only"}})) + _, err = manager.UpdateAffinityGroupPeers("leader-only", 1, []uint64{1, 2}) + re.NoError(err) + + // evict-leader on leader -> degraded + unavailable := map[uint64]storeCondition{1: storeEvictLeader} + changed, changes := manager.getGroupAvailabilityChanges(unavailable) + re.True(changed) + manager.setGroupAvailabilityChanges(unavailable, changes) + groupInfo := getGroupForTest(re, manager, "leader-only") + re.Equal(groupDegraded, groupInfo.GetAvailability()) + + // evict-leader only on follower should not change availability + unavailable = map[uint64]storeCondition{2: storeEvictLeader} + changed, changes = manager.getGroupAvailabilityChanges(unavailable) + re.True(changed) + manager.setGroupAvailabilityChanges(unavailable, changes) + groupInfo = getGroupForTest(re, manager, "leader-only") + re.Equal(groupAvailable, groupInfo.GetAvailability()) + + // Case 2: when multiple conditions exist, higher severity (expired) wins. 
+ re.NoError(manager.CreateAffinityGroups([]GroupKeyRanges{{GroupID: "priority"}})) + _, err = manager.UpdateAffinityGroupPeers("priority", 1, []uint64{1, 2}) + re.NoError(err) + unavailable = map[uint64]storeCondition{ + 1: storeDisconnected, // degraded + 2: storeRemovingOrRemoved, // expired + } + changed, changes = manager.getGroupAvailabilityChanges(unavailable) + re.True(changed) + manager.setGroupAvailabilityChanges(unavailable, changes) + groupInfo = getGroupForTest(re, manager, "priority") + re.Equal(groupExpired, groupInfo.GetAvailability()) +} diff --git a/pkg/schedule/affinity/stub_affinity.go b/pkg/schedule/affinity/stub_affinity.go deleted file mode 100644 index ba8a66064f6..00000000000 --- a/pkg/schedule/affinity/stub_affinity.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2025 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package affinity - -// Temporary stubs until availability policy is added. 
- -func (m *Manager) hasUnavailableStore(uint64, []uint64) error { - _ = m - return nil -} - -func (m *Manager) startAvailabilityCheckLoop() { - _ = m -} diff --git a/pkg/schedule/affinity/txn.go b/pkg/schedule/affinity/txn.go index 1f44059631e..fabee9f577a 100644 --- a/pkg/schedule/affinity/txn.go +++ b/pkg/schedule/affinity/txn.go @@ -281,7 +281,7 @@ func (m *Manager) updateAffinityGroupPeersWithAffinityVer(groupID string, affini }); err != nil { return nil, err } - if err := m.hasUnavailableStore(leaderStoreID, voterStoreIDs); err != nil { + if err := m.checkHasUnavailableStore(leaderStoreID, voterStoreIDs); err != nil { return nil, err } @@ -302,7 +302,7 @@ func (m *Manager) updateAffinityGroupPeersWithAffinityVer(groupID string, affini if group.affinityVer != affinityVer { return group, nil } - // Group must not change voterStoreIDs while it is not in the expired status. + // Group must not change voterStoreIDs while it is not in the expired status. Changing only leaderStoreID is allowed. // RegularSchedulingEnabled == IsExpired // The VoterStoreIDs are already sorted, so they can be compared directly if !group.RegularSchedulingEnabled && !slices.Equal(voterStoreIDs, group.VoterStoreIDs) {