Skip to content

Commit

Permalink
fix: [2.4] Set the correct compactionFroms for clustering segments (#38376)
Browse files Browse the repository at this point in the history

issue: #38373 
master pr: #36799 
This bug was introduced by PR #37653 .

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 11, 2024
1 parent b2a8b9c commit dde9d6c
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,7 +1401,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
compactFromSegIDs := make([]int64, 0)
compactFromSegIDs := t.GetInputSegments()
compactToSegIDs := make([]int64, 0)
compactToSegInfos := make([]*SegmentInfo, 0)
var (
Expand Down
67 changes: 67 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,73 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutationForL2Single() {
assert.Equal(suite.T(), datapb.SegmentLevel_L1, seg2.GetLevel())
}

// TestCompleteCompactionMutationForClustering verifies that completing a
// clustering compaction produces one new L2 segment whose CompactionFrom is
// taken from the task's InputSegments, while the L1 input segments keep
// their original level.
func (suite *MetaBasicSuite) TestCompleteCompactionMutationForClustering() {
	// Two flushed L1 input segments. Each carries two deltalogs: one that was
	// submitted for compaction and one appended before the compaction finished.
	in1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
		ID:           1,
		CollectionID: 100,
		PartitionID:  10,
		State:        commonpb.SegmentState_Flushed,
		Level:        datapb.SegmentLevel_L1,
		Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
		Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
		// one deltalog submitted for compaction, one appended before compaction done
		Deltalogs:             []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
		NumOfRows:             2,
		PartitionStatsVersion: int64(10001),
	}}
	in2 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
		ID:           2,
		CollectionID: 100,
		PartitionID:  10,
		State:        commonpb.SegmentState_Flushed,
		Level:        datapb.SegmentLevel_L1,
		Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
		Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
		// one deltalog submitted for compaction, one appended before compaction done
		Deltalogs:             []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
		NumOfRows:             2,
		PartitionStatsVersion: int64(10001),
	}}

	segments := NewSegmentsInfo()
	segments.SetSegment(1, in1)
	segments.SetSegment(2, in2)

	m := &meta{
		catalog:      &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
		segments:     segments,
		chunkManager: mocks.NewChunkManager(suite.T()),
	}

	// The compaction result emits a single new segment (ID 3).
	result := &datapb.CompactionPlanResult{
		Segments: []*datapb.CompactionSegment{{
			SegmentID:           3,
			InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
			Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
			NumOfRows:           2,
		}},
	}
	task := &datapb.CompactionTask{
		InputSegments: []UniqueID{1, 2},
		Type:          datapb.CompactionType_ClusteringCompaction,
	}

	infos, _, err := m.CompleteCompactionMutation(task, result)
	suite.NoError(err)
	suite.Equal(1, len(infos))

	// Input segments keep their L1 level after the mutation.
	suite.Equal(datapb.SegmentLevel_L1, m.GetSegment(1).GetLevel())
	suite.Equal(datapb.SegmentLevel_L1, m.GetSegment(2).GetLevel())

	// The compacted-to segment is L2 and records both inputs as its origin.
	newSeg := m.GetSegment(3)
	suite.Equal(datapb.SegmentLevel_L2, newSeg.GetLevel())
	suite.ElementsMatch([]int64{1, 2}, newSeg.GetCompactionFrom())
}

func (suite *MetaBasicSuite) TestSetSegment() {
meta := suite.meta
catalog := mocks2.NewDataCoordCatalog(suite.T())
Expand Down

0 comments on commit dde9d6c

Please sign in to comment.