From 6c1e22fb1cd309f6260be62a796e3565de2fe411 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Fri, 6 Dec 2024 16:02:43 +0800 Subject: [PATCH] enhance: Make describe collection out of scheduler Related to #38275 Make rootcoord execute describe collection without holding the scheduler lock, in order to remove the deadlock introduced when the sync-partition and lock-segment operations describe a collection. Signed-off-by: Congqi Xia --- internal/rootcoord/root_coord.go | 16 +++- internal/rootcoord/root_coord_test.go | 110 +++++++++++++++++++++----- 2 files changed, 101 insertions(+), 25 deletions(-) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ed07f88a8c65d..13810fc990d12 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1263,15 +1263,23 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe allowUnavailable: allowUnavailable, } - if err := c.scheduler.AddTask(t); err != nil { - log.Info("failed to enqueue request to describe collection", zap.Error(err)) - metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() + ts, err := c.tsoAllocator.GenerateTSO(1) + if err != nil { + log.Warn("failed to generate ts for describe collection", zap.Error(err)) return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), }, nil } + t.SetTs(ts) - if err := t.WaitToFinish(); err != nil { + if err := t.Prepare(ctx); err != nil { + log.Info("failed to prepare describe collection", zap.Error(err)) + metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() + return &milvuspb.DescribeCollectionResponse{ + Status: merr.Status(err), + }, nil + } + if err := t.Execute(ctx); err != nil { log.Warn("failed to describe collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ diff --git a/internal/rootcoord/root_coord_test.go 
b/internal/rootcoord/root_coord_test.go index b8915e5deab79..39d4b434e2ce0 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -599,6 +599,13 @@ func TestRootCoord_ListAliases(t *testing.T) { } func TestRootCoord_DescribeCollection(t *testing.T) { + alloc := newMockTsoAllocator() + ts := Timestamp(100) + alloc.GenerateTSOF = func(count uint32) (uint64, error) { + // end ts + return ts, nil + } + t.Run("not healthy", func(t *testing.T) { c := newTestCore(withAbnormalCode()) ctx := context.Background() @@ -610,41 +617,112 @@ func TestRootCoord_DescribeCollection(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - t.Run("failed to add task", func(t *testing.T) { + t.Run("failed_to_preexecute", func(t *testing.T) { c := newTestCore(withHealthyCode(), - withInvalidScheduler()) + withInvalidTsoAllocator()) ctx := context.Background() - resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) + resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + }, + }) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{}) + resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + }, + }) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - t.Run("failed to execute", func(t *testing.T) { + t.Run("failed_to_preexecute", func(t *testing.T) { c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) + withTsoAllocator(alloc)) ctx := context.Background() - resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) + resp, err := c.DescribeCollection(ctx, 
&milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + }, + }) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{}) + resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + }, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + }) + + t.Run("execute_fail", func(t *testing.T) { + c := newTestCore(withHealthyCode(), + withTsoAllocator(alloc), + withInvalidMeta()) + + ctx := context.Background() + resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + }, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + }, + }) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) t.Run("normal case, everything is ok", func(t *testing.T) { + alias1, alias2 := funcutil.GenRandomStr(), funcutil.GenRandomStr() + meta := mockrootcoord.NewIMetaTable(t) + meta.On("GetCollectionByID", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&model.Collection{ + CollectionID: 1, + Name: "test coll", + DBID: 1, + }, nil) + meta.On("ListAliasesByID", + mock.Anything, + mock.Anything, + ).Return([]string{alias1, alias2}) + meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Return(&model.Database{ + ID: 1, + Name: "test db", + }, nil) + c := newTestCore(withHealthyCode(), - withValidScheduler()) + withTsoAllocator(alloc), + withMeta(meta)) ctx := 
context.Background() - resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) + resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + }, + CollectionID: 1, + }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{}) + resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + }, + CollectionID: 1, + }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) @@ -699,16 +777,6 @@ func TestRootCoord_ShowCollections(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - }) - t.Run("failed to execute", func(t *testing.T) { c := newTestCore(withHealthyCode(), withTaskFailScheduler())