Fix bulkinsert bug that segments are compacted after import
Signed-off-by: yhmo <[email protected]>
yhmo committed Nov 7, 2023
1 parent 765aaf3 commit 244205b
Showing 5 changed files with 39 additions and 30 deletions.
4 changes: 2 additions & 2 deletions internal/datanode/services.go
@@ -499,8 +499,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)

// parse files and generate segments
segmentSize := Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-importWrapper := importutil.NewImportWrapper(newCtx, collectionInfo, segmentSize, node.allocator.GetIDAlloactor(),
-node.chunkManager, importResult, reportFunc)
+importWrapper := importutil.NewImportWrapper(newCtx, collectionInfo, segmentSize, Params.DataNodeCfg.BinLogMaxSize.GetAsInt64(),
+node.allocator.GetIDAlloactor(), node.chunkManager, importResult, reportFunc)
importWrapper.SetCallbackFunctions(assignSegmentFunc(node, req),
createBinLogsFunc(node, req, colInfo.GetSchema(), ts),
saveSegmentFunc(node, req, importResult, ts))
21 changes: 15 additions & 6 deletions internal/util/importutil/import_wrapper.go
@@ -39,8 +39,8 @@ const (
JSONFileExt = ".json"
NumpyFileExt = ".npy"

-// supposed size of a single block, to control a binlog file size, the max biglog file size is no more than 2*SingleBlockSize
-SingleBlockSize = 16 * 1024 * 1024 // 16MB
+// parsers read JSON/Numpy/CSV files buffer by buffer, this limitation is to define the buffer size.
+ReadBufferSize = 16 * 1024 * 1024 // 16MB

// this limitation is to avoid this OOM risk:
// simetimes system segment max size is a large number, a single segment fields data might cause OOM.
@@ -92,6 +92,7 @@ type ImportWrapper struct {
cancel context.CancelFunc // for canceling parse process
collectionInfo *CollectionInfo // collection details including schema
segmentSize int64 // maximum size of a segment(unit:byte) defined by dataCoord.segment.maxSize (milvus.yml)
+binlogSize int64 // average binlog size(unit:byte), the max biglog file size is no more than 2*binlogSize
rowIDAllocator *allocator.IDAllocator // autoid allocator
chunkManager storage.ChunkManager

@@ -107,7 +108,7 @@ type ImportWrapper struct {
progressPercent int64 // working progress percent
}

-func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segmentSize int64,
+func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segmentSize int64, maxBinlogSize int64,
idAlloc *allocator.IDAllocator, cm storage.ChunkManager, importResult *rootcoordpb.ImportResult,
reportFunc func(res *rootcoordpb.ImportResult) error,
) *ImportWrapper {
@@ -120,11 +121,19 @@ func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segme

ctx, cancel := context.WithCancel(ctx)

+// average binlogSize is expected to be half of the maxBinlogSize
+// and avoid binlogSize to be a tiny value
+binlogSize := int64(float32(maxBinlogSize) * 0.5)
+if binlogSize < ReadBufferSize {
+binlogSize = ReadBufferSize
+}

wrapper := &ImportWrapper{
ctx: ctx,
cancel: cancel,
collectionInfo: collectionInfo,
segmentSize: segmentSize,
+binlogSize: binlogSize,
rowIDAllocator: idAlloc,
chunkManager: cm,
importResult: importResult,
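
A quick standalone sanity check of the clamp introduced above (a minimal sketch, not part of the commit; the 64MB and 8MB inputs are illustrative values, not Milvus defaults):

package main

import "fmt"

// ReadBufferSize mirrors the 16MB constant from import_wrapper.go above.
const ReadBufferSize = 16 * 1024 * 1024

// deriveBinlogSize reproduces the logic added to NewImportWrapper:
// target half of the configured max binlog size, but never drop below ReadBufferSize.
func deriveBinlogSize(maxBinlogSize int64) int64 {
	binlogSize := int64(float32(maxBinlogSize) * 0.5)
	if binlogSize < ReadBufferSize {
		binlogSize = ReadBufferSize
	}
	return binlogSize
}

func main() {
	fmt.Println(deriveBinlogSize(64 * 1024 * 1024)) // 33554432 (32MB): half of a 64MB max
	fmt.Println(deriveBinlogSize(8 * 1024 * 1024))  // 16777216 (16MB): clamped up to the read buffer size
}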
@@ -282,7 +291,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
return p.flushFunc(fields, shardID, partitionID)
}
-parser, err := NewNumpyParser(p.ctx, p.collectionInfo, p.rowIDAllocator, SingleBlockSize,
+parser, err := NewNumpyParser(p.ctx, p.collectionInfo, p.rowIDAllocator, p.binlogSize,
p.chunkManager, flushFunc, p.updateProgressPercent)
if err != nil {
return err
@@ -384,7 +393,7 @@ func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64,
printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
return p.flushFunc(fields, shardID, partitionID)
}
-parser, err := NewBinlogParser(p.ctx, p.collectionInfo, SingleBlockSize,
+parser, err := NewBinlogParser(p.ctx, p.collectionInfo, p.binlogSize,
p.chunkManager, flushFunc, p.updateProgressPercent, tsStartPoint, tsEndPoint)
if err != nil {
return err
@@ -433,7 +442,7 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
}
}

-consumer, err := NewJSONRowConsumer(p.ctx, p.collectionInfo, p.rowIDAllocator, SingleBlockSize, flushFunc)
+consumer, err := NewJSONRowConsumer(p.ctx, p.collectionInfo, p.rowIDAllocator, p.binlogSize, flushFunc)
if err != nil {
return err
}
38 changes: 19 additions & 19 deletions internal/util/importutil/import_wrapper_test.go
@@ -185,7 +185,7 @@ func Test_ImportWrapperNew(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, nil, 1, nil, cm, nil, nil)
+wrapper := NewImportWrapper(ctx, nil, 1, ReadBufferSize, nil, cm, nil, nil)
assert.Nil(t, wrapper)

schema := &schemapb.CollectionSchema{
@@ -205,7 +205,7 @@ func Test_ImportWrapperNew(t *testing.T) {
})
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
-wrapper = NewImportWrapper(ctx, collectionInfo, 1, nil, cm, nil, nil)
+wrapper = NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, cm, nil, nil)
assert.NotNil(t, wrapper)

assignSegFunc := func(shardID int, partID int64) (int64, string, error) {
@@ -282,7 +282,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
assert.NoError(t, err)

t.Run("success case", func(t *testing.T) {
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
@@ -308,7 +308,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
assert.NoError(t, err)

importResult.State = commonpb.ImportState_ImportStarted
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
@@ -320,7 +320,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
t.Run("file doesn't exist", func(t *testing.T) {
files := make([]string, 0)
files = append(files, "/dummy/dummy.json")
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
err = wrapper.Import(files, ImportOptions{OnlyValidate: true})
assert.Error(t, err)
})
@@ -362,7 +362,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
files := createSampleNumpyFiles(t, cm)

t.Run("success case", func(t *testing.T) {
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

err = wrapper.Import(files, DefaultImportOptions())
@@ -380,7 +380,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
files[1] = filePath

importResult.State = commonpb.ImportState_ImportStarted
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

err = wrapper.Import(files, DefaultImportOptions())
@@ -391,7 +391,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
t.Run("file doesn't exist", func(t *testing.T) {
files := make([]string, 0)
files = append(files, "/dummy/dummy.npy")
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
err = wrapper.Import(files, DefaultImportOptions())
assert.Error(t, err)
})
@@ -511,7 +511,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

files := make([]string, 0)
@@ -555,7 +555,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {

collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
+wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)

t.Run("unsupported file type", func(t *testing.T) {
files := []string{"uid.txt"}
@@ -605,7 +605,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
t.Run("empty file list", func(t *testing.T) {
files := []string{}
cm.size = 0
-wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
+wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.NoError(t, err)
assert.False(t, rowBased)
@@ -614,7 +614,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
t.Run("file size exceed MaxFileSize limit", func(t *testing.T) {
files := []string{"a/1.json"}
cm.size = params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
-wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
+wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.Error(t, err)
assert.True(t, rowBased)
@@ -685,7 +685,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

files := []string{filePath}
@@ -732,7 +732,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

wrapper.reportImportAttempts = 2
@@ -767,7 +767,7 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {

collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
+wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)

// empty paths
paths := []string{}
@@ -831,7 +831,7 @@ func Test_ImportWrapperDoBinlogImport(t *testing.T) {

collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
+wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
paths := []string{
"/tmp",
"/tmp",
@@ -894,7 +894,7 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), nil, nil, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), ReadBufferSize, nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)

rowCounter := &rowCounterTest{}
@@ -937,7 +937,7 @@ func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {

collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, nil, nil, nil, nil)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, nil, nil)
assert.NotNil(t, wrapper)
assert.Equal(t, int64(0), wrapper.progressPercent)

@@ -976,7 +976,7 @@ func Test_ImportWrapperFlushFunc(t *testing.T) {
schema := sampleSchema()
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
-wrapper := NewImportWrapper(ctx, collectionInfo, 1, nil, nil, importResult, reportFunc)
+wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

2 changes: 1 addition & 1 deletion internal/util/importutil/json_parser.go
@@ -73,7 +73,7 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
// for low dimensional vector, the bufSize is a large value, read more rows each time
bufRowCount := parser.bufRowCount
for {
-if bufRowCount*sizePerRecord > SingleBlockSize {
+if bufRowCount*sizePerRecord > ReadBufferSize {
bufRowCount--
} else {
break
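
The loop above walks bufRowCount down until the buffered rows fit into ReadBufferSize, i.e. it converges to roughly ReadBufferSize / sizePerRecord. A minimal standalone sketch (the 512-byte row size is an illustrative value, not taken from the commit):

package main

import "fmt"

// ReadBufferSize mirrors the 16MB constant from import_wrapper.go above.
const ReadBufferSize = 16 * 1024 * 1024

func main() {
	sizePerRecord := 512   // illustrative bytes per row, e.g. a 128-dim float32 vector
	bufRowCount := 1 << 20 // a starting value larger than the cap
	// same bound as the decrement loop in adjustBufSize
	for bufRowCount*sizePerRecord > ReadBufferSize {
		bufRowCount--
	}
	fmt.Println(bufRowCount, ReadBufferSize/sizePerRecord) // 32768 32768
}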
4 changes: 2 additions & 2 deletions internal/util/importutil/numpy_adapter.go
@@ -563,9 +563,9 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
// read string one by one is not efficient, here we read strings batch by batch, each bach size is no more than 16MB
batchRead := 1 // rows of each batch, make sure this value is equal or greater than 1
if utf {
-batchRead += SingleBlockSize / (utf8.UTFMax * maxLen)
+batchRead += ReadBufferSize / (utf8.UTFMax * maxLen)
} else {
-batchRead += SingleBlockSize / maxLen
+batchRead += ReadBufferSize / maxLen
}

log.Info("Numpy adapter: prepare to read varchar batch by batch",
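
For scale, the batch sizes above work out as follows (a minimal sketch; maxLen = 256 is an illustrative maximum string length, not a value from the commit):

package main

import (
	"fmt"
	"unicode/utf8"
)

// ReadBufferSize mirrors the 16MB constant from import_wrapper.go above.
const ReadBufferSize = 16 * 1024 * 1024

func main() {
	maxLen := 256                                       // illustrative maximum string length of the numpy column
	utfBatch := 1 + ReadBufferSize/(utf8.UTFMax*maxLen) // UTF-8: assume up to 4 bytes per rune
	rawBatch := 1 + ReadBufferSize/maxLen               // fixed-width (non-UTF) case
	fmt.Println(utfBatch, rawBatch) // 16385 65537
}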
