diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 5fdb4dee3ea9a..536d800680509 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1401,7 +1401,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul zap.String("channel", t.GetChannel())) metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)} - compactFromSegIDs := make([]int64, 0) + compactFromSegIDs := t.GetInputSegments() compactToSegIDs := make([]int64, 0) compactToSegInfos := make([]*SegmentInfo, 0) var ( diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 4545e5d31bb7a..c56a14dbda9f0 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -418,6 +418,73 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutationForL2Single() { assert.Equal(suite.T(), datapb.SegmentLevel_L1, seg2.GetLevel()) } +func (suite *MetaBasicSuite) TestCompleteCompactionMutationForClustering() { + latestSegments := NewSegmentsInfo() + for segID, segment := range map[UniqueID]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, + NumOfRows: 2, + PartitionStatsVersion: int64(10001), + }}, + 2: {SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, + NumOfRows: 2, + PartitionStatsVersion: int64(10001), + }}, + } { + latestSegments.SetSegment(segID, segment) + } + + mockChMgr := mocks.NewChunkManager(suite.T()) + m := &meta{ + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: latestSegments, + chunkManager: mockChMgr, + } + + compactToSeg := &datapb.CompactionSegment{ + SegmentID: 3, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)}, + NumOfRows: 2, + } + + result := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{compactToSeg}, + } + task := &datapb.CompactionTask{ + InputSegments: []UniqueID{1, 2}, + Type: datapb.CompactionType_ClusteringCompaction, + } + + infos, _, err := m.CompleteCompactionMutation(task, result) + suite.NoError(err) + suite.Equal(1, len(infos)) + seg1 := m.GetSegment(1) + seg2 := m.GetSegment(2) + suite.Equal(datapb.SegmentLevel_L1, seg1.GetLevel()) + suite.Equal(datapb.SegmentLevel_L1, seg2.GetLevel()) + seg3 := m.GetSegment(3) + suite.Equal(datapb.SegmentLevel_L2, seg3.GetLevel()) + suite.ElementsMatch([]int64{1, 2}, seg3.GetCompactionFrom()) +} + func (suite *MetaBasicSuite) TestSetSegment() { meta := suite.meta catalog := mocks2.NewDataCoordCatalog(suite.T())