Skip to content

Commit

Permalink
enhance: [2.4]Skip create index for l0 segment (#38335)
Browse files Browse the repository at this point in the history
master pr: #38334

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 11, 2024
1 parent d8753a1 commit e843a46
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
13 changes: 9 additions & 4 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -81,6 +82,10 @@ func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) e
}

func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
if segment.GetLevel() == datapb.SegmentLevel_L0 {
log.Warn("segment is level zero, skip create indexes", zap.Int64("segmentID", segment.GetID()))
return nil
}
indexes := s.meta.indexMeta.GetIndexesForCollection(segment.CollectionID, "")
indexIDToSegIndexes := s.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
for _, index := range indexes {
Expand Down Expand Up @@ -436,7 +441,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
indexInfo := &indexpb.IndexInfo{}
// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0
}))

s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
Expand Down Expand Up @@ -687,7 +692,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde

// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0
}))

s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
Expand Down Expand Up @@ -737,7 +742,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe

// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0
}))

indexInfos := make([]*indexpb.IndexInfo, 0)
Expand Down Expand Up @@ -795,7 +800,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt

// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0
}))

indexInfos := make([]*indexpb.IndexInfo, 0)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *taskScheduler) run() {
ok := s.process(taskID)
if !ok {
s.taskLock.Unlock(taskID)
log.Ctx(s.ctx).Info("there is no idle indexing node, wait a minute...")
log.Ctx(s.ctx).Info("there is no idle indexing node, waiting for retry...")
break
}
s.taskLock.Unlock(taskID)
Expand Down

0 comments on commit e843a46

Please sign in to comment.