diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 9421e8e83a67a..f24e6e88fb59b 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -81,6 +82,10 @@ func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) e } func (s *Server) createIndexesForSegment(segment *SegmentInfo) error { + if segment.GetLevel() == datapb.SegmentLevel_L0 { + log.Warn("segment is level zero, skip create indexes", zap.Int64("segmentID", segment.GetID())) + return nil + } indexes := s.meta.indexMeta.GetIndexesForCollection(segment.CollectionID, "") indexIDToSegIndexes := s.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID) for _, index := range indexes { @@ -436,7 +441,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe indexInfo := &indexpb.IndexInfo{} // The total rows of all indexes should be based on the current perspective segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { - return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0 })) s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) @@ -687,7 +692,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde // The total rows of all indexes should be based on the current perspective segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { - return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0 })) s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) @@ -737,7 +742,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe // The total rows of all indexes should be based on the current perspective segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { - return isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0 })) indexInfos := make([]*indexpb.IndexInfo, 0) @@ -795,7 +800,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt // The total rows of all indexes should be based on the current perspective segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { - return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) && info.GetLevel() != datapb.SegmentLevel_L0 })) indexInfos := make([]*indexpb.IndexInfo, 0) diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index 2cb29f4841733..b08e257709e68 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -211,7 +211,7 @@ func (s *taskScheduler) run() { ok := s.process(taskID) if !ok { s.taskLock.Unlock(taskID) - log.Ctx(s.ctx).Info("there is no idle indexing node, wait a minute...") + log.Ctx(s.ctx).Info("there is no idle indexing node, waiting for retry...") break } s.taskLock.Unlock(taskID)