Skip to content

Commit

Permalink
Fix duplicated segment id in bulkinsert task state
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <[email protected]>
  • Loading branch information
yhmo committed Oct 26, 2023
1 parent 8b45345 commit 6ca0d45
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 6 deletions.
24 changes: 18 additions & 6 deletions internal/rootcoord/import_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,7 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
// Meta persist should be done before memory objs change.
toPersistImportTaskInfo = cloneImportTaskInfo(v)
toPersistImportTaskInfo.State.StateCode = ir.GetState()
// if is started state, append the new created segment id
if v.GetState().GetStateCode() == commonpb.ImportState_ImportStarted {
toPersistImportTaskInfo.State.Segments = append(toPersistImportTaskInfo.State.Segments, ir.GetSegments()...)
} else {
toPersistImportTaskInfo.State.Segments = ir.GetSegments()
}
toPersistImportTaskInfo.State.Segments = mergeArray(toPersistImportTaskInfo.State.Segments, ir.GetSegments())
toPersistImportTaskInfo.State.RowCount = ir.GetRowCount()
toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds()
for _, kv := range ir.GetInfos() {
Expand Down Expand Up @@ -1087,3 +1082,20 @@ func cloneImportTaskInfo(taskInfo *datapb.ImportTaskInfo) *datapb.ImportTaskInfo
}
return cloned
}

func mergeArray(arr1 []int64, arr2 []int64) []int64 {
reduce := make(map[int64]int)
doReduce := func(arr []int64) {
for _, v := range arr {
reduce[v] = 1
}
}
doReduce(arr1)
doReduce(arr2)

result := make([]int64, 0, len(reduce))
for k := range reduce {
result = append(result, k)
}
return result
}
31 changes: 31 additions & 0 deletions internal/rootcoord/import_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rootcoord

import (
"context"
"sort"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -1101,3 +1102,33 @@ func TestImportManager_isRowbased(t *testing.T) {
assert.NoError(t, err)
assert.False(t, rb)
}

func TestImportManager_mergeArray(t *testing.T) {
converter := func(arr []int64) []int {
res := make([]int, 0, len(arr))
for _, v := range arr {
res = append(res, int(v))
}
sort.Ints(res)
return res
}

arr1 := []int64{1, 2, 3}
arr2 := []int64{2, 4, 6}
res := converter(mergeArray(arr1, arr2))
assert.Equal(t, []int{1, 2, 3, 4, 6}, res)

res = converter(mergeArray(arr1, nil))
assert.Equal(t, []int{1, 2, 3}, res)

res = converter(mergeArray(nil, arr2))
assert.Equal(t, []int{2, 4, 6}, res)

res = converter(mergeArray(nil, nil))
assert.Equal(t, []int{}, res)

arr1 = []int64{1, 2, 3}
arr2 = []int64{6, 5, 4}
res = converter(mergeArray(arr1, arr2))
assert.Equal(t, []int{1, 2, 3, 4, 5, 6}, res)
}

0 comments on commit 6ca0d45

Please sign in to comment.