Skip to content

Commit 62c6c7a

Browse files
committed
Merge remote-tracking branch 'upstream/master' into upgrade-golang-version
2 parents c93580b + 5c5948c commit 62c6c7a

File tree

16 files changed

+395
-199
lines changed

16 files changed

+395
-199
lines changed

internal/core/src/segcore/SegmentSealedImpl.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1591,7 +1591,11 @@ SegmentSealedImpl::bulk_subscript(
15911591
return fill_with_empty(field_id, 0);
15921592
}
15931593

1594-
auto column = fields_.at(field_id);
1594+
std::shared_ptr<SingleChunkColumnBase> column;
1595+
{
1596+
std::shared_lock lck(mutex_);
1597+
column = fields_.at(field_id);
1598+
}
15951599
auto ret = fill_with_empty(field_id, count);
15961600
if (column->IsNullable()) {
15971601
auto dst = ret->mutable_valid_data()->mutable_data();

internal/proxy/impl.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5439,10 +5439,6 @@ func (node *Proxy) validateOperatePrivilegeV2Params(req *milvuspb.OperatePrivile
54395439
if err := ValidatePrivilege(req.Grantor.Privilege.Name); err != nil {
54405440
return err
54415441
}
5442-
// validate built-in privilege group params
5443-
if err := ValidateBuiltInPrivilegeGroup(req.Grantor.Privilege.Name, req.DbName, req.CollectionName); err != nil {
5444-
return err
5445-
}
54465442
if req.Type != milvuspb.OperatePrivilegeType_Grant && req.Type != milvuspb.OperatePrivilegeType_Revoke {
54475443
return merr.WrapErrParameterInvalidMsg("the type in the request not grant or revoke")
54485444
}

internal/proxy/util.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,31 +1120,6 @@ func ValidatePrivilege(entity string) error {
11201120
return validateName(entity, "Privilege")
11211121
}
11221122

1123-
func ValidateBuiltInPrivilegeGroup(entity string, dbName string, collectionName string) error {
1124-
if !util.IsBuiltinPrivilegeGroup(entity) {
1125-
return nil
1126-
}
1127-
switch {
1128-
case strings.HasPrefix(entity, milvuspb.PrivilegeLevel_Cluster.String()):
1129-
if !util.IsAnyWord(dbName) || !util.IsAnyWord(collectionName) {
1130-
return merr.WrapErrParameterInvalidMsg("dbName and collectionName should be * for the cluster level privilege: %s", entity)
1131-
}
1132-
return nil
1133-
case strings.HasPrefix(entity, milvuspb.PrivilegeLevel_Database.String()):
1134-
if collectionName != "" && collectionName != util.AnyWord {
1135-
return merr.WrapErrParameterInvalidMsg("collectionName should be * for the database level privilege: %s", entity)
1136-
}
1137-
return nil
1138-
case strings.HasPrefix(entity, milvuspb.PrivilegeLevel_Collection.String()):
1139-
if util.IsAnyWord(dbName) && !util.IsAnyWord(collectionName) && collectionName != "" {
1140-
return merr.WrapErrParameterInvalidMsg("please specify database name for the collection level privilege: %s", entity)
1141-
}
1142-
return nil
1143-
default:
1144-
return nil
1145-
}
1146-
}
1147-
11481123
func GetCurUserFromContext(ctx context.Context) (string, error) {
11491124
return contextutil.GetCurUserFromContext(ctx)
11501125
}

internal/querycoordv2/balance/balance.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,13 @@ func (segPlan *SegmentAssignPlan) String() string {
4646
}
4747

4848
type ChannelAssignPlan struct {
49-
Channel *meta.DmChannel
50-
Replica *meta.Replica
51-
From int64
52-
To int64
49+
Channel *meta.DmChannel
50+
Replica *meta.Replica
51+
From int64
52+
To int64
53+
FromScore int64
54+
ToScore int64
55+
ChannelScore int64
5356
}
5457

5558
func (chanPlan *ChannelAssignPlan) String() string {

internal/querycoordv2/balance/rowcount_based_balancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func newNodeItem(currentScore int, nodeID int64) nodeItem {
393393

394394
func (b *nodeItem) getPriority() int {
395395
// if node lacks more score between assignedScore and currentScore, then higher priority
396-
return int(b.currentScore - b.assignedScore)
396+
return int(math.Ceil(b.currentScore - b.assignedScore))
397397
}
398398

399399
func (b *nodeItem) setPriority(priority int) {

internal/querycoordv2/balance/score_based_balancer.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,19 +191,19 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
191191
}
192192

193193
from := int64(-1)
194-
// fromScore := int64(0)
194+
fromScore := int64(0)
195195
if sourceNode != nil {
196196
from = sourceNode.nodeID
197-
// fromScore = int64(sourceNode.getPriority())
197+
fromScore = int64(sourceNode.getPriority())
198198
}
199199

200200
plan := ChannelAssignPlan{
201-
From: from,
202-
To: targetNode.nodeID,
203-
Channel: ch,
204-
// FromScore: fromScore,
205-
// ToScore: int64(targetNode.getPriority()),
206-
// SegmentScore: int64(scoreChanges),
201+
From: from,
202+
To: targetNode.nodeID,
203+
Channel: ch,
204+
FromScore: fromScore,
205+
ToScore: int64(targetNode.getPriority()),
206+
ChannelScore: int64(scoreChanges),
207207
}
208208
br.AddRecord(StrRecordf("add segment plan %s", plan))
209209
plans = append(plans, plan)
@@ -487,6 +487,20 @@ func (b *ScoreBasedBalancer) BalanceReplica(ctx context.Context, replica *meta.R
487487
return segmentPlans, channelPlans
488488
}
489489

490+
func (b *ScoreBasedBalancer) genStoppingChannelPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []ChannelAssignPlan {
491+
channelPlans := make([]ChannelAssignPlan, 0)
492+
for _, nodeID := range roNodes {
493+
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
494+
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, rwNodes, false)
495+
for i := range plans {
496+
plans[i].From = nodeID
497+
plans[i].Replica = replica
498+
}
499+
channelPlans = append(channelPlans, plans...)
500+
}
501+
return channelPlans
502+
}
503+
490504
func (b *ScoreBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
491505
segmentPlans := make([]SegmentAssignPlan, 0)
492506
for _, nodeID := range offlineNodes {

internal/querycoordv2/balance/score_based_balancer_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/samber/lo"
2424
"github.com/stretchr/testify/mock"
2525
"github.com/stretchr/testify/suite"
26+
"go.uber.org/atomic"
2627

2728
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
2829
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
@@ -1470,3 +1471,127 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnChannelExclusive()
14701471
_, channelPlans = suite.getCollectionBalancePlans(balancer, 3)
14711472
suite.Len(channelPlans, 2)
14721473
}
1474+
1475+
func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnStoppingNode() {
1476+
ctx := context.Background()
1477+
balancer := suite.balancer
1478+
1479+
// mock 10 collections with each collection has 1 channel
1480+
collectionNum := 10
1481+
channelNum := 1
1482+
for i := 1; i <= collectionNum; i++ {
1483+
collectionID := int64(i)
1484+
collection := utils.CreateTestCollection(collectionID, int32(1))
1485+
collection.LoadPercentage = 100
1486+
collection.Status = querypb.LoadStatus_Loaded
1487+
balancer.meta.CollectionManager.PutCollection(ctx, collection)
1488+
balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID))
1489+
balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil)
1490+
1491+
channels := make([]*datapb.VchannelInfo, channelNum)
1492+
for i := 0; i < channelNum; i++ {
1493+
channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}
1494+
}
1495+
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
1496+
channels, nil, nil)
1497+
suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe()
1498+
balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
1499+
balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)
1500+
}
1501+
1502+
// mock querynode-1 to node manager
1503+
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
1504+
NodeID: 1,
1505+
Address: "127.0.0.1:0",
1506+
Hostname: "localhost",
1507+
Version: common.Version,
1508+
})
1509+
nodeInfo.SetState(session.NodeStateNormal)
1510+
suite.balancer.nodeManager.Add(nodeInfo)
1511+
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 1)
1512+
utils.RecoverAllCollection(balancer.meta)
1513+
1514+
// mock channel distribution
1515+
channelDist := make([]*meta.DmChannel, 0)
1516+
for i := 1; i <= collectionNum; i++ {
1517+
collectionID := int64(i)
1518+
for i := 0; i < channelNum; i++ {
1519+
channelDist = append(channelDist, &meta.DmChannel{
1520+
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1,
1521+
})
1522+
}
1523+
}
1524+
balancer.dist.ChannelDistManager.Update(1, channelDist...)
1525+
1526+
// assert balance channel won't happens on 1 querynode
1527+
ret := make([]ChannelAssignPlan, 0)
1528+
for i := 1; i <= collectionNum; i++ {
1529+
collectionID := int64(i)
1530+
_, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID)
1531+
ret = append(ret, channelPlans...)
1532+
}
1533+
suite.Len(ret, 0)
1534+
1535+
// mock querynode-2 and querynode-3 to node manager
1536+
nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
1537+
NodeID: 2,
1538+
Address: "127.0.0.1:0",
1539+
Hostname: "localhost",
1540+
Version: common.Version,
1541+
})
1542+
suite.balancer.nodeManager.Add(nodeInfo2)
1543+
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 2)
1544+
// mock querynode-2 and querynode-3 to node manager
1545+
nodeInfo3 := session.NewNodeInfo(session.ImmutableNodeInfo{
1546+
NodeID: 3,
1547+
Address: "127.0.0.1:0",
1548+
Hostname: "localhost",
1549+
Version: common.Version,
1550+
})
1551+
suite.balancer.nodeManager.Add(nodeInfo3)
1552+
suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 3)
1553+
utils.RecoverAllCollection(balancer.meta)
1554+
// mock querynode-1 to stopping, trigger stopping balance, expect to generate 10 balance channel task, and 5 for node-2, 5 for node-3
1555+
nodeInfo.SetState(session.NodeStateStopping)
1556+
suite.balancer.meta.ResourceManager.HandleNodeDown(ctx, 1)
1557+
utils.RecoverAllCollection(balancer.meta)
1558+
1559+
node2Counter := atomic.NewInt32(0)
1560+
node3Counter := atomic.NewInt32(0)
1561+
1562+
suite.mockScheduler.ExpectedCalls = nil
1563+
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
1564+
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).RunAndReturn(func(nodeID, collection int64) int {
1565+
if collection == -1 {
1566+
if nodeID == 2 {
1567+
return int(node2Counter.Load())
1568+
}
1569+
1570+
if nodeID == 3 {
1571+
return int(node3Counter.Load())
1572+
}
1573+
}
1574+
return 0
1575+
})
1576+
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
1577+
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
1578+
1579+
for i := 1; i <= collectionNum; i++ {
1580+
collectionID := int64(i)
1581+
_, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID)
1582+
suite.Len(channelPlans, 1)
1583+
if channelPlans[0].To == 2 {
1584+
node2Counter.Inc()
1585+
}
1586+
1587+
if channelPlans[0].To == 3 {
1588+
node3Counter.Inc()
1589+
}
1590+
1591+
if i%2 == 0 {
1592+
suite.Equal(node2Counter.Load(), node3Counter.Load())
1593+
}
1594+
}
1595+
suite.Equal(node2Counter.Load(), int32(5))
1596+
suite.Equal(node3Counter.Load(), int32(5))
1597+
}

internal/rootcoord/meta_table.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,7 +1525,7 @@ func (mt *MetaTable) RestoreRBAC(ctx context.Context, tenant string, meta *milvu
15251525
return mt.catalog.RestoreRBAC(ctx, tenant, meta)
15261526
}
15271527

1528-
// check if the privielge group name is defined by users
1528+
// check if the privilege group name is defined by users
15291529
func (mt *MetaTable) IsCustomPrivilegeGroup(ctx context.Context, groupName string) (bool, error) {
15301530
privGroups, err := mt.catalog.ListPrivilegeGroups(ctx)
15311531
if err != nil {
@@ -1641,7 +1641,7 @@ func (mt *MetaTable) OperatePrivilegeGroup(ctx context.Context, groupName string
16411641
if group.GroupName == p.Name {
16421642
privileges = append(privileges, group.Privileges...)
16431643
} else {
1644-
return merr.WrapErrParameterInvalidMsg("there is no privilege name or privielge group name [%s] defined in system to operate", p.Name)
1644+
return merr.WrapErrParameterInvalidMsg("there is no privilege name or privilege group name [%s] defined in system to operate", p.Name)
16451645
}
16461646
}
16471647
}

internal/rootcoord/rbac_task.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func executeOperatePrivilegeTaskSteps(ctx context.Context, core *Core, in *milvu
179179
}
180180
grants := []*milvuspb.GrantEntity{in.Entity}
181181

182-
allGroups, err := core.getPrivilegeGroups(ctx)
182+
allGroups, err := core.getDefaultAndCustomPrivilegeGroups(ctx)
183183
if err != nil {
184184
return nil, err
185185
}
@@ -275,12 +275,12 @@ func executeOperatePrivilegeGroupTaskSteps(ctx context.Context, core *Core, in *
275275
return p.Name
276276
})
277277

278-
// check if privileges are the same object type
279-
objectTypes := lo.SliceToMap(newPrivs, func(p *milvuspb.PrivilegeEntity) (string, struct{}) {
280-
return util.GetObjectType(p.Name), struct{}{}
278+
// check if privileges are the same privilege level
279+
privilegeLevels := lo.SliceToMap(newPrivs, func(p *milvuspb.PrivilegeEntity) (string, struct{}) {
280+
return util.GetPrivilegeLevel(p.Name), struct{}{}
281281
})
282-
if len(objectTypes) > 1 {
283-
return nil, errors.New("privileges are not the same object type")
282+
if len(privilegeLevels) > 1 {
283+
return nil, errors.New("privileges are not the same privilege level")
284284
}
285285
case milvuspb.OperatePrivilegeGroupType_RemovePrivilegesFromGroup:
286286
newPrivs, _ := lo.Difference(v, in.Privileges)

internal/rootcoord/root_coord.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2595,6 +2595,10 @@ func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivile
25952595
ctxLog.Error("", zap.Error(err))
25962596
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
25972597
}
2598+
if err := c.validatePrivilegeGroupParams(ctx, privName, in.Entity.DbName, in.Entity.ObjectName); err != nil {
2599+
ctxLog.Error("", zap.Error(err))
2600+
return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
2601+
}
25982602
// set up object type for metastore, to be compatible with v1 version
25992603
in.Entity.Object.Name = util.GetObjectType(privName)
26002604
default:
@@ -2656,6 +2660,42 @@ func (c *Core) operatePrivilegeCommonCheck(ctx context.Context, in *milvuspb.Ope
26562660
return nil
26572661
}
26582662

2663+
func (c *Core) validatePrivilegeGroupParams(ctx context.Context, entity string, dbName string, collectionName string) error {
2664+
allGroups, err := c.getDefaultAndCustomPrivilegeGroups(ctx)
2665+
if err != nil {
2666+
return err
2667+
}
2668+
groups := lo.SliceToMap(allGroups, func(group *milvuspb.PrivilegeGroupInfo) (string, []*milvuspb.PrivilegeEntity) {
2669+
return group.GroupName, group.Privileges
2670+
})
2671+
privs, exists := groups[entity]
2672+
if !exists || len(privs) == 0 {
2673+
// it is a privilege, no need to check with other params
2674+
return nil
2675+
}
2676+
// since all privileges are same level in a group, just check the first privilege
2677+
level := util.GetPrivilegeLevel(privs[0].GetName())
2678+
switch level {
2679+
case milvuspb.PrivilegeLevel_Cluster.String():
2680+
if !util.IsAnyWord(dbName) || !util.IsAnyWord(collectionName) {
2681+
return merr.WrapErrParameterInvalidMsg("dbName and collectionName should be * for the cluster level privilege: %s", entity)
2682+
}
2683+
return nil
2684+
case milvuspb.PrivilegeLevel_Database.String():
2685+
if collectionName != "" && collectionName != util.AnyWord {
2686+
return merr.WrapErrParameterInvalidMsg("collectionName should be * for the database level privilege: %s", entity)
2687+
}
2688+
return nil
2689+
case milvuspb.PrivilegeLevel_Collection.String():
2690+
if util.IsAnyWord(dbName) && !util.IsAnyWord(collectionName) && collectionName != "" {
2691+
return merr.WrapErrParameterInvalidMsg("please specify database name for the collection level privilege: %s", entity)
2692+
}
2693+
return nil
2694+
default:
2695+
return errors.New("not found the privilege level")
2696+
}
2697+
}
2698+
26592699
func (c *Core) getMetastorePrivilegeName(ctx context.Context, privName string) (string, error) {
26602700
// if it is built-in privilege, return the privilege name directly
26612701
if util.IsPrivilegeNameDefined(privName) {
@@ -2757,7 +2797,7 @@ func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest)
27572797
}, nil
27582798
}
27592799
// expand privilege groups and turn to policies
2760-
allGroups, err := c.getPrivilegeGroups(ctx)
2800+
allGroups, err := c.getDefaultAndCustomPrivilegeGroups(ctx)
27612801
if err != nil {
27622802
errMsg := "fail to get privilege groups"
27632803
ctxLog.Warn(errMsg, zap.Error(err))
@@ -3131,8 +3171,8 @@ func (c *Core) expandPrivilegeGroups(ctx context.Context, grants []*milvuspb.Gra
31313171
}), nil
31323172
}
31333173

3134-
// getPrivilegeGroups returns default privilege groups and user-defined privilege groups.
3135-
func (c *Core) getPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupInfo, error) {
3174+
// getDefaultAndCustomPrivilegeGroups returns default privilege groups and user-defined privilege groups.
3175+
func (c *Core) getDefaultAndCustomPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupInfo, error) {
31363176
allGroups, err := c.meta.ListPrivilegeGroups(ctx)
31373177
allGroups = append(allGroups, Params.RbacConfig.GetDefaultPrivilegeGroups()...)
31383178
if err != nil {

0 commit comments

Comments
 (0)