Skip to content

Commit

Permalink
enhance: Make describe collection run outside of the scheduler
Browse files Browse the repository at this point in the history
Related to milvus-io#38275

Make rootcoord's describe collection execute without taking the scheduler
lock, in order to remove the deadlock introduced when the sync partition and
lock segment operations call describe collection.

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Dec 6, 2024
1 parent f16232f commit 6c1e22f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 25 deletions.
16 changes: 12 additions & 4 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,15 +1263,23 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe
allowUnavailable: allowUnavailable,
}

if err := c.scheduler.AddTask(t); err != nil {
log.Info("failed to enqueue request to describe collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
ts, err := c.tsoAllocator.GenerateTSO(1)
if err != nil {
log.Warn("failed to generate ts for describe collection", zap.Error(err))
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
t.SetTs(ts)

if err := t.WaitToFinish(); err != nil {
if err := t.Prepare(ctx); err != nil {
log.Info("failed to prepare describe collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
return &milvuspb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
if err := t.Execute(ctx); err != nil {
log.Warn("failed to describe collection", zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
return &milvuspb.DescribeCollectionResponse{
Expand Down
110 changes: 89 additions & 21 deletions internal/rootcoord/root_coord_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,13 @@ func TestRootCoord_ListAliases(t *testing.T) {
}

func TestRootCoord_DescribeCollection(t *testing.T) {
alloc := newMockTsoAllocator()
ts := Timestamp(100)
alloc.GenerateTSOF = func(count uint32) (uint64, error) {
// end ts
return ts, nil
}

t.Run("not healthy", func(t *testing.T) {
c := newTestCore(withAbnormalCode())
ctx := context.Background()
Expand All @@ -610,41 +617,112 @@ func TestRootCoord_DescribeCollection(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("failed to add task", func(t *testing.T) {
t.Run("failed_to_preexecute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
withInvalidTsoAllocator())

ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{})
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("failed to execute", func(t *testing.T) {
t.Run("failed_to_preexecute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
withTsoAllocator(alloc))

ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{})
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("execute_fail", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTsoAllocator(alloc),
withInvalidMeta())

ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
},
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("normal case, everything is ok", func(t *testing.T) {
alias1, alias2 := funcutil.GenRandomStr(), funcutil.GenRandomStr()
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByID",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&model.Collection{
CollectionID: 1,
Name: "test coll",
DBID: 1,
}, nil)
meta.On("ListAliasesByID",
mock.Anything,
mock.Anything,
).Return([]string{alias1, alias2})
meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Return(&model.Database{
ID: 1,
Name: "test db",
}, nil)

c := newTestCore(withHealthyCode(),
withValidScheduler())
withTsoAllocator(alloc),
withMeta(meta))

ctx := context.Background()
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{})
resp, err := c.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionID: 1,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{})
resp, err = c.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionID: 1,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
Expand Down Expand Up @@ -699,16 +777,6 @@ func TestRootCoord_ShowCollections(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())

ctx := context.Background()
resp, err := c.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
Expand Down

0 comments on commit 6c1e22f

Please sign in to comment.