Skip to content

Commit 1750e10

Browse files
committed
Merge branch 'affinity' of github.com:hundundm/pd into HEAD
Signed-off-by: lhy1024 <[email protected]>
2 parents 8b80343 + 45ece22 commit 1750e10

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+7510
-133
lines changed

client/go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ module github.com/tikv/pd/client
22

33
go 1.23.12
44

5+
// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
6+
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
7+
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
8+
59
require (
610
github.com/BurntSushi/toml v0.3.1
711
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
@@ -10,7 +14,7 @@ require (
1014
github.com/opentracing/opentracing-go v1.2.0
1115
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
1216
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
13-
github.com/pingcap/kvproto v0.0.0-20251121073615-744c58d5a5f1
17+
github.com/pingcap/kvproto v0.0.0-20251202064041-b6fd818387cd
1418
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
1519
github.com/prometheus/client_golang v1.20.5
1620
github.com/stretchr/testify v1.9.0

client/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
4949
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
5050
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
5151
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
52-
github.com/pingcap/kvproto v0.0.0-20251121073615-744c58d5a5f1 h1:zSM+1a6ugIcttizSWBe1v9Go/Ko5C74u+52Umm8+VaE=
53-
github.com/pingcap/kvproto v0.0.0-20251121073615-744c58d5a5f1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
52+
github.com/pingcap/kvproto v0.0.0-20251202064041-b6fd818387cd h1:YIaqKi3jAjllidGYDXlfKrEmGNGJNSR+H6PFDXN46xc=
53+
github.com/pingcap/kvproto v0.0.0-20251202064041-b6fd818387cd/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
5454
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
5555
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
5656
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

client/http/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ const (
9090
KeyspaceConfig = "/pd/api/v2/keyspaces/%s/config"
9191
GetKeyspaceMetaByName = "/pd/api/v2/keyspaces/%s"
9292
GetKeyspaceMetaByID = "/pd/api/v2/keyspaces/id/%d"
93+
// Affinity
94+
AffinityGroups = "/pd/api/v2/affinity-groups"
95+
AffinityGroupByID = "/pd/api/v2/affinity-groups/%s"
96+
AffinityGroupBatchDelete = "/pd/api/v2/affinity-groups/batch-delete"
9397
)
9498

9599
// RegionByID returns the path of PD HTTP API to get region by ID.

client/http/interface.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,26 @@ type Client interface {
126126
GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error)
127127
GetKeyspaceMetaByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error)
128128

129+
/* Affinity group interfaces */
130+
131+
// CreateAffinityGroups creates one or more affinity groups with key ranges.
132+
// The affinityGroups parameter is a map from group ID to a list of key ranges.
133+
CreateAffinityGroups(ctx context.Context, affinityGroups map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error)
134+
// GetAffinityGroup gets an affinity group by group ID.
135+
GetAffinityGroup(ctx context.Context, groupID string) (*AffinityGroupState, error)
136+
// GetAllAffinityGroups gets all affinity groups.
137+
GetAllAffinityGroups(ctx context.Context) (map[string]*AffinityGroupState, error)
138+
// UpdateAffinityGroupPeers updates the leader and voter stores of an affinity group.
139+
UpdateAffinityGroupPeers(ctx context.Context, groupID string, leaderStoreID uint64, voterStoreIDs []uint64) (*AffinityGroupState, error)
140+
// DeleteAffinityGroup deletes an affinity group by group ID.
141+
DeleteAffinityGroup(ctx context.Context, groupID string, force bool) error
142+
// BatchDeleteAffinityGroups deletes multiple affinity groups in batch.
143+
BatchDeleteAffinityGroups(ctx context.Context, groupIDs []string, force bool) error
144+
// AddAffinityGroupKeyRanges adds key ranges to affinity groups.
145+
AddAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error)
146+
// RemoveAffinityGroupKeyRanges removes key ranges from affinity groups.
147+
RemoveAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error)
148+
129149
/* Client-related methods */
130150
// WithCallerID sets and returns a new client with the given caller ID.
131151
WithCallerID(string) Client
@@ -1213,3 +1233,167 @@ func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (strin
12131233
}
12141234
return msg, nil
12151235
}
1236+
1237+
// CreateAffinityGroups creates one or more affinity groups with key ranges.
1238+
func (c *client) CreateAffinityGroups(ctx context.Context, affinityGroups map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error) {
1239+
// Construct the request body
1240+
reqGroups := make(map[string]CreateAffinityGroupInput, len(affinityGroups))
1241+
for groupID, ranges := range affinityGroups {
1242+
reqGroups[groupID] = CreateAffinityGroupInput{Ranges: ranges}
1243+
}
1244+
req := CreateAffinityGroupsRequest{AffinityGroups: reqGroups}
1245+
1246+
reqJSON, err := json.Marshal(req)
1247+
if err != nil {
1248+
return nil, errors.Trace(err)
1249+
}
1250+
var resp AffinityGroupsResponse
1251+
err = c.request(ctx, newRequestInfo().
1252+
WithName("CreateAffinityGroups").
1253+
WithURI(AffinityGroups).
1254+
WithMethod(http.MethodPost).
1255+
WithBody(reqJSON).
1256+
WithResp(&resp))
1257+
if err != nil {
1258+
return nil, err
1259+
}
1260+
return resp.AffinityGroups, nil
1261+
}
1262+
1263+
// GetAffinityGroup gets an affinity group by group ID.
1264+
func (c *client) GetAffinityGroup(ctx context.Context, groupID string) (*AffinityGroupState, error) {
1265+
var state AffinityGroupState
1266+
err := c.request(ctx, newRequestInfo().
1267+
WithName("GetAffinityGroup").
1268+
WithURI(fmt.Sprintf(AffinityGroupByID, groupID)).
1269+
WithMethod(http.MethodGet).
1270+
WithResp(&state))
1271+
if err != nil {
1272+
return nil, err
1273+
}
1274+
return &state, nil
1275+
}
1276+
1277+
// GetAllAffinityGroups gets all affinity groups.
1278+
func (c *client) GetAllAffinityGroups(ctx context.Context) (map[string]*AffinityGroupState, error) {
1279+
var resp AffinityGroupsResponse
1280+
err := c.request(ctx, newRequestInfo().
1281+
WithName("GetAllAffinityGroups").
1282+
WithURI(AffinityGroups).
1283+
WithMethod(http.MethodGet).
1284+
WithResp(&resp))
1285+
if err != nil {
1286+
return nil, err
1287+
}
1288+
return resp.AffinityGroups, nil
1289+
}
1290+
1291+
// UpdateAffinityGroupPeers updates the leader and voter stores of an affinity group.
1292+
func (c *client) UpdateAffinityGroupPeers(ctx context.Context, groupID string, leaderStoreID uint64, voterStoreIDs []uint64) (*AffinityGroupState, error) {
1293+
req := UpdateAffinityGroupPeersRequest{
1294+
LeaderStoreID: leaderStoreID,
1295+
VoterStoreIDs: voterStoreIDs,
1296+
}
1297+
reqJSON, err := json.Marshal(req)
1298+
if err != nil {
1299+
return nil, errors.Trace(err)
1300+
}
1301+
var state AffinityGroupState
1302+
err = c.request(ctx, newRequestInfo().
1303+
WithName("UpdateAffinityGroupPeers").
1304+
WithURI(fmt.Sprintf(AffinityGroupByID, groupID)).
1305+
WithMethod(http.MethodPut).
1306+
WithBody(reqJSON).
1307+
WithResp(&state))
1308+
if err != nil {
1309+
return nil, err
1310+
}
1311+
return &state, nil
1312+
}
1313+
1314+
// DeleteAffinityGroup deletes an affinity group by group ID.
1315+
func (c *client) DeleteAffinityGroup(ctx context.Context, groupID string, force bool) error {
1316+
uri := fmt.Sprintf(AffinityGroupByID, groupID)
1317+
if force {
1318+
uri = fmt.Sprintf("%s?force=true", uri)
1319+
}
1320+
return c.request(ctx, newRequestInfo().
1321+
WithName("DeleteAffinityGroup").
1322+
WithURI(uri).
1323+
WithMethod(http.MethodDelete))
1324+
}
1325+
1326+
// BatchDeleteAffinityGroups deletes multiple affinity groups in batch.
1327+
func (c *client) BatchDeleteAffinityGroups(ctx context.Context, groupIDs []string, force bool) error {
1328+
req := BatchDeleteAffinityGroupsRequest{
1329+
IDs: groupIDs,
1330+
Force: force,
1331+
}
1332+
reqJSON, err := json.Marshal(req)
1333+
if err != nil {
1334+
return errors.Trace(err)
1335+
}
1336+
return c.request(ctx, newRequestInfo().
1337+
WithName("BatchDeleteAffinityGroups").
1338+
WithURI(AffinityGroupBatchDelete).
1339+
WithMethod(http.MethodPost).
1340+
WithBody(reqJSON))
1341+
}
1342+
1343+
// AddAffinityGroupKeyRanges adds key ranges to affinity groups.
1344+
func (c *client) AddAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error) {
1345+
// Convert to the request format
1346+
add := make([]GroupRangesModification, 0, len(groupKeyRanges))
1347+
for groupID, ranges := range groupKeyRanges {
1348+
add = append(add, GroupRangesModification{
1349+
ID: groupID,
1350+
Ranges: ranges,
1351+
})
1352+
}
1353+
req := BatchModifyAffinityGroupsRequest{Add: add}
1354+
1355+
reqJSON, err := json.Marshal(req)
1356+
if err != nil {
1357+
return nil, errors.Trace(err)
1358+
}
1359+
var resp AffinityGroupsResponse
1360+
err = c.request(ctx, newRequestInfo().
1361+
WithName("AddAffinityGroupKeyRanges").
1362+
WithURI(AffinityGroups).
1363+
WithMethod(http.MethodPatch).
1364+
WithBody(reqJSON).
1365+
WithResp(&resp))
1366+
if err != nil {
1367+
return nil, err
1368+
}
1369+
return resp.AffinityGroups, nil
1370+
}
1371+
1372+
// RemoveAffinityGroupKeyRanges removes key ranges from affinity groups.
1373+
func (c *client) RemoveAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error) {
1374+
// Convert to the request format
1375+
remove := make([]GroupRangesModification, 0, len(groupKeyRanges))
1376+
for groupID, ranges := range groupKeyRanges {
1377+
remove = append(remove, GroupRangesModification{
1378+
ID: groupID,
1379+
Ranges: ranges,
1380+
})
1381+
}
1382+
req := BatchModifyAffinityGroupsRequest{Remove: remove}
1383+
1384+
reqJSON, err := json.Marshal(req)
1385+
if err != nil {
1386+
return nil, errors.Trace(err)
1387+
}
1388+
var resp AffinityGroupsResponse
1389+
err = c.request(ctx, newRequestInfo().
1390+
WithName("RemoveAffinityGroupKeyRanges").
1391+
WithURI(AffinityGroups).
1392+
WithMethod(http.MethodPatch).
1393+
WithBody(reqJSON).
1394+
WithResp(&resp))
1395+
if err != nil {
1396+
return nil, err
1397+
}
1398+
return resp.AffinityGroups, nil
1399+
}

client/http/types.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,3 +703,82 @@ type Health struct {
703703
ClientUrls []string `json:"client_urls"`
704704
Health bool `json:"health"`
705705
}
706+
707+
// AffinityGroupKeyRange represents a key range for affinity group operations.
708+
type AffinityGroupKeyRange struct {
709+
StartKey []byte `json:"start_key"`
710+
EndKey []byte `json:"end_key"`
711+
}
712+
713+
// CreateAffinityGroupInput defines the input for a single group in the creation request.
714+
type CreateAffinityGroupInput struct {
715+
Ranges []AffinityGroupKeyRange `json:"ranges"`
716+
}
717+
718+
// CreateAffinityGroupsRequest defines the body for the POST request to create affinity groups.
719+
type CreateAffinityGroupsRequest struct {
720+
AffinityGroups map[string]CreateAffinityGroupInput `json:"affinity_groups"`
721+
TableGroup string `json:"table_group,omitempty"`
722+
}
723+
724+
// AffinityGroup defines an affinity group.
725+
type AffinityGroup struct {
726+
ID string `json:"id"`
727+
CreateTimestamp uint64 `json:"create_timestamp"`
728+
LeaderStoreID uint64 `json:"leader_store_id,omitempty"`
729+
VoterStoreIDs []uint64 `json:"voter_store_ids,omitempty"`
730+
}
731+
732+
// AffinityGroupState defines the runtime state of an affinity group.
733+
type AffinityGroupState struct {
734+
AffinityGroup
735+
Phase string `json:"phase"`
736+
RangeCount int `json:"range_count"`
737+
RegionCount int `json:"region_count"`
738+
AffinityRegionCount int `json:"affinity_region_count"`
739+
}
740+
741+
// IsPending indicates that the Group is still determining the StoreIDs.
742+
// If the Group has no KeyRanges, it remains in pending forever.
743+
func (s *AffinityGroupState) IsPending() bool {
744+
return s.Phase == "pending"
745+
}
746+
747+
// IsPreparing indicates that the Group is scheduling Regions according to the required Peers.
748+
func (s *AffinityGroupState) IsPreparing() bool {
749+
return s.Phase == "preparing"
750+
}
751+
752+
// IsStable indicates that the Group has completed the required scheduling and is currently in a stable state.
753+
func (s *AffinityGroupState) IsStable() bool {
754+
return s.Phase == "stable"
755+
}
756+
757+
// AffinityGroupsResponse defines the success response for affinity group operations.
758+
type AffinityGroupsResponse struct {
759+
AffinityGroups map[string]*AffinityGroupState `json:"affinity_groups"`
760+
}
761+
762+
// BatchDeleteAffinityGroupsRequest defines the body for batch delete request.
763+
type BatchDeleteAffinityGroupsRequest struct {
764+
IDs []string `json:"ids"`
765+
Force bool `json:"force,omitempty"`
766+
}
767+
768+
// GroupRangesModification defines add or remove operations for a specific group.
769+
type GroupRangesModification struct {
770+
ID string `json:"id"`
771+
Ranges []AffinityGroupKeyRange `json:"ranges"`
772+
}
773+
774+
// BatchModifyAffinityGroupsRequest defines the body for batch modify request.
775+
type BatchModifyAffinityGroupsRequest struct {
776+
Add []GroupRangesModification `json:"add,omitempty"`
777+
Remove []GroupRangesModification `json:"remove,omitempty"`
778+
}
779+
780+
// UpdateAffinityGroupPeersRequest defines the body for updating peer distribution of an affinity group.
781+
type UpdateAffinityGroupPeersRequest struct {
782+
LeaderStoreID uint64 `json:"leader_store_id"`
783+
VoterStoreIDs []uint64 `json:"voter_store_ids"`
784+
}

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ go 1.23.12
55
// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
66
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
77
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
8-
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
98

109
require (
1110
github.com/AlekSi/gocov-xml v1.0.0
@@ -35,7 +34,7 @@ require (
3534
github.com/pingcap/errcode v0.3.0
3635
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
3736
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
38-
github.com/pingcap/kvproto v0.0.0-20251121073615-744c58d5a5f1
37+
github.com/pingcap/kvproto v0.0.0-20251202064041-b6fd818387cd
3938
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
4039
github.com/pingcap/metering_sdk v0.0.0-20250918015914-468cd6feb1dc
4140
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
469469
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
470470
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
471471
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
472-
github.com/pingcap/kvproto v0.0.0-20251121073615-744c58d5a5f1 h1:zSM+1a6ugIcttizSWBe1v9Go/Ko5C74u+52Umm8+VaE=
473-
github.com/pingcap/kvproto v0.0.0-20251121073615-744c58d5a5f1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
472+
github.com/pingcap/kvproto v0.0.0-20251202064041-b6fd818387cd h1:YIaqKi3jAjllidGYDXlfKrEmGNGJNSR+H6PFDXN46xc=
473+
github.com/pingcap/kvproto v0.0.0-20251202064041-b6fd818387cd/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
474474
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
475475
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
476476
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=

pkg/cluster/cluster.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/tikv/pd/pkg/core"
2121
"github.com/tikv/pd/pkg/schedule"
22+
"github.com/tikv/pd/pkg/schedule/affinity"
2223
"github.com/tikv/pd/pkg/schedule/placement"
2324
"github.com/tikv/pd/pkg/statistics"
2425
)
@@ -30,6 +31,7 @@ type Cluster interface {
3031
GetLabelStats() *statistics.LabelStatistics
3132
GetCoordinator() *schedule.Coordinator
3233
GetRuleManager() *placement.RuleManager
34+
GetAffinityManager() *affinity.Manager
3335
GetBasicCluster() *core.BasicCluster
3436
}
3537

@@ -59,17 +61,25 @@ func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
5961

6062
// HandleOverlaps handles the overlap regions.
6163
func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo) {
64+
regionStats := c.GetRegionStats()
65+
labelStats := c.GetLabelStats()
66+
ruleManager := c.GetRuleManager()
67+
affinityManager := c.GetAffinityManager()
6268
for _, item := range overlaps {
6369
select {
6470
case <-ctx.Done():
6571
return
6672
default:
6773
}
68-
if c.GetRegionStats() != nil {
69-
c.GetRegionStats().ClearDefunctRegion(item.GetID())
74+
id := item.GetID()
75+
if regionStats != nil {
76+
regionStats.ClearDefunctRegion(id)
7077
}
71-
c.GetLabelStats().MarkDefunctRegion(item.GetID())
72-
c.GetRuleManager().InvalidCache(item.GetID())
78+
if affinityManager != nil {
79+
affinityManager.InvalidCache(id)
80+
}
81+
labelStats.MarkDefunctRegion(id)
82+
ruleManager.InvalidCache(id)
7383
}
7484
}
7585

0 commit comments

Comments
 (0)