From fec31fedce9db7e541801234ec6d9daac90ebb7b Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 10 Dec 2024 12:30:43 +0800 Subject: [PATCH] enhance: Move `SyncCreatedPartition` step to proxy (#38296) Related to #38275 This PR moves the sync-created-partition step to the proxy to avoid a potential logic deadlock when partition creation happens together with a target segment change. Signed-off-by: Congqi Xia --- internal/proxy/impl.go | 1 + internal/proxy/task.go | 25 +++++++++++++++--- internal/proxy/task_test.go | 29 ++++++++++++++++----- internal/rootcoord/create_partition_task.go | 6 ----- internal/rootcoord/step.go | 19 -------------- 5 files changed, 46 insertions(+), 34 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index afe83c873ecb6..c00161cf2333e 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1324,6 +1324,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create Condition: NewTaskCondition(ctx), CreatePartitionRequest: request, rootCoord: node.rootCoord, + queryCoord: node.queryCoord, result: nil, } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 07b6cf6f864e1..05c881d61caed 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -1069,9 +1069,10 @@ type createPartitionTask struct { baseTask Condition *milvuspb.CreatePartitionRequest - ctx context.Context - rootCoord types.RootCoordClient - result *commonpb.Status + ctx context.Context + rootCoord types.RootCoordClient + queryCoord types.QueryCoordClient + result *commonpb.Status } func (t *createPartitionTask) TraceCtx() context.Context { @@ -1139,6 +1140,24 @@ func (t *createPartitionTask) PreExecute(ctx context.Context) error { func (t *createPartitionTask) Execute(ctx context.Context) (err error) { t.result, err = t.rootCoord.CreatePartition(ctx, t.CreatePartitionRequest) + if err := merr.CheckRPCCall(t.result, err); err != nil { + return err + } + collectionID, err := globalMetaCache.GetCollectionID(ctx, 
t.GetDbName(), t.GetCollectionName()) + if err != nil { + t.result = merr.Status(err) + return err + } + partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), t.GetCollectionName(), t.GetPartitionName()) + if err != nil { + t.result = merr.Status(err) + return err + } + t.result, err = t.queryCoord.SyncNewCreatedPartition(ctx, &querypb.SyncNewCreatedPartitionRequest{ + Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ReleasePartitions)), + CollectionID: collectionID, + PartitionID: partitionID, + }) return merr.CheckRPCCall(t.result, err) } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 6847383ac0580..9f0c40c7f3c6d 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -1380,9 +1380,19 @@ func TestDescribeCollectionTask_ShardsNum2(t *testing.T) { } func TestCreatePartitionTask(t *testing.T) { - rc := NewRootCoordMock() + rc := mocks.NewMockRootCoordClient(t) + qc := mocks.NewMockQueryCoordClient(t) + + mockCache := NewMockCache(t) + mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(newSchemaInfo(&schemapb.CollectionSchema{ + EnableDynamicField: false, + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "ID", DataType: schemapb.DataType_Int64}, + {FieldID: 101, Name: "Vector", DataType: schemapb.DataType_FloatVector}, + }, + }), nil) + globalMetaCache = mockCache - defer rc.Close() ctx := context.Background() prefix := "TestCreatePartitionTask" dbName := "" @@ -1401,9 +1411,10 @@ func TestCreatePartitionTask(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, }, - ctx: ctx, - rootCoord: rc, - result: nil, + ctx: ctx, + rootCoord: rc, + queryCoord: qc, + result: nil, } task.OnEnqueue() task.PreExecute(ctx) @@ -1413,8 +1424,14 @@ func TestCreatePartitionTask(t *testing.T) { assert.Equal(t, Timestamp(100), task.BeginTs()) assert.Equal(t, Timestamp(100), task.EndTs()) assert.Equal(t, paramtable.GetNodeID(), 
task.GetBase().GetSourceID()) + + // setup global meta cache + mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(100, nil).Once() + mockCache.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(1000, nil).Once() + rc.EXPECT().CreatePartition(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() + qc.EXPECT().SyncNewCreatedPartition(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() err := task.Execute(ctx) - assert.Error(t, err) + assert.NoError(t, err) task.CollectionName = "#0xc0de" err = task.PreExecute(ctx) diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index a2a20a55b5fc8..aac548e1bca77 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -114,12 +114,6 @@ func (t *createPartitionTask) Execute(ctx context.Context) error { partitionIDs: []int64{partID}, }) - undoTask.AddStep(&syncNewCreatedPartitionStep{ - baseStep: baseStep{core: t.core}, - collectionID: t.collMeta.CollectionID, - partitionID: partID, - }, &nullStep{}) - undoTask.AddStep(&changePartitionStateStep{ baseStep: baseStep{core: t.core}, collectionID: t.collMeta.CollectionID, diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index 9b18df3a3c38e..b2cdf8371444b 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -327,25 +327,6 @@ func (s *releasePartitionsStep) Weight() stepPriority { return stepPriorityUrgent } -type syncNewCreatedPartitionStep struct { - baseStep - collectionID UniqueID - partitionID UniqueID -} - -func (s *syncNewCreatedPartitionStep) Execute(ctx context.Context) ([]nestedStep, error) { - err := s.core.broker.SyncNewCreatedPartition(ctx, s.collectionID, s.partitionID) - return nil, err -} - -func (s *syncNewCreatedPartitionStep) Desc() string { - return fmt.Sprintf("sync new partition, collectionID=%d, partitionID=%d", 
s.partitionID, s.partitionID) -} - -func (s *syncNewCreatedPartitionStep) Weight() stepPriority { - return stepPriorityUrgent -} - type dropIndexStep struct { baseStep collID UniqueID