|
| 1 | +// Copyright 2025 TiKV Project Authors. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package affinity |
| 16 | + |
| 17 | +import ( |
| 18 | + "fmt" |
| 19 | + "maps" |
| 20 | + "time" |
| 21 | + |
| 22 | + "go.uber.org/zap" |
| 23 | + |
| 24 | + "github.com/pingcap/failpoint" |
| 25 | + "github.com/pingcap/log" |
| 26 | + |
| 27 | + "github.com/tikv/pd/pkg/core" |
| 28 | + "github.com/tikv/pd/pkg/errs" |
| 29 | + "github.com/tikv/pd/pkg/schedule/config" |
| 30 | + "github.com/tikv/pd/pkg/utils/logutil" |
| 31 | +) |
| 32 | + |
| 33 | +const ( |
| 34 | + // defaultAvailabilityCheckInterval is the default interval for checking store availability. |
| 35 | + defaultAvailabilityCheckInterval = 10 * time.Second |
| 36 | +) |
| 37 | + |
| 38 | +// storeCondition is an enum for store conditions. Valid values are the store-prefixed enum constants, |
| 39 | +// which are split into three groups separated by degradedBoundary. |
| 40 | +type storeCondition int |
| 41 | + |
| 42 | +const ( |
| 43 | + storeAvailable storeCondition = iota |
| 44 | + |
| 45 | + // All values greater than storeAvailable and less than degradedBoundary will trigger groupDegraded. |
| 46 | + storeEvictLeader |
| 47 | + storeDisconnected |
| 48 | + storePreparing |
| 49 | + storeLowSpace |
| 50 | + degradedBoundary |
| 51 | + |
| 52 | + // All values greater than degradedBoundary will trigger groupExpired. |
| 53 | + storeDown |
| 54 | + storeRemovingOrRemoved |
| 55 | +) |
| 56 | + |
| 57 | +func (c storeCondition) String() string { |
| 58 | + switch c { |
| 59 | + case storeAvailable: |
| 60 | + return "available" |
| 61 | + case storeEvictLeader: |
| 62 | + return "evicted" |
| 63 | + case storeDisconnected: |
| 64 | + return "disconnected" |
| 65 | + case storePreparing: |
| 66 | + return "preparing" |
| 67 | + case storeLowSpace: |
| 68 | + return "low-space" |
| 69 | + case storeDown: |
| 70 | + return "down" |
| 71 | + case storeRemovingOrRemoved: |
| 72 | + return "removing-or-removed" |
| 73 | + default: |
| 74 | + return "unknown" |
| 75 | + } |
| 76 | +} |
| 77 | + |
| 78 | +func (c storeCondition) groupAvailability() groupAvailability { |
| 79 | + switch { |
| 80 | + case c == storeAvailable: |
| 81 | + return groupAvailable |
| 82 | + case c <= degradedBoundary: |
| 83 | + return groupDegraded |
| 84 | + default: |
| 85 | + return groupExpired |
| 86 | + } |
| 87 | +} |
| 88 | + |
| 89 | +func (c storeCondition) affectsLeaderOnly() bool { |
| 90 | + switch c { |
| 91 | + case storeEvictLeader: |
| 92 | + return true |
| 93 | + default: |
| 94 | + return false |
| 95 | + } |
| 96 | +} |
| 97 | + |
| 98 | +// GetNewAvailability uses the given unavailableStores to compute a new groupAvailability. |
| 99 | +// Note that this function does not update runtimeGroupInfo. |
| 100 | +func (g *runtimeGroupInfo) GetNewAvailability(unavailableStores map[uint64]storeCondition) groupAvailability { |
| 101 | + maxCondition := storeAvailable |
| 102 | + for _, storeID := range g.VoterStoreIDs { |
| 103 | + if condition, ok := unavailableStores[storeID]; ok && (!condition.affectsLeaderOnly() || storeID == g.LeaderStoreID) { |
| 104 | + if maxCondition == storeAvailable || condition > maxCondition { |
| 105 | + maxCondition = condition |
| 106 | + } |
| 107 | + } |
| 108 | + } |
| 109 | + return maxCondition.groupAvailability() |
| 110 | +} |
| 111 | + |
| 112 | +// ObserveAvailableRegion observes available Region and collects information to update the Peer distribution within the Group. |
| 113 | +func (m *Manager) ObserveAvailableRegion(region *core.RegionInfo, group *GroupState) { |
| 114 | + // Use the peer distribution of the first observed available Region as the result. |
| 115 | + // In the future, we may want to use a more sophisticated strategy rather than first-win. |
| 116 | + if group == nil || group.AffinitySchedulingEnabled { |
| 117 | + return |
| 118 | + } |
| 119 | + leaderStoreID := region.GetLeader().GetStoreId() |
| 120 | + voterStoreIDs := make([]uint64, len(region.GetVoters())) |
| 121 | + for i, voter := range region.GetVoters() { |
| 122 | + voterStoreIDs[i] = voter.GetStoreId() |
| 123 | + } |
| 124 | + _, _ = m.updateAffinityGroupPeersWithAffinityVer(group.ID, group.affinityVer, leaderStoreID, voterStoreIDs) |
| 125 | +} |
| 126 | + |
| 127 | +// startAvailabilityCheckLoop starts a goroutine to periodically check store availability and invalidate groups with unavailable stores. |
| 128 | +// TODO: If critical operations are added, a graceful shutdown is required. |
| 129 | +func (m *Manager) startAvailabilityCheckLoop() { |
| 130 | + interval := defaultAvailabilityCheckInterval |
| 131 | + ticker := time.NewTicker(interval) |
| 132 | + failpoint.Inject("changeAvailabilityCheckInterval", func() { |
| 133 | + ticker.Reset(100 * time.Millisecond) |
| 134 | + }) |
| 135 | + go func() { |
| 136 | + defer logutil.LogPanic() |
| 137 | + defer ticker.Stop() |
| 138 | + for { |
| 139 | + select { |
| 140 | + case <-m.ctx.Done(): |
| 141 | + log.Info("affinity manager availability check loop stopped") |
| 142 | + return |
| 143 | + case <-ticker.C: |
| 144 | + m.checkStoresAvailability() |
| 145 | + } |
| 146 | + } |
| 147 | + }() |
| 148 | + log.Info("affinity manager availability check loop started", zap.Duration("interval", interval)) |
| 149 | +} |
| 150 | + |
| 151 | +// checkStoresAvailability checks the availability status of stores and invalidates groups with unavailable stores. |
| 152 | +func (m *Manager) checkStoresAvailability() { |
| 153 | + if !m.IsAvailable() { |
| 154 | + return |
| 155 | + } |
| 156 | + unavailableStores := m.generateUnavailableStores() |
| 157 | + isUnavailableStoresChanged, groupAvailabilityChanges := m.getGroupAvailabilityChanges(unavailableStores) |
| 158 | + if isUnavailableStoresChanged { |
| 159 | + m.setGroupAvailabilityChanges(unavailableStores, groupAvailabilityChanges) |
| 160 | + } |
| 161 | + m.collectMetrics() |
| 162 | +} |
| 163 | + |
| 164 | +// collectMetrics collects the global metrics of the affinity manager. |
| 165 | +func (m *Manager) collectMetrics() { |
| 166 | + m.RLock() |
| 167 | + defer m.RUnlock() |
| 168 | + |
| 169 | + // Collect global metrics |
| 170 | + groupCount.Set(float64(len(m.groups))) |
| 171 | + regionCount.Set(float64(len(m.regions))) |
| 172 | + affinityRegionCount.Set(float64(m.affinityRegionCount)) |
| 173 | +} |
| 174 | + |
| 175 | +func (m *Manager) generateUnavailableStores() map[uint64]storeCondition { |
| 176 | + unavailableStores := make(map[uint64]storeCondition) |
| 177 | + stores := m.storeSetInformer.GetStores() |
| 178 | + lowSpaceRatio := m.conf.GetLowSpaceRatio() |
| 179 | + for _, store := range stores { |
| 180 | + switch { |
| 181 | + // First the conditions that will mark the group as expired |
| 182 | + case store.IsRemoved() || store.IsPhysicallyDestroyed() || store.IsRemoving(): |
| 183 | + unavailableStores[store.GetID()] = storeRemovingOrRemoved |
| 184 | + case store.IsUnhealthy(): |
| 185 | + unavailableStores[store.GetID()] = storeDown |
| 186 | + |
| 187 | + // Then the conditions that will mark the group as degraded |
| 188 | + case !store.AllowLeaderTransferIn() || m.conf.CheckLabelProperty(config.RejectLeader, store.GetLabels()): |
| 189 | + unavailableStores[store.GetID()] = storeEvictLeader |
| 190 | + case store.IsDisconnected(): |
| 191 | + unavailableStores[store.GetID()] = storeDisconnected |
| 192 | + case store.IsLowSpace(lowSpaceRatio): |
| 193 | + unavailableStores[store.GetID()] = storeLowSpace |
| 194 | + case store.IsPreparing(): |
| 195 | + unavailableStores[store.GetID()] = storePreparing |
| 196 | + } |
| 197 | + // Note: We intentionally do NOT check: |
| 198 | + // - IsSlow(): Performance issue, not availability issue |
| 199 | + } |
| 200 | + return unavailableStores |
| 201 | +} |
| 202 | + |
| 203 | +func (m *Manager) getGroupAvailabilityChanges(unavailableStores map[uint64]storeCondition) (isUnavailableStoresChanged bool, groupAvailabilityChanges map[string]groupAvailability) { |
| 204 | + groupAvailabilityChanges = make(map[string]groupAvailability) |
| 205 | + availableGroupCount := 0 |
| 206 | + unavailableGroupCount := 0 |
| 207 | + |
| 208 | + // Validate whether unavailableStores has changed. |
| 209 | + m.RLock() |
| 210 | + isUnavailableStoresChanged = !maps.Equal(unavailableStores, m.unavailableStores) |
| 211 | + if !isUnavailableStoresChanged { |
| 212 | + m.RUnlock() |
| 213 | + return false, nil |
| 214 | + } |
| 215 | + |
| 216 | + // Analyze which Groups have changed availability |
| 217 | + // Collect log messages to print after releasing lock |
| 218 | + for _, groupInfo := range m.groups { |
| 219 | + newAvailability := groupInfo.GetNewAvailability(unavailableStores) |
| 220 | + if newAvailability != groupInfo.GetAvailability() { |
| 221 | + groupAvailabilityChanges[groupInfo.ID] = newAvailability |
| 222 | + } |
| 223 | + if newAvailability == groupAvailable { |
| 224 | + availableGroupCount++ |
| 225 | + } else { |
| 226 | + unavailableGroupCount++ |
| 227 | + } |
| 228 | + } |
| 229 | + m.RUnlock() |
| 230 | + |
| 231 | + if len(unavailableStores) > 0 { |
| 232 | + log.Warn("affinity groups invalidated due to unavailable stores", |
| 233 | + zap.Int("unavailable-store-count", len(unavailableStores)), |
| 234 | + zap.Int("unavailable-group-count", unavailableGroupCount), |
| 235 | + zap.Int("available-group-count", availableGroupCount)) |
| 236 | + } |
| 237 | + |
| 238 | + return |
| 239 | +} |
| 240 | + |
| 241 | +func (m *Manager) setGroupAvailabilityChanges(unavailableStores map[uint64]storeCondition, groupAvailabilityChanges map[string]groupAvailability) { |
| 242 | + m.Lock() |
| 243 | + defer m.Unlock() |
| 244 | + m.unavailableStores = unavailableStores |
| 245 | + for groupID, availability := range groupAvailabilityChanges { |
| 246 | + m.updateGroupAvailabilityLocked(groupID, availability) |
| 247 | + } |
| 248 | +} |
| 249 | + |
| 250 | +func (m *Manager) checkHasUnavailableStore(leaderStoreID uint64, voterStoreIDs []uint64) error { |
| 251 | + m.RLock() |
| 252 | + defer m.RUnlock() |
| 253 | + for _, storeID := range voterStoreIDs { |
| 254 | + condition, ok := m.unavailableStores[storeID] |
| 255 | + if ok && (!condition.affectsLeaderOnly() || storeID == leaderStoreID) { |
| 256 | + return errs.ErrAffinityGroupContent.GenWithStackByArgs(fmt.Sprintf("store %d is %s", storeID, condition.String())) |
| 257 | + } |
| 258 | + } |
| 259 | + return nil |
| 260 | +} |
0 commit comments