From 7299d94bac069bb16da0fcc77eec01564c32ea4e Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Tue, 7 Oct 2025 02:50:15 +0300 Subject: [PATCH 1/3] chore: make existing token extraction code reusable --- frac/active_indexer.go | 5 +- frac/doc_provider.go | 117 ------------------ frac/meta_data_collector.go | 139 +--------------------- fracmanager/fetcher_test.go | 4 +- fracmanager/fracmanager_test.go | 9 +- fracmanager/sealed_frac_cache_test.go | 7 +- fracmanager/sealer_test.go | 7 +- {frac => indexer}/compress.go | 14 +-- {proxy/bulk => indexer}/indexer.go | 14 +-- indexer/meta_data.go | 103 ++++++++++++++++ indexer/metrics.go | 39 ++++++ {proxy/bulk => indexer}/processor.go | 89 ++++++++++---- {proxy/bulk => indexer}/processor_test.go | 2 +- indexer/test_doc_provider.go | 135 +++++++++++++++++++++ proxy/bulk/ingestor.go | 126 ++++---------------- proxy/bulk/ingestor_test.go | 65 +++++----- proxy/bulk/metrics.go | 29 +++++ seq/tokenizer.go | 33 ----- storeapi/grpc_v1_test.go | 7 +- tokenizer/exists_tokenizer.go | 4 +- tokenizer/keyword_tokenizer.go | 5 +- tokenizer/keyword_tokenizer_test.go | 34 +++--- tokenizer/meta_token.go | 43 +++++++ tokenizer/path_tokenizer.go | 7 +- tokenizer/path_tokenizer_test.go | 60 +++++----- tokenizer/text_tokenizer.go | 9 +- tokenizer/text_tokenizer_test.go | 94 +++++++-------- tokenizer/tokenizer.go | 4 +- 28 files changed, 607 insertions(+), 597 deletions(-) delete mode 100644 frac/doc_provider.go rename {frac => indexer}/compress.go (76%) rename {proxy/bulk => indexer}/indexer.go (93%) create mode 100644 indexer/meta_data.go create mode 100644 indexer/metrics.go rename {proxy/bulk => indexer}/processor.go (63%) rename {proxy/bulk => indexer}/processor_test.go (99%) create mode 100644 indexer/test_doc_provider.go create mode 100644 proxy/bulk/metrics.go create mode 100644 tokenizer/meta_token.go diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 0594b247..0422c105 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "github.com/ozontech/seq-db/bytespool" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/metric/stopwatch" @@ -95,7 +96,7 @@ func (ai *ActiveIndexer) Stop() { var metaDataPool = sync.Pool{ New: func() any { - return new(MetaData) + return new(indexer.MetaData) }, } @@ -121,7 +122,7 @@ func (ai *ActiveIndexer) appendWorker(index int) { collector.Init(blockIndex) parsingMetric := sw.Start("metas_parsing") - meta := metaDataPool.Get().(*MetaData) + meta := metaDataPool.Get().(*indexer.MetaData) for len(metasPayload) > 0 { n := binary.LittleEndian.Uint32(metasPayload) metasPayload = metasPayload[4:] diff --git a/frac/doc_provider.go b/frac/doc_provider.go deleted file mode 100644 index bad513c8..00000000 --- a/frac/doc_provider.go +++ /dev/null @@ -1,117 +0,0 @@ -package frac - -import ( - "encoding/binary" - "math/rand" - "time" - - insaneJSON "github.com/ozontech/insane-json" - - "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/storage" -) - -type DocProvider struct { - DocCount int - Docs []byte - Metas []byte - buf []byte -} - -func NewDocProvider() *DocProvider { - return &DocProvider{ - Docs: make([]byte, 0), - buf: make([]byte, 4), - } -} - -func (dp *DocProvider) appendDoc(doc []byte) { - dp.DocCount++ - numBuf := make([]byte, 4) - binary.LittleEndian.PutUint32(numBuf, uint32(len(doc))) - dp.Docs = append(dp.Docs, numBuf...) 
- dp.Docs = append(dp.Docs, doc...) -} - -func (dp *DocProvider) appendMeta(docLen int, id seq.ID, tokens []seq.Token) { - dp.buf = dp.buf[:4] - dp.buf = encodeMeta(dp.buf, tokens, id, docLen) - binary.LittleEndian.PutUint32(dp.buf, uint32(len(dp.buf)-4)) - - dp.Metas = append(dp.Metas, dp.buf...) -} - -func (dp *DocProvider) Append(doc []byte, docRoot *insaneJSON.Root, id seq.ID, tokens []seq.Token) { - if id.MID == 0 { - // this case runs only in the integration tests - t, _ := ExtractDocTime(docRoot) - id = seq.NewID(t, uint64(rand.Int63())) - } - - dp.appendMeta(len(doc), id, tokens) - dp.appendDoc(doc) -} - -func (dp *DocProvider) TryReset() { - dp.DocCount = 0 - dp.Docs = dp.Docs[:0] - dp.Metas = dp.Metas[:0] - -} - -func (dp *DocProvider) Provide() (storage.DocBlock, storage.DocBlock) { - c := GetDocsMetasCompressor(-1, -1) - c.CompressDocsAndMetas(dp.Docs, dp.Metas) - return c.DocsMetas() -} - -func encodeMeta(buf []byte, tokens []seq.Token, id seq.ID, size int) []byte { - metaTokens := make([]MetaToken, 0, len(tokens)) - for _, t := range tokens { - metaTokens = append(metaTokens, MetaToken{ - Key: t.Field, - Value: t.Val, - }) - } - md := MetaData{ - ID: id, - Size: uint32(size), - Tokens: metaTokens, - } - return md.MarshalBinaryTo(buf) -} - -// extractDocTime extract time from doc by supported fields and return that field -// if fields are absent or values are not parsable, zero time and empty string are returned -func extractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) { - var t time.Time - var err error - for _, field := range consts.TimeFields { - timeNode := docRoot.Dig(field...) - if timeNode == nil { - continue - } - - timeVal := timeNode.AsString() - for _, f := range consts.TimeFormats { - t, err = time.Parse(f, timeVal) - if err == nil { - return t, field - } - } - } - - return t, nil -} - -// ExtractDocTime extracts timestamp from doc -// It searches by one of supported field name and parses by supported formats -// If no field was found or not parsable it returns time.Now() -func ExtractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) { - t, f := extractDocTime(docRoot) - if t.IsZero() { - t = time.Now() - } - return t, f -} diff --git a/frac/meta_data_collector.go b/frac/meta_data_collector.go index 26e16e3d..35e81e03 100644 --- a/frac/meta_data_collector.go +++ b/frac/meta_data_collector.go @@ -1,145 +1,14 @@ package frac import ( - "encoding/binary" - "fmt" "math" - "slices" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/tokenizer" "github.com/ozontech/seq-db/util" ) -type MetaData struct { - ID seq.ID - // Size of an uncompressed document in bytes. - Size uint32 - Tokens []MetaToken -} - -// String used in tests for human-readable output. -func (m MetaData) String() string { - return fmt.Sprintf("ID: %s, Size: %d, Tokens: %s", m.ID, m.Size, m.Tokens) -} - -const metadataMagic = uint16(0x3F7C) // 2 magic bytes of the binary encoded metadata. - -func IsItBinaryEncodedMetaData(b []byte) bool { - if len(b) < 2 { - return false - } - return binary.LittleEndian.Uint16(b) == metadataMagic -} - -func (m *MetaData) MarshalBinaryTo(b []byte) []byte { - // Append "magic bytes" to determine that this is binary encoded metadata. - b = binary.LittleEndian.AppendUint16(b, metadataMagic) - - // Append current binary version of the metadata. - const version = 1 - b = binary.LittleEndian.AppendUint16(b, version) - - // Encode seq.ID. 
- b = binary.LittleEndian.AppendUint64(b, uint64(m.ID.MID)) - b = binary.LittleEndian.AppendUint64(b, uint64(m.ID.RID)) - - // Encode BlockLength. - b = binary.LittleEndian.AppendUint32(b, m.Size) - - // Encode tokens. - toksLen := len(m.Tokens) - b = binary.LittleEndian.AppendUint32(b, uint32(toksLen)) - for i := 0; i < toksLen; i++ { - b = m.Tokens[i].MarshalBinaryTo(b) - } - - return b -} - -func (m *MetaData) UnmarshalBinary(b []byte) error { - if !IsItBinaryEncodedMetaData(b) { - return fmt.Errorf("invalid metadata magic bytes") - } - b = b[2:] - - version := binary.LittleEndian.Uint16(b) - b = b[2:] - - switch version { - case 1: - return m.unmarshalVersion1(b) - default: - return fmt.Errorf("unimplemented metadata version: %d", version) - } -} - -func (m *MetaData) unmarshalVersion1(b []byte) error { - // Decode seq.ID. - m.ID.MID = seq.MID(binary.LittleEndian.Uint64(b)) - b = b[8:] - m.ID.RID = seq.RID(binary.LittleEndian.Uint64(b)) - b = b[8:] - - // Decode uncompressed document size. - m.Size = binary.LittleEndian.Uint32(b) - b = b[4:] - - // Decode tokens length. - toksLen := binary.LittleEndian.Uint32(b) - b = b[4:] - - // Decode tokens. - m.Tokens = m.Tokens[:0] - m.Tokens = slices.Grow(m.Tokens, int(toksLen)) - var err error - for i := uint32(0); i < toksLen; i++ { - var token MetaToken - b, err = token.UnmarshalBinary(b) - if err != nil { - return err - } - m.Tokens = append(m.Tokens, token) - } - return nil -} - -type MetaToken struct { - Key []byte - Value []byte -} - -func (m *MetaToken) MarshalBinaryTo(b []byte) []byte { - b = binary.LittleEndian.AppendUint32(b, uint32(len(m.Key))) - b = append(b, m.Key...) - b = binary.LittleEndian.AppendUint32(b, uint32(len(m.Value))) - b = append(b, m.Value...) - return b -} - -func (m *MetaToken) UnmarshalBinary(b []byte) ([]byte, error) { - keyLen := binary.LittleEndian.Uint32(b) - b = b[4:] - if int(keyLen) > len(b) { - return nil, fmt.Errorf("malformed key") - } - m.Key = b[:keyLen] - b = b[keyLen:] - - valueLen := binary.LittleEndian.Uint32(b) - b = b[4:] - if int(valueLen) > len(b) { - return nil, fmt.Errorf("malformed value") - } - m.Value = b[:valueLen] - b = b[valueLen:] - return b, nil -} - -// String used in tests for human-readable output. -func (m MetaToken) String() string { - return fmt.Sprintf("(%s: %s)", m.Key, m.Value) -} - // metaDataCollector is a collection of metadata // metaDataCollector can reuse its fields to process many requests in a row one after another // metaDataCollector keep track of the size of its fields to avoid memory leak @@ -187,7 +56,7 @@ func newMetaDataCollector() *metaDataCollector { return &c } -func (c *metaDataCollector) AppendMeta(m MetaData) { +func (c *metaDataCollector) AppendMeta(m indexer.MetaData) { var pos seq.DocPos if m.Size == 0 { // This is a nested document that must point to the parent. 
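
For readers tracking the type move above: a minimal, self-contained usage sketch (illustrative only, not applied by this patch) of how the relocated indexer.MetaData and tokenizer.MetaToken are meant to be combined. It uses only identifiers that appear elsewhere in this diff; seq.SimpleID and the 8-byte "document" size are taken from the existing tests.

package main

import (
	"fmt"

	"github.com/ozontech/seq-db/indexer"
	"github.com/ozontech/seq-db/seq"
	"github.com/ozontech/seq-db/tokenizer"
)

func main() {
	// indexer.MetaData now carries tokenizer.MetaToken values instead of frac.MetaToken.
	md := indexer.MetaData{
		ID:   seq.SimpleID(1),
		Size: 8, // uncompressed size of the document in bytes
		Tokens: []tokenizer.MetaToken{
			{Key: []byte("service"), Value: []byte("100500")},
		},
	}

	// Round-trip through the binary encoding that moved to indexer/meta_data.go.
	buf := md.MarshalBinaryTo(nil)

	var decoded indexer.MetaData
	if err := decoded.UnmarshalBinary(buf); err != nil {
		panic(err)
	}
	fmt.Println(decoded) // ID: <id>, Size: 8, Tokens: [(service: 100500)]
}
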
@@ -272,7 +141,7 @@ func (c *metaDataCollector) Filter(appended []seq.ID) { c.tokensIndex = tokensIndex } -func (c *metaDataCollector) extractTokens(tokens []MetaToken) { +func (c *metaDataCollector) extractTokens(tokens []tokenizer.MetaToken) { for _, token := range tokens { key, value := token.Key, token.Value pos := len(c.tokensBuf) diff --git a/fracmanager/fetcher_test.go b/fracmanager/fetcher_test.go index 9ebc09e2..199bf1cf 100644 --- a/fracmanager/fetcher_test.go +++ b/fracmanager/fetcher_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tests/common" ) @@ -24,7 +24,7 @@ func testFetcher(t *testing.T, fetcher *Fetcher, hasHint bool) { fm, err := newFracManagerWithBackgroundStart(t.Context(), config) assert.NoError(t, err) - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() addDummyDoc(t, fm, dp, seq.SimpleID(1)) fm.WaitIdle() info := fm.Active().Info() diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 989c7b2d..71fd38d2 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -12,6 +12,7 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/seq" testscommon "github.com/ozontech/seq-db/tests/common" ) @@ -26,16 +27,16 @@ func newFracManagerWithBackgroundStart(ctx context.Context, config *Config) (*Fr return fracManager, nil } -func addDummyDoc(t *testing.T, fm *FracManager, dp *frac.DocProvider, seqID seq.ID) { +func addDummyDoc(t *testing.T, fm *FracManager, dp *indexer.TestDocProvider, seqID seq.ID) { doc := []byte("document") - dp.Append(doc, nil, seqID, seq.Tokens("service:100500", "k8s_pod", "_all_:")) + dp.Append(doc, nil, seqID, "service:100500", "k8s_pod", "_all_:") docs, metas := dp.Provide() err := fm.Append(context.Background(), docs, metas) assert.NoError(t, err) } func MakeSomeFractions(t *testing.T, fm *FracManager) { - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() addDummyDoc(t, fm, dp, seq.SimpleID(1)) fm.seal(fm.rotate()) @@ -114,7 +115,7 @@ func TestMatureMode(t *testing.T) { } id := 1 - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() makeSealedFrac := func(fm *FracManager, docsPerFrac int) { for i := 0; i < docsPerFrac; i++ { addDummyDoc(t, fm, dp, seq.SimpleID(id)) diff --git a/fracmanager/sealed_frac_cache_test.go b/fracmanager/sealed_frac_cache_test.go index 5a1dbdaf..92b44814 100644 --- a/fracmanager/sealed_frac_cache_test.go +++ b/fracmanager/sealed_frac_cache_test.go @@ -14,6 +14,7 @@ import ( "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/seq" testscommon "github.com/ozontech/seq-db/tests/common" ) @@ -280,7 +281,7 @@ func TestFracInfoSavedToCache(t *testing.T) { }) assert.NoError(t, err) - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() metaRoot := insaneJSON.Spawn() defer insaneJSON.Release(metaRoot) @@ -365,7 +366,7 @@ func TestExtraFractionsRemoved(t *testing.T) { assert.NoError(t, err) - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() infos := map[string]*common.Info{} for i := 1; i < times+1; i++ { @@ -425,7 +426,7 @@ func TestMissingCacheFilesDeleted(t *testing.T) { }) assert.NoError(t, err) - dp := frac.NewDocProvider() + dp 
:= indexer.NewTestDocProvider() metaRoot := insaneJSON.Spawn() defer insaneJSON.Release(metaRoot) diff --git a/fracmanager/sealer_test.go b/fracmanager/sealer_test.go index 7b6baaa0..a159f1f9 100644 --- a/fracmanager/sealer_test.go +++ b/fracmanager/sealer_test.go @@ -21,6 +21,7 @@ import ( "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/seq" testscommon "github.com/ozontech/seq-db/tests/common" ) @@ -49,7 +50,7 @@ func fillActiveFraction(active *frac.Active) error { k := 0 wg := sync.WaitGroup{} - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() for i := 0; i < muliplier; i++ { dp.TryReset() @@ -66,12 +67,12 @@ func fillActiveFraction(active *frac.Active) error { } id := seq.NewID(time.Now(), uint64(rand.Int63())) - dp.Append(doc, docRoot, id, seq.Tokens( + dp.Append(doc, docRoot, id, "_all_:", "service:service"+strconv.Itoa(rand.Intn(200)), "k8s_pod1:"+strconv.Itoa(k%100000), "k8s_pod2:"+strconv.Itoa(k%1000000), - )) + ) } docs, metas := dp.Provide() wg.Add(1) diff --git a/frac/compress.go b/indexer/compress.go similarity index 76% rename from frac/compress.go rename to indexer/compress.go index 4faf2238..9ce4f88a 100644 --- a/frac/compress.go +++ b/indexer/compress.go @@ -1,24 +1,14 @@ -package frac +package indexer import ( "sync" "github.com/alecthomas/units" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/util" ) -var bulkSizeAfterCompression = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "seq_db_ingestor", - Subsystem: "bulk", - Name: "bulk_size_after_compression", - Help: "Bulk request sizes after compression", - Buckets: prometheus.ExponentialBuckets(1024, 2, 16), -}) - type DocsMetasCompressor struct { docsCompressLevel int metaCompressLevel int @@ -53,8 +43,6 @@ func (c *DocsMetasCompressor) CompressDocsAndMetas(docs, meta []byte) { c.docsBuf = storage.CompressDocBlock(docs, c.docsBuf, c.docsCompressLevel) // Compress metas block. c.metaBuf = storage.CompressDocBlock(meta, c.metaBuf, c.metaCompressLevel) - // Set compressed doc block size. - c.metaBuf.SetExt1(uint64(len(c.docsBuf))) bulkSizeAfterCompression.Observe(float64(len(c.docsBuf) + len(c.metaBuf))) } diff --git a/proxy/bulk/indexer.go b/indexer/indexer.go similarity index 93% rename from proxy/bulk/indexer.go rename to indexer/indexer.go index 8eb4eb4e..fd774a80 100644 --- a/proxy/bulk/indexer.go +++ b/indexer/indexer.go @@ -1,4 +1,4 @@ -package bulk +package indexer import ( "bytes" @@ -7,7 +7,6 @@ import ( insaneJSON "github.com/ozontech/insane-json" - "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tokenizer" ) @@ -18,8 +17,7 @@ import ( type indexer struct { tokenizers map[seq.TokenizerType]tokenizer.Tokenizer mapping seq.Mapping - - metas []frac.MetaData + metas []MetaData } // Index returns a list of metadata of the given json node. 
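
As context for the indexer struct above, which drives tokenizers through its tokenizers map: a short sketch of running one of the refactored tokenizers on its own. It relies only on the tokenizer.NewKeywordTokenizer constructor and Tokenize signature shown later in this diff; the "level"/"ERROR" field and value are made up for illustration.

package main

import (
	"fmt"

	"github.com/ozontech/seq-db/tokenizer"
)

func main() {
	// Arguments: default max token size, caseSensitive, partialIndexing.
	kw := tokenizer.NewKeywordTokenizer(1024, false, true)

	// Passing 0 as the per-call max size falls back to the default (1024).
	tokens := kw.Tokenize(nil, []byte("level"), []byte("ERROR"), 0)

	// Prints [(level: error)]: the value is lower-cased because the tokenizer is case-insensitive.
	fmt.Println(tokens)
}
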
@@ -45,7 +43,7 @@ func (i *indexer) Index(node *insaneJSON.Node, id seq.ID, size uint32) { } } -func (i *indexer) Metas() []frac.MetaData { +func (i *indexer) Metas() []MetaData { return i.metas } @@ -109,7 +107,7 @@ func (i *indexer) decodeInternal(n *insaneJSON.Node, id seq.ID, name []byte, met } } -func (i *indexer) index(tokenTypes seq.MappingTypes, tokens []frac.MetaToken, key, value []byte) []frac.MetaToken { +func (i *indexer) index(tokenTypes seq.MappingTypes, tokens []tokenizer.MetaToken, key, value []byte) []tokenizer.MetaToken { for _, tokenType := range tokenTypes.All { if _, has := i.tokenizers[tokenType.TokenizerType]; !has { continue @@ -125,7 +123,7 @@ func (i *indexer) index(tokenTypes seq.MappingTypes, tokens []frac.MetaToken, ke if value != nil { tokens = i.tokenizers[tokenType.TokenizerType].Tokenize(tokens, title, value, tokenType.MaxSize) } - tokens = append(tokens, frac.MetaToken{ + tokens = append(tokens, tokenizer.MetaToken{ Key: seq.ExistsTokenName, Value: title, }) @@ -153,7 +151,7 @@ func (i *indexer) appendMeta(id seq.ID, size uint32) { i.metas[n].ID = id i.metas[n].Size = size - i.metas[n].Tokens = append(i.metas[n].Tokens, frac.MetaToken{ + i.metas[n].Tokens = append(i.metas[n].Tokens, tokenizer.MetaToken{ Key: seq.AllTokenName, Value: []byte{}, }) diff --git a/indexer/meta_data.go b/indexer/meta_data.go new file mode 100644 index 00000000..241f219f --- /dev/null +++ b/indexer/meta_data.go @@ -0,0 +1,103 @@ +package indexer + +import ( + "encoding/binary" + "fmt" + "slices" + + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/tokenizer" +) + +type MetaData struct { + ID seq.ID + // Size of an uncompressed document in bytes. + Size uint32 + Tokens []tokenizer.MetaToken +} + +// String used in tests for human-readable output. +func (m MetaData) String() string { + return fmt.Sprintf("ID: %s, Size: %d, Tokens: %s", m.ID, m.Size, m.Tokens) +} + +const metadataMagic = uint16(0x3F7C) // 2 magic bytes of the binary encoded metadata. + +func IsItBinaryEncodedMetaData(b []byte) bool { + if len(b) < 2 { + return false + } + return binary.LittleEndian.Uint16(b) == metadataMagic +} + +func (m *MetaData) MarshalBinaryTo(b []byte) []byte { + // Append "magic bytes" to determine that this is binary encoded metadata. + b = binary.LittleEndian.AppendUint16(b, metadataMagic) + + // Append current binary version of the metadata. + const version = 1 + b = binary.LittleEndian.AppendUint16(b, version) + + // Encode seq.ID. + b = binary.LittleEndian.AppendUint64(b, uint64(m.ID.MID)) + b = binary.LittleEndian.AppendUint64(b, uint64(m.ID.RID)) + + // Encode BlockLength. + b = binary.LittleEndian.AppendUint32(b, m.Size) + + // Encode tokens. + toksLen := len(m.Tokens) + b = binary.LittleEndian.AppendUint32(b, uint32(toksLen)) + for i := 0; i < toksLen; i++ { + b = m.Tokens[i].MarshalBinaryTo(b) + } + + return b +} + +func (m *MetaData) UnmarshalBinary(b []byte) error { + if !IsItBinaryEncodedMetaData(b) { + return fmt.Errorf("invalid metadata magic bytes") + } + b = b[2:] + + version := binary.LittleEndian.Uint16(b) + b = b[2:] + + switch version { + case 1: + return m.unmarshalVersion1(b) + default: + return fmt.Errorf("unimplemented metadata version: %d", version) + } +} + +func (m *MetaData) unmarshalVersion1(b []byte) error { + // Decode seq.ID. + m.ID.MID = seq.MID(binary.LittleEndian.Uint64(b)) + b = b[8:] + m.ID.RID = seq.RID(binary.LittleEndian.Uint64(b)) + b = b[8:] + + // Decode uncompressed document size. 
+ m.Size = binary.LittleEndian.Uint32(b) + b = b[4:] + + // Decode tokens length. + toksLen := binary.LittleEndian.Uint32(b) + b = b[4:] + + // Decode tokens. + m.Tokens = m.Tokens[:0] + m.Tokens = slices.Grow(m.Tokens, int(toksLen)) + var err error + for i := uint32(0); i < toksLen; i++ { + var token tokenizer.MetaToken + b, err = token.UnmarshalBinary(b) + if err != nil { + return err + } + m.Tokens = append(m.Tokens, token) + } + return nil +} diff --git a/indexer/metrics.go b/indexer/metrics.go new file mode 100644 index 00000000..a146de23 --- /dev/null +++ b/indexer/metrics.go @@ -0,0 +1,39 @@ +package indexer + +import ( + "github.com/ozontech/seq-db/metric" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + bulkParseDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "seq_db_ingestor", + Subsystem: "bulk", + Name: "parse_duration_seconds", + Help: "", + Buckets: metric.SecondsBuckets, + }) + + notAnObjectTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "seq_db_ingestor", + Subsystem: "bulk", + Name: "not_an_object_errors_total", + Help: "Number of ingestion errors due to incorrect document type", + }) + + bulkTimeErrors = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "seq_db_ingestor", + Subsystem: "bulk", + Name: "time_errors_total", + Help: "errors for time rules violation in events", + }, []string{"cause"}) + + bulkSizeAfterCompression = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "seq_db_ingestor", + Subsystem: "bulk", + Name: "bulk_size_after_compression", + Help: "Bulk request sizes after compression", + Buckets: prometheus.ExponentialBuckets(1024, 2, 16), + }) +) diff --git a/proxy/bulk/processor.go b/indexer/processor.go similarity index 63% rename from proxy/bulk/processor.go rename to indexer/processor.go index 4b3d4a82..dbf7c106 100644 --- a/proxy/bulk/processor.go +++ b/indexer/processor.go @@ -1,38 +1,34 @@ -package bulk +package indexer import ( + "encoding/binary" + "encoding/json" "errors" + "fmt" "math" "math/rand/v2" "time" insaneJSON "github.com/ozontech/insane-json" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/zap" "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac" + + "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tokenizer" "github.com/ozontech/seq-db/util" ) var ( - bulkTimeErrors = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "seq_db_ingestor", - Subsystem: "bulk", - Name: "time_errors_total", - Help: "errors for time rules violation in events", - }, []string{"cause"}) - parseErrors = bulkTimeErrors.WithLabelValues("parse_error") delays = bulkTimeErrors.WithLabelValues("delay") futureDelays = bulkTimeErrors.WithLabelValues("future_delay") ) -// processor accumulates meta and docs from a single bulk +// Processor accumulates meta and docs from a single bulk // returns bulk request ready to be sent to store -type processor struct { +type Processor struct { proxyIndex uint64 drift time.Duration futureDrift time.Duration @@ -46,15 +42,15 @@ func init() { insaneJSON.MapUseThreshold = math.MaxInt32 } -func newBulkProcessor(mapping seq.Mapping, tokenizers map[seq.TokenizerType]tokenizer.Tokenizer, drift, futureDrift time.Duration, index uint64) *processor { - return &processor{ +func NewProcessor(mapping seq.Mapping, tokenizers 
map[seq.TokenizerType]tokenizer.Tokenizer, drift, futureDrift time.Duration, index uint64) *Processor { + return &Processor{ proxyIndex: index, drift: drift, futureDrift: futureDrift, indexer: &indexer{ tokenizers: tokenizers, mapping: mapping, - metas: []frac.MetaData{}, + metas: []MetaData{}, }, decoder: insaneJSON.Spawn(), } @@ -62,7 +58,7 @@ func newBulkProcessor(mapping seq.Mapping, tokenizers map[seq.TokenizerType]toke var errNotAnObject = errors.New("not an object") -func (p *processor) Process(doc []byte, requestTime time.Time) ([]byte, []frac.MetaData, error) { +func (p *Processor) ProcessDoc(doc []byte, requestTime time.Time) ([]byte, []MetaData, error) { err := p.decoder.DecodeBytes(doc) if err != nil { return nil, nil, err @@ -72,8 +68,7 @@ func (p *processor) Process(doc []byte, requestTime time.Time) ([]byte, []frac.M } docTime, timeField := extractDocTime(p.decoder.Node, requestTime) docDelay := requestTime.Sub(docTime) - if timeField == nil { - // couldn't parse given event time + if timeField == nil { // couldn't parse given event time parseErrors.Inc() } else if documentDelayed(docDelay, p.drift, p.futureDrift) { docTime = requestTime @@ -88,11 +83,11 @@ func (p *processor) Process(doc []byte, requestTime time.Time) ([]byte, []frac.M func documentDelayed(docDelay, drift, futureDrift time.Duration) bool { delayed := false - if docDelay > drift { + if docDelay > drift && drift > 0 { delays.Inc() delayed = true } - if docDelay < 0 && docDelay.Abs() > futureDrift { + if docDelay < 0 && docDelay.Abs() > futureDrift && futureDrift > 0 { futureDelays.Inc() delayed = true } @@ -182,3 +177,55 @@ func parseESTime(t string) (time.Time, bool) { return time.Date(int(year), time.Month(month), int(day), int(hour), int(minute), int(second), int(nsecs), time.UTC), true } + +func (p *Processor) ProcessBulk( + requestTime time.Time, + dstDocs, dstMeta []byte, + readNext func() ([]byte, error), +) (int, []byte, []byte, error) { + parseDuration := time.Duration(0) + + total := 0 + for { + originalDoc, err := readNext() + if err != nil { + return 0, nil, nil, fmt.Errorf("reading next document: %s", err) + } + if originalDoc == nil { + break + } + parseStart := time.Now() + doc, meta, err := p.ProcessDoc(originalDoc, requestTime) + if err != nil { + if errors.Is(err, errNotAnObject) { + logger.Error("unable to process the document because it is not an object", zap.Any("document", json.RawMessage(originalDoc))) + notAnObjectTotal.Inc() + continue + } + return 0, nil, nil, fmt.Errorf("processing doc: %s", err) + } + parseDuration += time.Since(parseStart) + + total++ + dstDocs = binary.LittleEndian.AppendUint32(dstDocs, uint32(len(doc))) + dstDocs = append(dstDocs, doc...) + for _, m := range meta { + dstMeta = marshalAppendMeta(dstMeta, m) + } + } + + bulkParseDurationSeconds.Observe(parseDuration.Seconds()) + + return total, dstDocs, dstMeta, nil +} + +func marshalAppendMeta(dst []byte, meta MetaData) []byte { + metaLenPosition := len(dst) + dst = append(dst, make([]byte, 4)...) + dst = meta.MarshalBinaryTo(dst) + // Metadata length = len(slice after append) - len(slice before append). + metaLen := uint32(len(dst) - metaLenPosition - 4) + // Put metadata length before metadata bytes. 
+ binary.LittleEndian.PutUint32(dst[metaLenPosition:], metaLen) + return dst +} diff --git a/proxy/bulk/processor_test.go b/indexer/processor_test.go similarity index 99% rename from proxy/bulk/processor_test.go rename to indexer/processor_test.go index 9818ea7b..0b25c5f3 100644 --- a/proxy/bulk/processor_test.go +++ b/indexer/processor_test.go @@ -1,4 +1,4 @@ -package bulk +package indexer import ( "testing" diff --git a/indexer/test_doc_provider.go b/indexer/test_doc_provider.go new file mode 100644 index 00000000..316464d2 --- /dev/null +++ b/indexer/test_doc_provider.go @@ -0,0 +1,135 @@ +package indexer + +import ( + "encoding/binary" + "math/rand" + "strings" + "time" + + insaneJSON "github.com/ozontech/insane-json" + + "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/tokenizer" + "github.com/ozontech/seq-db/util" +) + +type TestDocProvider struct { + DocCount int + Docs []byte + Metas []byte + buf []byte +} + +func NewTestDocProvider() *TestDocProvider { + return &TestDocProvider{ + Docs: make([]byte, 0), + buf: make([]byte, 4), + } +} + +func (dp *TestDocProvider) appendDoc(doc []byte) { + dp.DocCount++ + numBuf := make([]byte, 4) + binary.LittleEndian.PutUint32(numBuf, uint32(len(doc))) + dp.Docs = append(dp.Docs, numBuf...) + dp.Docs = append(dp.Docs, doc...) +} + +func (dp *TestDocProvider) appendMeta(docLen int, id seq.ID, tokens []tokenizer.MetaToken) { + dp.buf = dp.buf[:4] + dp.buf = encodeMeta(dp.buf, tokens, id, docLen) + binary.LittleEndian.PutUint32(dp.buf, uint32(len(dp.buf)-4)) + + dp.Metas = append(dp.Metas, dp.buf...) +} + +func (dp *TestDocProvider) Append(doc []byte, docRoot *insaneJSON.Root, id seq.ID, tokensStr ...string) { + tokens := stringsToTokens(tokensStr...) + if id.MID == 0 { + // this case runs only in the integration tests + t, _ := extractDocTimeForTest(docRoot) + id = seq.NewID(t, uint64(rand.Int63())) + } + + dp.appendMeta(len(doc), id, tokens) + dp.appendDoc(doc) +} + +func (dp *TestDocProvider) TryReset() { + dp.DocCount = 0 + dp.Docs = dp.Docs[:0] + dp.Metas = dp.Metas[:0] + +} + +func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.DocBlock) { + c := GetDocsMetasCompressor(-1, -1) + c.CompressDocsAndMetas(dp.Docs, dp.Metas) + return c.DocsMetas() +} + +func encodeMeta(buf []byte, tokens []tokenizer.MetaToken, id seq.ID, size int) []byte { + metaTokens := make([]tokenizer.MetaToken, 0, len(tokens)) + for _, t := range tokens { + metaTokens = append(metaTokens, tokenizer.MetaToken{ + Key: t.Key, + Value: t.Value, + }) + } + md := MetaData{ + ID: id, + Size: uint32(size), + Tokens: metaTokens, + } + return md.MarshalBinaryTo(buf) +} + +// extractDocTimeForTest extracts timestamp from doc +// It searches by one of supported field name and parses by supported formats +// If no field was found or not parsable it returns time.Now() +func extractDocTimeForTest(docRoot *insaneJSON.Root) (time.Time, []string) { + var t time.Time + var f []string +top: + for _, field := range consts.TimeFields { + timeNode := docRoot.Dig(field...) 
+ if timeNode == nil { + continue + } + timeVal := timeNode.AsString() + for _, format := range consts.TimeFormats { + if value, err := time.Parse(format, timeVal); err == nil { + t = value + f = field + break top + } + } + } + + if t.IsZero() { + t = time.Now() + } + return t, f +} + +func stringsToTokens(tokens ...string) []tokenizer.MetaToken { + r := make([]tokenizer.MetaToken, 0) + for _, tokenStr := range tokens { + fieldPos := strings.IndexByte(tokenStr, ':') + var t tokenizer.MetaToken + if fieldPos < 0 { + t = tokenizer.MetaToken{ + Key: util.StringToByteUnsafe(tokenStr), + Value: []byte("some_val")} + } else { + t = tokenizer.MetaToken{ + Key: util.StringToByteUnsafe(tokenStr[:fieldPos]), + Value: util.StringToByteUnsafe(tokenStr[fieldPos+1:]), + } + } + r = append(r, t) + } + return r +} diff --git a/proxy/bulk/ingestor.go b/proxy/bulk/ingestor.go index c19b6f55..b59a5022 100644 --- a/proxy/bulk/ingestor.go +++ b/proxy/bulk/ingestor.go @@ -2,46 +2,22 @@ package bulk import ( "context" - "encoding/binary" - "encoding/json" "errors" - "fmt" "math/rand/v2" "sync" "sync/atomic" "time" - "github.com/ozontech/seq-db/seq" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/zap" - "github.com/ozontech/seq-db/bytespool" "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/network/circuitbreaker" "github.com/ozontech/seq-db/proxy/stores" + "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tokenizer" -) - -var ( - inflightBulks = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "seq_db_ingestor", - Subsystem: "bulk", - Name: "in_flight_queries_total", - Help: "", - }) - - bulkParseDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "seq_db_ingestor", - Subsystem: "bulk", - Name: "parse_duration_seconds", - Help: "", - Buckets: metric.SecondsBuckets, - }) + "go.uber.org/zap" ) type MappingProvider interface { @@ -142,29 +118,6 @@ func (i *Ingestor) Stop() { var ErrTooManyInflightBulks = errors.New("too many inflight bulks, dropping") -var ( - rateLimitedTotal = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "seq_db_ingestor", - Name: "rate_limited_total", - Help: "Count of rate limited requests", - }) - - docsWritten = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "seq_db_ingestor", - Subsystem: "bulk", - Name: "docs_written", - Help: "", - Buckets: prometheus.ExponentialBuckets(1, 2, 16), - }) - - notAnObjectTotal = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "seq_db_ingestor", - Subsystem: "bulk", - Name: "not_an_object_errors_total", - Help: "Number of ingestion errors due to incorrect document type", - }) -) - func (i *Ingestor) ProcessDocuments(ctx context.Context, requestTime time.Time, readNext func() ([]byte, error)) (int, error) { ctx, cancel := context.WithTimeout(ctx, consts.BulkTimeout) defer cancel() @@ -186,10 +139,10 @@ func (i *Ingestor) ProcessDocuments(ctx context.Context, requestTime time.Time, t := time.Now() - compressor := frac.GetDocsMetasCompressor(i.config.DocsZSTDCompressLevel, i.config.MetasZSTDCompressLevel) - defer frac.PutDocMetasCompressor(compressor) + compressor := indexer.GetDocsMetasCompressor(i.config.DocsZSTDCompressLevel, i.config.MetasZSTDCompressLevel) + defer indexer.PutDocMetasCompressor(compressor) - total, err := 
i.processDocsToCompressor(compressor, requestTime, readNext) + total, docs, metas, err := i.processDocsToCompressor(compressor, requestTime, readNext) if err != nil { return 0, err } @@ -198,8 +151,6 @@ func (i *Ingestor) ProcessDocuments(ctx context.Context, requestTime time.Time, return 0, nil } - docs, metas := compressor.DocsMetas() - metric.IngestorBulkDocProvideDurationSeconds.Observe(time.Since(t).Seconds()) t = time.Now() @@ -228,79 +179,46 @@ var ( ) func (i *Ingestor) processDocsToCompressor( - compressor *frac.DocsMetasCompressor, + compressor *indexer.DocsMetasCompressor, requestTime time.Time, readNext func() ([]byte, error), -) (int, error) { - parseDuration := time.Duration(0) - +) (int, []byte, []byte, error) { proc := i.getProcessor() defer i.putProcessor(proc) binaryDocs := binaryDocsPool.Get().(*bytespool.Buffer) defer binaryDocsPool.Put(binaryDocs) binaryDocs.Reset() + binaryMetas := binaryMetasPool.Get().(*bytespool.Buffer) defer binaryMetasPool.Put(binaryMetas) binaryMetas.Reset() - total := 0 - for { - originalDoc, err := readNext() - if err != nil { - return total, fmt.Errorf("reading next document: %s", err) - } - if originalDoc == nil { - break - } - parseStart := time.Now() - doc, metas, err := proc.Process(originalDoc, requestTime) - if err != nil { - if errors.Is(err, errNotAnObject) { - logger.Error("unable to process the document because it is not an object", zap.Any("document", json.RawMessage(originalDoc))) - notAnObjectTotal.Inc() - continue - } - return total, fmt.Errorf("processing doc: %s", err) - } - parseDuration += time.Since(parseStart) - - binaryDocs.B = binary.LittleEndian.AppendUint32(binaryDocs.B, uint32(len(doc))) - binaryDocs.B = append(binaryDocs.B, doc...) - for _, meta := range metas { - binaryMetas.B = marshalAppendMeta(binaryMetas.B, meta) - } - total++ + var ( + err error + total int + ) + total, binaryDocs.B, binaryMetas.B, err = proc.ProcessBulk(requestTime, binaryDocs.B, binaryMetas.B, readNext) + if err != nil { + return 0, nil, nil, err } - bulkParseDurationSeconds.Observe(parseDuration.Seconds()) - compressor.CompressDocsAndMetas(binaryDocs.B, binaryMetas.B) + docs, metas := compressor.DocsMetas() - return total, nil -} - -func marshalAppendMeta(dst []byte, meta frac.MetaData) []byte { - metaLenPosition := len(dst) - dst = append(dst, make([]byte, 4)...) - dst = meta.MarshalBinaryTo(dst) - // Metadata length = len(slice after append) - len(slice before append). - metaLen := uint32(len(dst) - metaLenPosition - 4) - // Put metadata length before metadata bytes. - binary.LittleEndian.PutUint32(dst[metaLenPosition:], metaLen) - return dst + return total, docs, metas, nil } -func (i *Ingestor) getProcessor() *processor { +func (i *Ingestor) getProcessor() *indexer.Processor { procEface := i.procPool.Get() if procEface != nil { // The proc already initialized with current ingestor config, so we don't need to reinit it. 
- return procEface.(*processor) + return procEface.(*indexer.Processor) } index := rand.Uint64() % consts.IngestorMaxInstances - return newBulkProcessor(i.config.MappingProvider.GetMapping(), i.tokenizers, i.config.AllowedTimeDrift, i.config.FutureAllowedTimeDrift, index) + return indexer.NewProcessor(i.config.MappingProvider.GetMapping(), i.tokenizers, i.config.AllowedTimeDrift, i.config.FutureAllowedTimeDrift, index) } -func (i *Ingestor) putProcessor(proc *processor) { +func (i *Ingestor) putProcessor(proc *indexer.Processor) { i.procPool.Put(proc) } diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go index 8800af6a..4361eff4 100644 --- a/proxy/bulk/ingestor_test.go +++ b/proxy/bulk/ingestor_test.go @@ -13,11 +13,12 @@ import ( "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/packer" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/tokenizer" ) func TestProcessDocuments(t *testing.T) { @@ -92,7 +93,7 @@ func TestProcessDocuments(t *testing.T) { type TestPayload struct { InDocs []string ExpDocs []string - ExpMeta []frac.MetaData + ExpMeta []indexer.MetaData } type TestCase struct { Name string @@ -111,10 +112,10 @@ func TestProcessDocuments(t *testing.T) { return TestPayload{ InDocs: []string{"{}"}, ExpDocs: nil, - ExpMeta: []frac.MetaData{{ + ExpMeta: []indexer.MetaData{{ ID: id, Size: 2, - Tokens: []frac.MetaToken{newToken(seq.TokenAll, "")}, + Tokens: []tokenizer.MetaToken{newToken(seq.TokenAll, "")}, }}, } }, @@ -122,7 +123,7 @@ func TestProcessDocuments(t *testing.T) { { Name: "text_with_asterisks", Payload: func() TestPayload { - tk := func(val string) frac.MetaToken { + tk := func(val string) tokenizer.MetaToken { return newToken("message", val) } return TestPayload{ @@ -139,15 +140,15 @@ func TestProcessDocuments(t *testing.T) { `{"message":"postfix asterisk *"}`, }, ExpDocs: nil, - ExpMeta: []frac.MetaData{ - {ID: id, Size: 30, Tokens: []frac.MetaToken{all, tk("*prefix_asterisk"), existsMsg}}, - {ID: id, Size: 31, Tokens: []frac.MetaToken{all, tk("*"), tk("prefix_asterisk"), existsMsg}}, - {ID: id, Size: 28, Tokens: []frac.MetaToken{all, tk("infix*asterisk"), existsMsg}}, - {ID: id, Size: 30, Tokens: []frac.MetaToken{all, tk("infix"), tk("*"), tk("asterisk"), existsMsg}}, - {ID: id, Size: 29, Tokens: []frac.MetaToken{all, tk("infix"), tk("*asterisk"), existsMsg}}, - {ID: id, Size: 29, Tokens: []frac.MetaToken{all, tk("infix*"), tk("asterisk"), existsMsg}}, - {ID: id, Size: 31, Tokens: []frac.MetaToken{all, tk("postfix"), tk("asterisk*"), existsMsg}}, - {ID: id, Size: 32, Tokens: []frac.MetaToken{all, tk("postfix"), tk("asterisk"), tk("*"), existsMsg}}, + ExpMeta: []indexer.MetaData{ + {ID: id, Size: 30, Tokens: []tokenizer.MetaToken{all, tk("*prefix_asterisk"), existsMsg}}, + {ID: id, Size: 31, Tokens: []tokenizer.MetaToken{all, tk("*"), tk("prefix_asterisk"), existsMsg}}, + {ID: id, Size: 28, Tokens: []tokenizer.MetaToken{all, tk("infix*asterisk"), existsMsg}}, + {ID: id, Size: 30, Tokens: []tokenizer.MetaToken{all, tk("infix"), tk("*"), tk("asterisk"), existsMsg}}, + {ID: id, Size: 29, Tokens: []tokenizer.MetaToken{all, tk("infix"), tk("*asterisk"), existsMsg}}, + {ID: id, Size: 29, Tokens: []tokenizer.MetaToken{all, tk("infix*"), tk("asterisk"), existsMsg}}, + {ID: id, Size: 31, Tokens: []tokenizer.MetaToken{all, tk("postfix"), 
tk("asterisk*"), existsMsg}}, + {ID: id, Size: 32, Tokens: []tokenizer.MetaToken{all, tk("postfix"), tk("asterisk"), tk("*"), existsMsg}}, }, } }, @@ -159,10 +160,10 @@ func TestProcessDocuments(t *testing.T) { return TestPayload{ InDocs: doc, ExpDocs: doc, - ExpMeta: []frac.MetaData{{ + ExpMeta: []indexer.MetaData{{ ID: id, Size: 22, - Tokens: []frac.MetaToken{ + Tokens: []tokenizer.MetaToken{ newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "exists_only"), }, @@ -177,8 +178,8 @@ func TestProcessDocuments(t *testing.T) { return TestPayload{ InDocs: []string{doc}, ExpDocs: []string{doc}, - ExpMeta: []frac.MetaData{ - {ID: id, Size: uint32(len(doc)), Tokens: []frac.MetaToken{ + ExpMeta: []indexer.MetaData{ + {ID: id, Size: uint32(len(doc)), Tokens: []tokenizer.MetaToken{ newToken(seq.TokenAll, ""), newToken("_exists_", "tags.level"), newToken("_exists_", "tags.message"), @@ -197,10 +198,10 @@ func TestProcessDocuments(t *testing.T) { return TestPayload{ InDocs: []string{doc1, doc2, doc3}, ExpDocs: nil, - ExpMeta: []frac.MetaData{ - {ID: id, Size: uint32(len(doc1)), Tokens: []frac.MetaToken{newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "level")}}, - {ID: id, Size: uint32(len(doc2)), Tokens: []frac.MetaToken{newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "message")}}, - {ID: id, Size: uint32(len(doc3)), Tokens: []frac.MetaToken{newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "path")}}, + ExpMeta: []indexer.MetaData{ + {ID: id, Size: uint32(len(doc1)), Tokens: []tokenizer.MetaToken{newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "level")}}, + {ID: id, Size: uint32(len(doc2)), Tokens: []tokenizer.MetaToken{newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "message")}}, + {ID: id, Size: uint32(len(doc3)), Tokens: []tokenizer.MetaToken{newToken(seq.TokenAll, ""), newToken(seq.TokenExists, "path")}}, }, } }, @@ -209,7 +210,7 @@ func TestProcessDocuments(t *testing.T) { Name: "simple_document", Payload: func() TestPayload { const doc = `{"level":"error", "message":" request 🫦 failed! 
", "error": "context cancelled", "shard": "1", "path":"http://localhost:8080/example"}` - meta := frac.MetaData{ID: id, Size: uint32(len(doc)), Tokens: []frac.MetaToken{ + meta := indexer.MetaData{ID: id, Size: uint32(len(doc)), Tokens: []tokenizer.MetaToken{ newToken(seq.TokenAll, ""), newToken("level", "error"), newToken(seq.TokenExists, "level"), @@ -231,7 +232,7 @@ func TestProcessDocuments(t *testing.T) { return TestPayload{ InDocs: []string{doc, doc, doc}, ExpDocs: []string{doc, doc, doc}, - ExpMeta: []frac.MetaData{meta, meta, meta}, + ExpMeta: []indexer.MetaData{meta, meta, meta}, } }, }, @@ -323,7 +324,7 @@ func TestProcessDocuments(t *testing.T) { return TestPayload{ InDocs: []string{string(inTrace)}, ExpDocs: []string{string(expTrace)}, - ExpMeta: []frac.MetaData{ + ExpMeta: []indexer.MetaData{ {ID: id, Size: uint32(len(expTrace)), Tokens: buildKeywordTokens( "trace_id", "aaaaaaaaaaabcmadwewubq==", "trace_duration", "137252000", @@ -378,7 +379,7 @@ func TestProcessDocuments(t *testing.T) { }, }) - meta := []frac.MetaData{ + meta := []indexer.MetaData{ {ID: id, Size: uint32(len(doc)), Tokens: buildKeywordTokens()}, {ID: id, Size: 0, Tokens: buildKeywordTokens("spans.span_id", "1", "spans.operation_name", "op1")}, {ID: id, Size: 0, Tokens: buildKeywordTokens("spans.span_id", "2", "spans.operation_name", "op2")}, @@ -432,9 +433,9 @@ func TestProcessDocuments(t *testing.T) { binaryMetas, err := storage.DocBlock(c.metas).DecompressTo(nil) require.NoError(t, err) metasUnpacker := packer.NewBytesUnpacker(binaryMetas) - var gotMetas []frac.MetaData + var gotMetas []indexer.MetaData for metasUnpacker.Len() > 0 { - meta := frac.MetaData{} + meta := indexer.MetaData{} r.NoError(meta.UnmarshalBinary(metasUnpacker.GetBinary())) gotMetas = append(gotMetas, meta) } @@ -527,15 +528,15 @@ func newMapping(mappingType seq.TokenizerType) seq.MappingTypes { return seq.NewSingleType(mappingType, "", int(units.KiB)) } -func newToken(k, v string) frac.MetaToken { - return frac.MetaToken{ +func newToken(k, v string) tokenizer.MetaToken { + return tokenizer.MetaToken{ Key: []byte(k), Value: []byte(v), } } -func buildKeywordTokens(kvs ...string) []frac.MetaToken { - var tokens []frac.MetaToken +func buildKeywordTokens(kvs ...string) []tokenizer.MetaToken { + var tokens []tokenizer.MetaToken tokens = append(tokens, newToken("_all_", "")) diff --git a/proxy/bulk/metrics.go b/proxy/bulk/metrics.go new file mode 100644 index 00000000..23df56a6 --- /dev/null +++ b/proxy/bulk/metrics.go @@ -0,0 +1,29 @@ +package bulk + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + inflightBulks = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_ingestor", + Subsystem: "bulk", + Name: "in_flight_queries_total", + Help: "", + }) + + rateLimitedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "seq_db_ingestor", + Name: "rate_limited_total", + Help: "Count of rate limited requests", + }) + + docsWritten = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "seq_db_ingestor", + Subsystem: "bulk", + Name: "docs_written", + Help: "", + Buckets: prometheus.ExponentialBuckets(1, 2, 16), + }) +) diff --git a/seq/tokenizer.go b/seq/tokenizer.go index f5a59687..e2ee31f4 100644 --- a/seq/tokenizer.go +++ b/seq/tokenizer.go @@ -1,11 +1,5 @@ package seq -import ( - "strings" - - "github.com/ozontech/seq-db/util" -) - const ( TokenAll = "_all_" TokenExists = "_exists_" @@ -48,30 +42,3 @@ func init() { 
NamesToTokenTypes[v] = k } } - -type Token struct { - Field []byte - Val []byte -} - -func Tokens(tokens ...string) []Token { - r := make([]Token, 0) - for _, tokenStr := range tokens { - fieldPos := strings.IndexByte(tokenStr, ':') - var t Token - if fieldPos < 0 { - t = Token{ - Field: util.StringToByteUnsafe(tokenStr), - Val: []byte("some_val")} - } else { - t = Token{ - Field: util.StringToByteUnsafe(tokenStr[:fieldPos]), - Val: util.StringToByteUnsafe(tokenStr[fieldPos+1:]), - } - } - - r = append(r, t) - } - - return r -} diff --git a/storeapi/grpc_v1_test.go b/storeapi/grpc_v1_test.go index d0ebc45c..5c699065 100644 --- a/storeapi/grpc_v1_test.go +++ b/storeapi/grpc_v1_test.go @@ -11,8 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/pkg/storeapi" "github.com/ozontech/seq-db/seq" @@ -51,12 +51,11 @@ func makeBulkRequest(cnt int) *storeapi.BulkRequest { metaRoot := insaneJSON.Spawn() defer insaneJSON.Release(metaRoot) - dp := frac.NewDocProvider() + dp := indexer.NewTestDocProvider() for i := 0; i < cnt; i++ { id := seq.SimpleID(i + 1) doc := []byte("document") - tokens := seq.Tokens("_all_:", "service:100500", "k8s_pod:"+strconv.Itoa(i)) - dp.Append(doc, nil, id, tokens) + dp.Append(doc, nil, id, "_all_:", "service:100500", "k8s_pod:"+strconv.Itoa(i)) } req := &storeapi.BulkRequest{Count: int64(cnt)} req.Docs, req.Metas = dp.Provide() diff --git a/tokenizer/exists_tokenizer.go b/tokenizer/exists_tokenizer.go index 1069870f..0cdc6bbd 100644 --- a/tokenizer/exists_tokenizer.go +++ b/tokenizer/exists_tokenizer.go @@ -1,13 +1,11 @@ package tokenizer -import "github.com/ozontech/seq-db/frac" - type ExistsTokenizer struct{} func NewExistsTokenizer() *ExistsTokenizer { return &ExistsTokenizer{} } -func (t *ExistsTokenizer) Tokenize(tokens []frac.MetaToken, _, _ []byte, _ int) []frac.MetaToken { +func (t *ExistsTokenizer) Tokenize(tokens []MetaToken, _, _ []byte, _ int) []MetaToken { return tokens } diff --git a/tokenizer/keyword_tokenizer.go b/tokenizer/keyword_tokenizer.go index 7151e0f1..1c893bdf 100644 --- a/tokenizer/keyword_tokenizer.go +++ b/tokenizer/keyword_tokenizer.go @@ -1,7 +1,6 @@ package tokenizer import ( - "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/metric" ) @@ -19,7 +18,7 @@ func NewKeywordTokenizer(maxTokenSize int, caseSensitive, partialIndexing bool) } } -func (t *KeywordTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, maxTokenSize int) []frac.MetaToken { +func (t *KeywordTokenizer) Tokenize(tokens []MetaToken, name, value []byte, maxTokenSize int) []MetaToken { if maxTokenSize == 0 { maxTokenSize = t.defaultMaxTokenSize } @@ -34,7 +33,7 @@ func (t *KeywordTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, metric.SkippedIndexesBytesKeyword.Add(float64(len(value[maxLength:]))) value = value[:maxLength] - tokens = append(tokens, frac.MetaToken{ + tokens = append(tokens, MetaToken{ Key: name, Value: toLowerIfCaseInsensitive(t.caseSensitive, value), }) diff --git a/tokenizer/keyword_tokenizer_test.go b/tokenizer/keyword_tokenizer_test.go index 1d862e37..6ba1ed20 100644 --- a/tokenizer/keyword_tokenizer_test.go +++ b/tokenizer/keyword_tokenizer_test.go @@ -4,19 +4,17 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "github.com/ozontech/seq-db/frac" ) -func newFracToken(k, v string) 
frac.MetaToken { - return frac.MetaToken{Key: []byte(k), Value: []byte(v)} +func newMetaToken(k, v string) MetaToken { + return MetaToken{Key: []byte(k), Value: []byte(v)} } func TestKeywordTokenizerEmptyValue(t *testing.T) { tokenizer := NewKeywordTokenizer(10, true, true) - expected := []frac.MetaToken{newFracToken("message", "")} - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte{}, 10) + expected := []MetaToken{newMetaToken("message", "")} + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte{}, 10) assert.Equal(t, expected, tokens) } @@ -25,8 +23,8 @@ func TestKeywordTokenizerSimple1(t *testing.T) { tokenizer := NewKeywordTokenizer(10, true, true) value := []byte("woRld") - expected := []frac.MetaToken{newFracToken("message", "woRld")} - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), value, 10) + expected := []MetaToken{newMetaToken("message", "woRld")} + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), value, 10) assert.Equal(t, expected, tokens) } @@ -36,22 +34,22 @@ func TestKeywordTokenizerMaxLength(t *testing.T) { // maxSize as argument tokenizer := NewKeywordTokenizer(100, true, false) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte(value), 10) - assert.Equal(t, []frac.MetaToken{}, tokens) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte(value), 10) + assert.Equal(t, []MetaToken{}, tokens) // default maxSize tokenizer = NewKeywordTokenizer(10, true, false) - tokens = tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte(value), 0) - assert.Equal(t, []frac.MetaToken{}, tokens) + tokens = tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte(value), 0) + assert.Equal(t, []MetaToken{}, tokens) } func TestKeywordTokenizerCaseSensitive(t *testing.T) { tokenizer := NewKeywordTokenizer(16, false, true) value := "heLlo WoRld" - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte(value), 16) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte(value), 16) - assert.Equal(t, []frac.MetaToken{newFracToken("message", "hello world")}, tokens) + assert.Equal(t, []MetaToken{newMetaToken("message", "hello world")}, tokens) } func TestKeywordTokenizerPartialIndexing(t *testing.T) { @@ -61,11 +59,11 @@ func TestKeywordTokenizerPartialIndexing(t *testing.T) { // maxSize as argument tokenizer := NewKeywordTokenizer(100, true, true) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte(value), maxSize) - assert.Equal(t, []frac.MetaToken{newFracToken("message", value[:maxSize])}, tokens) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte(value), maxSize) + assert.Equal(t, []MetaToken{newMetaToken("message", value[:maxSize])}, tokens) // default maxSize tokenizer = NewKeywordTokenizer(maxSize, true, true) - tokens = tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte(value), 0) - assert.Equal(t, []frac.MetaToken{newFracToken("message", value[:maxSize])}, tokens) + tokens = tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte(value), 0) + assert.Equal(t, []MetaToken{newMetaToken("message", value[:maxSize])}, tokens) } diff --git a/tokenizer/meta_token.go b/tokenizer/meta_token.go new file mode 100644 index 00000000..2929479b --- /dev/null +++ b/tokenizer/meta_token.go @@ -0,0 +1,43 @@ +package tokenizer + +import ( + "encoding/binary" + "fmt" +) + +type MetaToken struct { + Key []byte + Value []byte +} + +func (m *MetaToken) MarshalBinaryTo(b []byte) 
[]byte { + b = binary.LittleEndian.AppendUint32(b, uint32(len(m.Key))) + b = append(b, m.Key...) + b = binary.LittleEndian.AppendUint32(b, uint32(len(m.Value))) + b = append(b, m.Value...) + return b +} + +func (m *MetaToken) UnmarshalBinary(b []byte) ([]byte, error) { + keyLen := binary.LittleEndian.Uint32(b) + b = b[4:] + if int(keyLen) > len(b) { + return nil, fmt.Errorf("malformed key") + } + m.Key = b[:keyLen] + b = b[keyLen:] + + valueLen := binary.LittleEndian.Uint32(b) + b = b[4:] + if int(valueLen) > len(b) { + return nil, fmt.Errorf("malformed value") + } + m.Value = b[:valueLen] + b = b[valueLen:] + return b, nil +} + +// String used in tests for human-readable output. +func (m MetaToken) String() string { + return fmt.Sprintf("(%s: %s)", m.Key, m.Value) +} diff --git a/tokenizer/path_tokenizer.go b/tokenizer/path_tokenizer.go index 3e3b1efc..965cb28f 100644 --- a/tokenizer/path_tokenizer.go +++ b/tokenizer/path_tokenizer.go @@ -3,7 +3,6 @@ package tokenizer import ( "bytes" - "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/metric" ) @@ -29,7 +28,7 @@ func NewPathTokenizer( } } -func (t *PathTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, maxTokenSize int) []frac.MetaToken { +func (t *PathTokenizer) Tokenize(tokens []MetaToken, name, value []byte, maxTokenSize int) []MetaToken { if maxTokenSize == 0 { maxTokenSize = t.defaultMaxTokenSize } @@ -57,13 +56,13 @@ func (t *PathTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, ma } i += sepIndex - tokens = append(tokens, frac.MetaToken{ + tokens = append(tokens, MetaToken{ Key: name, Value: toLowerIfCaseInsensitive(t.caseSensitive, value[:i]), }) } - tokens = append(tokens, frac.MetaToken{ + tokens = append(tokens, MetaToken{ Key: name, Value: toLowerIfCaseInsensitive(t.caseSensitive, value), }) diff --git a/tokenizer/path_tokenizer_test.go b/tokenizer/path_tokenizer_test.go index 8e6874b0..bf7082f4 100644 --- a/tokenizer/path_tokenizer_test.go +++ b/tokenizer/path_tokenizer_test.go @@ -4,8 +4,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "github.com/ozontech/seq-db/frac" ) func TestPathTokenizer(t *testing.T) { @@ -15,24 +13,24 @@ func TestPathTokenizer(t *testing.T) { title, value string maxTokenSize int tokenizer *PathTokenizer - expected []frac.MetaToken + expected []MetaToken }{ { title: "empty value", value: "", maxTokenSize: 100, tokenizer: NewPathTokenizer(100, true, true), - expected: []frac.MetaToken{newFracToken(field, "")}, + expected: []MetaToken{newMetaToken(field, "")}, }, { title: "slashes only", value: "///", maxTokenSize: 100, tokenizer: NewPathTokenizer(100, true, true), - expected: []frac.MetaToken{ - newFracToken(field, "/"), - newFracToken(field, "//"), - newFracToken(field, "///"), + expected: []MetaToken{ + newMetaToken(field, "/"), + newMetaToken(field, "//"), + newMetaToken(field, "///"), }, }, { @@ -40,10 +38,10 @@ func TestPathTokenizer(t *testing.T) { value: "/One/Two/Three", maxTokenSize: 100, tokenizer: NewPathTokenizer(100, true, true), - expected: []frac.MetaToken{ - newFracToken(field, "/One"), - newFracToken(field, "/One/Two"), - newFracToken(field, "/One/Two/Three"), + expected: []MetaToken{ + newMetaToken(field, "/One"), + newMetaToken(field, "/One/Two"), + newMetaToken(field, "/One/Two/Three"), }, }, { @@ -51,11 +49,11 @@ func TestPathTokenizer(t *testing.T) { value: "/One/Two/Three/", maxTokenSize: 100, tokenizer: NewPathTokenizer(100, true, true), - expected: []frac.MetaToken{ - newFracToken(field, "/One"), - newFracToken(field, 
"/One/Two"), - newFracToken(field, "/One/Two/Three"), - newFracToken(field, "/One/Two/Three/"), + expected: []MetaToken{ + newMetaToken(field, "/One"), + newMetaToken(field, "/One/Two"), + newMetaToken(field, "/One/Two/Three"), + newMetaToken(field, "/One/Two/Three/"), }, }, { @@ -63,24 +61,24 @@ func TestPathTokenizer(t *testing.T) { value: "/one/two/three/", maxTokenSize: 10, tokenizer: NewPathTokenizer(100, true, false), - expected: []frac.MetaToken{}, + expected: []MetaToken{}, }, { title: "max length default", value: "/one/two/three/", maxTokenSize: 0, tokenizer: NewPathTokenizer(10, true, false), - expected: []frac.MetaToken{}, + expected: []MetaToken{}, }, { title: "partial indexing", value: "/one/two/three/", maxTokenSize: 10, tokenizer: NewPathTokenizer(100, true, true), - expected: []frac.MetaToken{ - newFracToken(field, "/one"), - newFracToken(field, "/one/two"), - newFracToken(field, "/one/two/t"), + expected: []MetaToken{ + newMetaToken(field, "/one"), + newMetaToken(field, "/one/two"), + newMetaToken(field, "/one/two/t"), }, }, { @@ -88,10 +86,10 @@ func TestPathTokenizer(t *testing.T) { value: "/one/two/three/", maxTokenSize: 0, tokenizer: NewPathTokenizer(10, true, true), - expected: []frac.MetaToken{ - newFracToken(field, "/one"), - newFracToken(field, "/one/two"), - newFracToken(field, "/one/two/t"), + expected: []MetaToken{ + newMetaToken(field, "/one"), + newMetaToken(field, "/one/two"), + newMetaToken(field, "/one/two/t"), }, }, { @@ -99,16 +97,16 @@ func TestPathTokenizer(t *testing.T) { value: "/OnE/tWo", maxTokenSize: 10, tokenizer: NewPathTokenizer(10, false, true), - expected: []frac.MetaToken{ - newFracToken(field, "/one"), - newFracToken(field, "/one/two"), + expected: []MetaToken{ + newMetaToken(field, "/one"), + newMetaToken(field, "/one/two"), }, }, } for _, tc := range tests { t.Run(tc.title, func(t *testing.T) { - tokens := tc.tokenizer.Tokenize([]frac.MetaToken{}, []byte(field), []byte(tc.value), tc.maxTokenSize) + tokens := tc.tokenizer.Tokenize([]MetaToken{}, []byte(field), []byte(tc.value), tc.maxTokenSize) assert.Equal(t, tc.expected, tokens) }) } diff --git a/tokenizer/text_tokenizer.go b/tokenizer/text_tokenizer.go index 05038d62..6a728a91 100644 --- a/tokenizer/text_tokenizer.go +++ b/tokenizer/text_tokenizer.go @@ -4,7 +4,6 @@ import ( "unicode" "unicode/utf8" - "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/metric" ) @@ -24,7 +23,7 @@ func NewTextTokenizer(maxTokenSize int, caseSensitive, partialIndexing bool, max } } -func (t *TextTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, maxFieldValueLength int) []frac.MetaToken { +func (t *TextTokenizer) Tokenize(tokens []MetaToken, name, value []byte, maxFieldValueLength int) []MetaToken { metric.TokenizerIncomingTextLen.Observe(float64(len(value))) if maxFieldValueLength == 0 { @@ -38,7 +37,7 @@ func (t *TextTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, ma } if len(value) == 0 { - tokens = append(tokens, frac.MetaToken{Key: name, Value: value}) + tokens = append(tokens, MetaToken{Key: name, Value: value}) return tokens } @@ -85,7 +84,7 @@ func (t *TextTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, ma // We can skip the ToLower call if we are sure that there are only ASCII characters and no uppercase letters. 
token = toLowerTryInplace(token) } - tokens = append(tokens, frac.MetaToken{Key: name, Value: token}) + tokens = append(tokens, MetaToken{Key: name, Value: token}) } hasUpper = false @@ -100,7 +99,7 @@ func (t *TextTokenizer) Tokenize(tokens []frac.MetaToken, name, value []byte, ma if !t.caseSensitive && (asciiOnly && hasUpper || !asciiOnly) { token = toLowerTryInplace(token) } - tokens = append(tokens, frac.MetaToken{Key: name, Value: token}) + tokens = append(tokens, MetaToken{Key: name, Value: token}) return tokens } diff --git a/tokenizer/text_tokenizer_test.go b/tokenizer/text_tokenizer_test.go index 240671c7..b56c82b6 100644 --- a/tokenizer/text_tokenizer_test.go +++ b/tokenizer/text_tokenizer_test.go @@ -6,8 +6,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "github.com/ozontech/seq-db/frac" ) const maxTokenSizeDummy = 0 @@ -18,8 +16,8 @@ func TestTokenizeEmptyValue(t *testing.T) { testCase := []byte("") tokenizer := NewTextTokenizer(1000, false, true, 1024) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), testCase, maxTokenSizeDummy) - expected := []frac.MetaToken{newFracToken("message", "")} + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), testCase, maxTokenSizeDummy) + expected := []MetaToken{newMetaToken("message", "")} assert.Equal(t, expected, tokens) } @@ -29,23 +27,23 @@ func TestTokenizeSimple(t *testing.T) { tokenizer := NewTextTokenizer(1000, false, true, 1024) tokens := tokenizer.Tokenize(nil, []byte("message"), testCase, maxTokenSizeDummy) - assert.Equal(t, newFracToken("message", "arr"), tokens[0]) - assert.Equal(t, newFracToken("message", "hello"), tokens[1]) - assert.Equal(t, newFracToken("message", "world"), tokens[2]) + assert.Equal(t, newMetaToken("message", "arr"), tokens[0]) + assert.Equal(t, newMetaToken("message", "hello"), tokens[1]) + assert.Equal(t, newMetaToken("message", "world"), tokens[2]) } func TestTokenizeSimple2(t *testing.T) { tokenizer := NewTextTokenizer(1000, false, true, 1024) tokens := tokenizer.Tokenize(nil, []byte("message"), bytes.Clone(longDocument), maxTokenSizeDummy) - assert.Equal(t, newFracToken("message", "t1"), tokens[0]) - assert.Equal(t, newFracToken("message", "t2_t3"), tokens[1]) - assert.Equal(t, newFracToken("message", "t4"), tokens[2]) - assert.Equal(t, newFracToken("message", "looooong_t5"), tokens[3]) - assert.Equal(t, newFracToken("message", "readyz"), tokens[4]) - assert.Equal(t, newFracToken("message", "error*"), tokens[5]) - assert.Equal(t, newFracToken("message", "5555"), tokens[6]) - assert.Equal(t, newFracToken("message", "r2"), tokens[7]) + assert.Equal(t, newMetaToken("message", "t1"), tokens[0]) + assert.Equal(t, newMetaToken("message", "t2_t3"), tokens[1]) + assert.Equal(t, newMetaToken("message", "t4"), tokens[2]) + assert.Equal(t, newMetaToken("message", "looooong_t5"), tokens[3]) + assert.Equal(t, newMetaToken("message", "readyz"), tokens[4]) + assert.Equal(t, newMetaToken("message", "error*"), tokens[5]) + assert.Equal(t, newMetaToken("message", "5555"), tokens[6]) + assert.Equal(t, newMetaToken("message", "r2"), tokens[7]) } func TestTokenizePartialDefault(t *testing.T) { @@ -53,9 +51,9 @@ func TestTokenizePartialDefault(t *testing.T) { tokenizer := NewTextTokenizer(maxSize, false, true, maxSize) testCase := []byte(strings.Repeat("1", maxSize+1)) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), testCase, maxTokenSizeDummy) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), testCase, maxTokenSizeDummy) - expected := 
[]frac.MetaToken{newFracToken("message", strings.Repeat("1", maxSize))} + expected := []MetaToken{newMetaToken("message", strings.Repeat("1", maxSize))} assert.Equal(t, expected, tokens) } @@ -67,7 +65,7 @@ func TestTokenizePartial(t *testing.T) { tokens := tokenizer.Tokenize(nil, []byte("message"), testCase, maxSize) - expected := []frac.MetaToken{newFracToken("message", strings.Repeat("1", maxSize))} + expected := []MetaToken{newMetaToken("message", strings.Repeat("1", maxSize))} assert.Equal(t, expected, tokens) } @@ -77,9 +75,9 @@ func TestTokenizePartialSkipDefault(t *testing.T) { tokenizer := NewTextTokenizer(maxSize, false, false, maxSize) testCase := []byte(strings.Repeat("1", maxSize+1)) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), testCase, maxTokenSizeDummy) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), testCase, maxTokenSizeDummy) - assert.Equal(t, []frac.MetaToken{}, tokens) + assert.Equal(t, []MetaToken{}, tokens) } func TestTokenizePartialSkip(t *testing.T) { @@ -87,22 +85,22 @@ func TestTokenizePartialSkip(t *testing.T) { tokenizer := NewTextTokenizer(maxSize, false, false, 0) testCase := []byte(strings.Repeat("1", maxSize+1)) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), testCase, maxSize) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), testCase, maxSize) - assert.Equal(t, []frac.MetaToken{}, tokens) + assert.Equal(t, []MetaToken{}, tokens) } func TestTokenizeDefaultMaxTokenSize(t *testing.T) { tokenizer := NewTextTokenizer(6, false, true, 1024) tokens := tokenizer.Tokenize(nil, []byte("message"), bytes.Clone(longDocument), maxTokenSizeDummy) - assert.Equal(t, newFracToken("message", "t1"), tokens[0]) - assert.Equal(t, newFracToken("message", "t2_t3"), tokens[1]) - assert.Equal(t, newFracToken("message", "t4"), tokens[2]) - assert.Equal(t, newFracToken("message", "readyz"), tokens[3]) - assert.Equal(t, newFracToken("message", "error*"), tokens[4]) - assert.Equal(t, newFracToken("message", "5555"), tokens[5]) - assert.Equal(t, newFracToken("message", "r2"), tokens[6]) + assert.Equal(t, newMetaToken("message", "t1"), tokens[0]) + assert.Equal(t, newMetaToken("message", "t2_t3"), tokens[1]) + assert.Equal(t, newMetaToken("message", "t4"), tokens[2]) + assert.Equal(t, newMetaToken("message", "readyz"), tokens[3]) + assert.Equal(t, newMetaToken("message", "error*"), tokens[4]) + assert.Equal(t, newMetaToken("message", "5555"), tokens[5]) + assert.Equal(t, newMetaToken("message", "r2"), tokens[6]) } func TestTokenizeCaseSensitive(t *testing.T) { @@ -110,14 +108,14 @@ func TestTokenizeCaseSensitive(t *testing.T) { tokens := tokenizer.Tokenize(nil, []byte("message"), bytes.Clone(longDocument), maxTokenSizeDummy) - assert.Equal(t, newFracToken("message", "T1"), tokens[0]) - assert.Equal(t, newFracToken("message", "T2_T3"), tokens[1]) - assert.Equal(t, newFracToken("message", "t4"), tokens[2]) - assert.Equal(t, newFracToken("message", "looooong_t5"), tokens[3]) - assert.Equal(t, newFracToken("message", "readyz"), tokens[4]) - assert.Equal(t, newFracToken("message", "error*"), tokens[5]) - assert.Equal(t, newFracToken("message", "5555"), tokens[6]) - assert.Equal(t, newFracToken("message", "r2"), tokens[7]) + assert.Equal(t, newMetaToken("message", "T1"), tokens[0]) + assert.Equal(t, newMetaToken("message", "T2_T3"), tokens[1]) + assert.Equal(t, newMetaToken("message", "t4"), tokens[2]) + assert.Equal(t, newMetaToken("message", "looooong_t5"), tokens[3]) + assert.Equal(t, newMetaToken("message", 
"readyz"), tokens[4]) + assert.Equal(t, newMetaToken("message", "error*"), tokens[5]) + assert.Equal(t, newMetaToken("message", "5555"), tokens[6]) + assert.Equal(t, newMetaToken("message", "r2"), tokens[7]) } func TestTokenizeCaseSensitiveAndMaxTokenSize(t *testing.T) { @@ -125,13 +123,13 @@ func TestTokenizeCaseSensitiveAndMaxTokenSize(t *testing.T) { tokens := tokenizer.Tokenize(nil, []byte("message"), bytes.Clone(longDocument), maxTokenSizeDummy) - assert.Equal(t, newFracToken("message", "T1"), tokens[0]) - assert.Equal(t, newFracToken("message", "T2_T3"), tokens[1]) - assert.Equal(t, newFracToken("message", "t4"), tokens[2]) - assert.Equal(t, newFracToken("message", "readyz"), tokens[3]) - assert.Equal(t, newFracToken("message", "error*"), tokens[4]) - assert.Equal(t, newFracToken("message", "5555"), tokens[5]) - assert.Equal(t, newFracToken("message", "r2"), tokens[6]) + assert.Equal(t, newMetaToken("message", "T1"), tokens[0]) + assert.Equal(t, newMetaToken("message", "T2_T3"), tokens[1]) + assert.Equal(t, newMetaToken("message", "t4"), tokens[2]) + assert.Equal(t, newMetaToken("message", "readyz"), tokens[3]) + assert.Equal(t, newMetaToken("message", "error*"), tokens[4]) + assert.Equal(t, newMetaToken("message", "5555"), tokens[5]) + assert.Equal(t, newMetaToken("message", "r2"), tokens[6]) } func TestTokenizeLastTokenLength(t *testing.T) { @@ -140,7 +138,7 @@ func TestTokenizeLastTokenLength(t *testing.T) { tokens := tokenizer.Tokenize(nil, []byte("message"), testCase, maxTokenSizeDummy) assert.Equal(t, 1, len(tokens)) - assert.Equal(t, newFracToken("message", "1"), tokens[0]) + assert.Equal(t, newMetaToken("message", "1"), tokens[0]) } func TestTextTokenizerUTF8(t *testing.T) { @@ -155,14 +153,14 @@ func TestTextTokenizerUTF8(t *testing.T) { tokenizer := NewTextTokenizer(100, true, true, 1024) - tokens := tokenizer.Tokenize([]frac.MetaToken{}, []byte("message"), []byte(in), maxTokenSizeDummy) + tokens := tokenizer.Tokenize([]MetaToken{}, []byte("message"), []byte(in), maxTokenSizeDummy) - expected := []frac.MetaToken{} + expected := []MetaToken{} for _, token := range out { if lowercase { token = strings.ToLower(token) } - expected = append(expected, newFracToken("message", token)) + expected = append(expected, newMetaToken("message", token)) } assert.Equal(t, expected, tokens) } diff --git a/tokenizer/tokenizer.go b/tokenizer/tokenizer.go index 2c201416..9425b378 100644 --- a/tokenizer/tokenizer.go +++ b/tokenizer/tokenizer.go @@ -4,12 +4,10 @@ import ( "bytes" "unicode" "unicode/utf8" - - "github.com/ozontech/seq-db/frac" ) type Tokenizer interface { - Tokenize(tokens []frac.MetaToken, key, value []byte, maxLength int) []frac.MetaToken + Tokenize(tokens []MetaToken, key, value []byte, maxLength int) []MetaToken } func toLowerIfCaseInsensitive(isCaseSensitive bool, x []byte) []byte { From db5b9eb2974aab28a1dbd722d2c541d23f5ec24f Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Wed, 8 Oct 2025 01:16:04 +0300 Subject: [PATCH 2/3] perf: add indexer benchmark --- frac/active_indexer_test.go | 116 ++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 frac/active_indexer_test.go diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go new file mode 100644 index 00000000..093bc824 --- /dev/null +++ b/frac/active_indexer_test.go @@ -0,0 +1,116 @@ +package frac + +import ( + "bytes" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/indexer" + 
"github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/tests/common" + "github.com/ozontech/seq-db/tokenizer" + "github.com/stretchr/testify/assert" +) + +func readFileAllAtOnce(filename string) ([][]byte, error) { + content, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + lines := bytes.Split(content, []byte{'\n'}) + if len(lines) > 0 && len(lines[len(lines)-1]) == 0 { + lines = lines[:len(lines)-1] + } + return lines, nil +} + +func splitLogsToBulks(data [][]byte, bulkSize int) []func() ([]byte, error) { + funcs := []func() ([]byte, error){} + for len(data) > 0 { + size := min(len(data), bulkSize) + funcs = append(funcs, testBufReader(data[0:size])) + data = data[size:] + } + return funcs +} + +func testBufReader(data [][]byte) func() ([]byte, error) { + orig := data + return func() ([]byte, error) { + if len(data) == 0 { + data = orig + return nil, nil + } + line := data[0] + data = data[1:] + return line, nil + } +} + +func getTestProcessor() *indexer.Processor { + mapping := seq.Mapping{ + "clientip": seq.NewSingleType(seq.TokenizerTypeKeyword, "clientip", 1024), + "request": seq.NewSingleType(seq.TokenizerTypeText, "request", 1024), + "status": seq.NewSingleType(seq.TokenizerTypeKeyword, "status", 1024), + "size": seq.NewSingleType(seq.TokenizerTypeKeyword, "size", 1024), + } + + tokenizers := map[seq.TokenizerType]tokenizer.Tokenizer{ + seq.TokenizerTypeText: tokenizer.NewTextTokenizer(1024, false, true, 8192), + seq.TokenizerTypeKeyword: tokenizer.NewKeywordTokenizer(1024, false, true), + seq.TokenizerTypePath: tokenizer.NewPathTokenizer(1024, false, true), + seq.TokenizerTypeExists: tokenizer.NewExistsTokenizer(), + } + + return indexer.NewProcessor(mapping, tokenizers, 0, 0, 0) + +} + +func BenchmarkIndexer(b *testing.B) { + idx := NewActiveIndexer(8, 8) + idx.Start() + defer idx.Stop() + + dataDir := filepath.Join(b.TempDir(), "BenchmarkIndexing") + common.RecreateDir(dataDir) + + allLogs, err := readFileAllAtOnce(filepath.Join(common.TestDataDir, "k8s.logs")) + readers := splitLogsToBulks(allLogs, 1000) + assert.NoError(b, err) + + active := NewActive( + filepath.Join(dataDir, "test"), + idx, + storage.NewReadLimiter(1, nil), + cache.NewCache[[]byte](nil, nil), + cache.NewCache[[]byte](nil, nil), + &Config{}, + ) + + processor := getTestProcessor() + + b.Run("indexing", func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + bulks := make([][]byte, 0, len(readers)) + for _, readNext := range readers { + _, _, meta, _ := processor.ProcessBulk(time.Now(), nil, nil, readNext) + bulks = append(bulks, storage.CompressDocBlock(meta, nil, 3)) + } + b.StartTimer() + + wg := sync.WaitGroup{} + for _, meta := range bulks { + wg.Add(1) + idx.Index(active, meta, &wg, stopwatch.New()) + } + wg.Wait() + } + }) +} From cb29875639f104deff11d964319042e4691832cd Mon Sep 17 00:00:00 2001 From: Daniil Date: Tue, 14 Oct 2025 21:53:21 +0300 Subject: [PATCH 3/3] perf: remove indirection overhead caused by `iface` usage (#160) --- frac/active_indexer_test.go | 3 ++- frac/active_lids.go | 43 +++++++++++++------------------------ indexer/metrics.go | 3 ++- proxy/bulk/ingestor.go | 3 ++- 4 files changed, 21 insertions(+), 31 deletions(-) diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index 093bc824..040f355a 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + 
"github.com/stretchr/testify/assert" + "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/metric/stopwatch" @@ -15,7 +17,6 @@ import ( "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/tests/common" "github.com/ozontech/seq-db/tokenizer" - "github.com/stretchr/testify/assert" ) func readFileAllAtOnce(filename string) ([][]byte, error) { diff --git a/frac/active_lids.go b/frac/active_lids.go index 47abe92a..4875deb8 100644 --- a/frac/active_lids.go +++ b/frac/active_lids.go @@ -1,29 +1,12 @@ package frac import ( + "cmp" "math" - "sort" + "slices" "sync" ) -type queueIDs struct { - lids []uint32 - mids []uint64 - rids []uint64 -} - -func (p *queueIDs) Len() int { return len(p.lids) } -func (p *queueIDs) Less(i, j int) bool { - if p.mids[p.lids[i]] == p.mids[p.lids[j]] { - if p.rids[p.lids[i]] == p.rids[p.lids[j]] { - return p.lids[i] > p.lids[j] - } - return p.rids[p.lids[i]] > p.rids[p.lids[j]] - } - return p.mids[p.lids[i]] > p.mids[p.lids[j]] -} -func (p *queueIDs) Swap(i, j int) { p.lids[i], p.lids[j] = p.lids[j], p.lids[i] } - type TokenLIDs struct { sortedMu sync.Mutex // global merge mutex, making the merge process strictly sequential sorted []uint32 // slice of actual sorted and merged LIDs of token @@ -39,16 +22,20 @@ func (tl *TokenLIDs) GetLIDs(mids, rids *UInt64s) []uint32 { lids := tl.getQueuedLIDs() if len(lids) != 0 { - midsVals := mids.GetVals() - ridsVals := rids.GetVals() + mids := mids.GetVals() + rids := rids.GetVals() - sort.Sort(&queueIDs{ - lids: lids, - mids: midsVals, - rids: ridsVals, + slices.SortFunc(lids, func(i, j uint32) int { + if mids[i] == mids[j] { + if rids[i] == rids[j] { + return -cmp.Compare(i, j) + } + return -cmp.Compare(rids[i], rids[j]) + } + return -cmp.Compare(mids[i], mids[j]) }) - tl.sorted = mergeSorted(tl.sorted, lids, midsVals, ridsVals) + tl.sorted = mergeSorted(tl.sorted, lids, mids, rids) } return tl.sorted @@ -98,7 +85,7 @@ func mergeSorted(right, left []uint32, mids, rids []uint64) []uint32 { val := uint32(0) prev := uint32(math.MaxUint32) - cmp := SeqIDCmp{ + c := SeqIDCmp{ mid: mids, rid: rids, } @@ -108,7 +95,7 @@ func mergeSorted(right, left []uint32, mids, rids []uint64) []uint32 { for l != len(left) && r != len(right) { ri, li := right[r], left[l] - switch cmp.compare(ri, li) { + switch c.compare(ri, li) { case 0: val = ri r++ diff --git a/indexer/metrics.go b/indexer/metrics.go index a146de23..e91d6fc7 100644 --- a/indexer/metrics.go +++ b/indexer/metrics.go @@ -1,9 +1,10 @@ package indexer import ( - "github.com/ozontech/seq-db/metric" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/ozontech/seq-db/metric" ) var ( diff --git a/proxy/bulk/ingestor.go b/proxy/bulk/ingestor.go index b59a5022..3f7439e9 100644 --- a/proxy/bulk/ingestor.go +++ b/proxy/bulk/ingestor.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "time" + "go.uber.org/zap" + "github.com/ozontech/seq-db/bytespool" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/indexer" @@ -17,7 +19,6 @@ import ( "github.com/ozontech/seq-db/proxy/stores" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tokenizer" - "go.uber.org/zap" ) type MappingProvider interface {