From 97589e26a026c4b11ad56c02e8ea96fee8c5b71a Mon Sep 17 00:00:00 2001 From: Timofey Sedov Date: Wed, 24 Sep 2025 14:16:56 +0300 Subject: [PATCH 1/3] fix(parser): remove redundant code, refactor tests --- parser/term_builder.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 parser/term_builder.go diff --git a/parser/term_builder.go b/parser/term_builder.go new file mode 100644 index 00000000..e69de29b From 1f67ab77dfc5e232f088a761081ddff94fc9e922 Mon Sep 17 00:00:00 2001 From: Timofey Sedov Date: Fri, 26 Sep 2025 16:29:22 +0300 Subject: [PATCH 2/3] chore: remove unused code --- cmd/stress-search/main.go | 6 +-- frac/active_index.go | 2 +- frac/active_token_list.go | 2 +- logger/util.go | 43 ---------------------- prepare/prepare.go | 3 +- proxyapi/grpc_main_test.go | 4 +- proxyapi/http_bulk.go | 2 +- proxyapi/ingestor.go | 17 ++++----- tests/common/byte_string.go | 33 ----------------- tests/common/util.go | 30 --------------- tests/integration_tests/sub_search_test.go | 7 +--- tests/setup/doc.go | 21 ----------- tests/setup/methods.go | 13 ------- util/err.go | 16 -------- util/realloc_solver.go | 4 -- 15 files changed, 18 insertions(+), 185 deletions(-) delete mode 100644 logger/util.go diff --git a/cmd/stress-search/main.go b/cmd/stress-search/main.go index 7a5fb5b6..8ee5fe3c 100644 --- a/cmd/stress-search/main.go +++ b/cmd/stress-search/main.go @@ -20,6 +20,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/alecthomas/units" @@ -59,10 +60,9 @@ func main() { cancel() }() - conn, err := grpc.DialContext( - ctx, + conn, err := grpc.NewClient( *endpoint, - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(&tracing.ClientHandler{}), ) diff --git a/frac/active_index.go b/frac/active_index.go index c4c2f5d7..350a8e0d 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -182,7 +182,7 @@ func (si *activeTokenIndex) GetValByTID(tid uint32) []byte { } func (si *activeTokenIndex) GetTIDsByTokenExpr(t parser.Token) ([]uint32, error) { - return si.tokenList.FindPattern(si.ctx, t, nil) + return si.tokenList.FindPattern(si.ctx, t) } func (si *activeTokenIndex) GetLIDsFromTIDs(tids []uint32, _ lids.Counter, minLID, maxLID uint32, order seq.DocsOrder) []node.Node { diff --git a/frac/active_token_list.go b/frac/active_token_list.go index 43f2ef8e..adf94ffd 100644 --- a/frac/active_token_list.go +++ b/frac/active_token_list.go @@ -184,7 +184,7 @@ func (tl *TokenList) getTokenProvider(field string) *activeTokenProvider { } } -func (tl *TokenList) FindPattern(ctx context.Context, t parser.Token, tids []uint32) ([]uint32, error) { +func (tl *TokenList) FindPattern(ctx context.Context, t parser.Token) ([]uint32, error) { field := parser.GetField(t) tp := tl.getTokenProvider(field) tids, err := pattern.Search(ctx, t, tp) diff --git a/logger/util.go b/logger/util.go deleted file mode 100644 index 982b1263..00000000 --- a/logger/util.go +++ /dev/null @@ -1,43 +0,0 @@ -package logger - -import ( - "fmt" - "math" - "strconv" - "strings" - "unicode" -) - -type condFn func() (result string) - -func Header(name string) string { - base := []byte("=================================") - - offset := len(base)/2 - len(name)/2 - - for i, c := range name { - base[offset+i] = byte(unicode.ToUpper(c)) - } - - return string(base) + "\n" -} - -func Cond(is bool, positive string, negative condFn) string { - if is { - return positive - } - return negative() -} -func Numerate(content string) string { - lines := strings.Split(strings.TrimSpace(content), "\n") - - x := len(lines) - digits := int(math.Log10(float64(x))) - - format := "%" + strconv.Itoa(digits) + "d: %s" - for i := range lines { - lines[i] = fmt.Sprintf(format, i+1, lines[i]) - } - - return strings.Join(lines, "\n") -} diff --git a/prepare/prepare.go b/prepare/prepare.go index 93e6cfe3..bbcb812b 100644 --- a/prepare/prepare.go +++ b/prepare/prepare.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "io" - "io/ioutil" _ "net/http/pprof" "os" "path/filepath" @@ -94,7 +93,7 @@ func main() { logger.Fatal("can't create dir for batches", zap.Error(err)) } for i, batch := range batches { - err := ioutil.WriteFile(fmt.Sprintf("%s/%d.json", batchesDir, i), batch.Bytes(), 0o777) + err := os.WriteFile(fmt.Sprintf("%s/%d.json", batchesDir, i), batch.Bytes(), 0o777) if err != nil { logger.Fatal("can't write batch", zap.Int("i", i), diff --git a/proxyapi/grpc_main_test.go b/proxyapi/grpc_main_test.go index 4bd20f2b..21689cab 100644 --- a/proxyapi/grpc_main_test.go +++ b/proxyapi/grpc_main_test.go @@ -103,7 +103,6 @@ func TestMain(m *testing.M) { } func runGRPCServerWithClient(apiServer seqproxyapi.SeqProxyApiServer) (seqproxyapi.SeqProxyApiClient, func()) { - ctx := context.Background() lis := bufconn.Listen(10 * 1024 * 1024) server := grpc.NewServer( grpc.StatsHandler(&tracing.ServerHandler{}), @@ -121,8 +120,7 @@ func runGRPCServerWithClient(apiServer seqproxyapi.SeqProxyApiServer) (seqproxya } server.Stop() } - conn, err := grpc.DialContext( - ctx, + conn, err := grpc.NewClient( "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return lis.Dial() diff --git a/proxyapi/http_bulk.go b/proxyapi/http_bulk.go index 7fcbdaa5..aa0d4f16 100644 --- a/proxyapi/http_bulk.go +++ b/proxyapi/http_bulk.go @@ -256,7 +256,7 @@ func (r *esBulkDocReader) readDoc() ([]byte, bool, error) { // If this is a line that exceeds maxDocumentSize, skip it. for isPrefix { - doc, isPrefix, err = r.r.ReadLine() + _, isPrefix, err = r.r.ReadLine() if err != nil { return nil, true, err } diff --git a/proxyapi/ingestor.go b/proxyapi/ingestor.go index 1eb0d80a..7ca8aec1 100644 --- a/proxyapi/ingestor.go +++ b/proxyapi/ingestor.go @@ -47,38 +47,37 @@ type Ingestor struct { isStopped atomic.Bool } -func clientsFromConfig(ctx context.Context, config search.Config) (map[string]storeapi.StoreApiClient, error) { +func clientsFromConfig(config search.Config) (map[string]storeapi.StoreApiClient, error) { clients := map[string]storeapi.StoreApiClient{} - if err := appendClients(ctx, clients, config.HotStores.Shards); err != nil { + if err := appendClients(clients, config.HotStores.Shards); err != nil { return nil, err } if config.HotReadStores != nil { - if err := appendClients(ctx, clients, config.HotReadStores.Shards); err != nil { + if err := appendClients(clients, config.HotReadStores.Shards); err != nil { return nil, err } } if config.WriteStores != nil { - if err := appendClients(ctx, clients, config.WriteStores.Shards); err != nil { + if err := appendClients(clients, config.WriteStores.Shards); err != nil { return nil, err } } if config.ReadStores != nil { - if err := appendClients(ctx, clients, config.ReadStores.Shards); err != nil { + if err := appendClients(clients, config.ReadStores.Shards); err != nil { return nil, err } } return clients, nil } -func appendClients(ctx context.Context, clients map[string]storeapi.StoreApiClient, shards [][]string) error { +func appendClients(clients map[string]storeapi.StoreApiClient, shards [][]string) error { for _, shard := range shards { for _, replica := range shard { if _, has := clients[replica]; has { continue } // this doesn't block, and if store is down, it will try to reconnect in background - conn, err := grpc.DialContext( - ctx, + conn, err := grpc.NewClient( replica, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(&tracing.ClientHandler{}), @@ -132,7 +131,7 @@ func NewIngestor(config IngestorConfig, store *storeapiclient.Store) (*Ingestor, config.Bulk.HotStores = stores.NewStoresFromString("memory", 1) config.Search.HotStores = stores.NewStoresFromString("memory", 1) } else { - clients, err = clientsFromConfig(ctx, config.Search) + clients, err = clientsFromConfig(config.Search) if err != nil { cancel() return nil, fmt.Errorf("initialize clients: %s", err) diff --git a/tests/common/byte_string.go b/tests/common/byte_string.go index 8bcc97b8..57ed058c 100644 --- a/tests/common/byte_string.go +++ b/tests/common/byte_string.go @@ -1,16 +1,5 @@ package common -func ToBytesSlice(s []string) [][]byte { - if s == nil { - return nil - } - res := make([][]byte, 0, len(s)) - for _, v := range s { - res = append(res, []byte(v)) - } - return res -} - func ToStringSlice(s [][]byte) []string { if s == nil { return nil @@ -21,25 +10,3 @@ func ToStringSlice(s [][]byte) []string { } return res } - -func ToBytesSlice2d(s [][]string) [][][]byte { - if s == nil { - return nil - } - res := make([][][]byte, 0, len(s)) - for _, v := range s { - res = append(res, ToBytesSlice(v)) - } - return res -} - -func ToStringSlice2d(s [][][]byte) [][]string { - if s == nil { - return nil - } - res := make([][]string, 0, len(s)) - for _, v := range s { - res = append(res, ToStringSlice(v)) - } - return res -} diff --git a/tests/common/util.go b/tests/common/util.go index 1d1648a3..673589a1 100644 --- a/tests/common/util.go +++ b/tests/common/util.go @@ -6,32 +6,11 @@ import ( "sync" "testing" - "github.com/prometheus/client_golang/prometheus" "lukechampine.com/frand" - - "github.com/ozontech/seq-db/seq" ) var baseTmpDir string -func MakeTouchTotal() *prometheus.CounterVec { - return prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "seq_db_store", - Subsystem: "cache", - Name: "touch_total", - Help: "", - }, []string{"layer"}) -} - -func MakeHitsTotal() *prometheus.CounterVec { - return prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "seq_db_store", - Subsystem: "cache", - Name: "hits_total", - Help: "", - }, []string{"layer"}) -} - func CreateDir(path string) { err := os.MkdirAll(path, 0o777) if err != nil { @@ -70,15 +49,6 @@ func GetTestTmpDir(t *testing.T) string { return filepath.Join(CreateTempDir(), t.Name()) } -func IDs(ids ...int) []seq.ID { - r := make([]seq.ID, 0) - for _, n := range ids { - r = append(r, seq.SimpleID(n)) - } - - return r -} - func RandomString(minLen, maxLen int) string { size := frand.Intn(maxLen-minLen+1) + minLen res := make([]byte, size) diff --git a/tests/integration_tests/sub_search_test.go b/tests/integration_tests/sub_search_test.go index 501a0210..44bda4e9 100644 --- a/tests/integration_tests/sub_search_test.go +++ b/tests/integration_tests/sub_search_test.go @@ -32,12 +32,9 @@ func (s *IntegrationTestSuite) ingestData(env *setup.TestingEnv, from, to time.T step := fractionTo.Sub(fractionFrom) / time.Duration(docsPerFraction) - for i, ts := 0, fractionFrom; i < docsPerFraction; i++ { - strTs := ts.Format(time.RFC3339) - origDocs = append(origDocs, fmt.Sprintf(`{"service":"x%d", "ts":%q}`, i, strTs)) - ts, _ = time.Parse(time.RFC3339, strTs) + for i, ts := 0, fractionFrom; i < docsPerFraction; ts, i = ts.Add(step), i+1 { + origDocs = append(origDocs, fmt.Sprintf(`{"service":"x%d", "ts":%q}`, i, ts.Format(time.RFC3339))) docsTimes = append(docsTimes, ts) - ts.Add(step) } setup.Bulk(s.T(), env.IngestorBulkAddr(), origDocs) diff --git a/tests/setup/doc.go b/tests/setup/doc.go index 6c0ab968..6721f291 100644 --- a/tests/setup/doc.go +++ b/tests/setup/doc.go @@ -90,16 +90,6 @@ func RandomWord() string { return randomStringFromPool(words) } -var fields = []string{ - "service", "message", "trace_id", "source", - "zone", "level", "timestamp", "requestObject", - "", // will be random -} - -func RandomField() string { - return randomStringFromPool(fields) -} - func RandomSymbol() byte { symbols := "!@#$%^&*(){}[]:?/\\~;" return symbols[frand.Intn(len(symbols))] @@ -214,17 +204,6 @@ func DocsToStrings(docs []ExampleDoc) []string { return docStr } -func DocsFromStrings(docStr []string) []ExampleDoc { - docs := make([]ExampleDoc, len(docStr)) - for i, doc := range docStr { - err := json.Unmarshal([]byte(doc), &docs[i]) - if err != nil { - panic(err) - } - } - return docs -} - func splitRange(size int, callback func(from int, to int)) { routines := max(1, min(size/10000, runtime.NumCPU())) // do parallel for large arrays diff --git a/tests/setup/methods.go b/tests/setup/methods.go index 302439ac..82fbafd8 100644 --- a/tests/setup/methods.go +++ b/tests/setup/methods.go @@ -26,19 +26,6 @@ func GenBuffer(docs []string) *bytes.Buffer { return b } -func GenBufferBytesReuse(docs [][]byte, b *bytes.Buffer) *bytes.Buffer { - for _, doc := range docs { - _, _ = b.WriteString(`{"index":"seq-db"}` + "\n") - _, _ = b.Write(doc) - _, _ = b.WriteString("\n") - } - return b -} - -func GenBufferBytes(docs [][]byte) *bytes.Buffer { - return GenBufferBytesReuse(docs, bytes.NewBuffer(nil)) -} - func BulkBuffer(t *testing.T, addr string, b *bytes.Buffer) { r, err := http.Post(addr, "", b) require.NoError(t, err, "should be no errors") diff --git a/util/err.go b/util/err.go index 471d6c7d..c7e1e8ea 100644 --- a/util/err.go +++ b/util/err.go @@ -1,9 +1,7 @@ package util import ( - "errors" "fmt" - "runtime" "runtime/debug" "github.com/prometheus/client_golang/prometheus" @@ -23,20 +21,6 @@ func (p *panicWrapper) Error() string { return p.e.Error() } -func IsRecoveredPanicError(e error) bool { - if e == nil { - return false - } - - var p *panicWrapper - if errors.As(e, &p) { - return true - } - - var re runtime.Error - return errors.As(e, &re) -} - func Recover(metric prometheus.Counter, err error) { metric.Inc() logger.Error(err.Error()) diff --git a/util/realloc_solver.go b/util/realloc_solver.go index 72fc5d79..4b6f7230 100644 --- a/util/realloc_solver.go +++ b/util/realloc_solver.go @@ -24,10 +24,6 @@ type ReallocSolver struct { type ReallocSolverOpts func(*ReallocSolver) -func ReallocSolverSize(statSize int) ReallocSolverOpts { - return func(r *ReallocSolver) { r.statSize = statSize } -} - func ReallocSolverLabel(label string) ReallocSolverOpts { return func(r *ReallocSolver) { r.label = label } } From f4070b88b365312d821038356361c992f88ff565 Mon Sep 17 00:00:00 2001 From: Timofey Sedov Date: Tue, 21 Oct 2025 19:13:08 +0300 Subject: [PATCH 3/3] chore: fix tests --- parser/term_builder.go | 0 proxyapi/grpc_fetch_test.go | 1 - proxyapi/grpc_main_test.go | 2 +- tests/integration_tests/sub_search_test.go | 8 +++++--- 4 files changed, 6 insertions(+), 5 deletions(-) delete mode 100644 parser/term_builder.go diff --git a/parser/term_builder.go b/parser/term_builder.go deleted file mode 100644 index e69de29b..00000000 diff --git a/proxyapi/grpc_fetch_test.go b/proxyapi/grpc_fetch_test.go index 81f9bf2c..b2aa3e6b 100644 --- a/proxyapi/grpc_fetch_test.go +++ b/proxyapi/grpc_fetch_test.go @@ -218,7 +218,6 @@ func TestGrpcV1_FetchLive(t *testing.T) { } for _, tt := range tests { - tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() r := require.New(t) diff --git a/proxyapi/grpc_main_test.go b/proxyapi/grpc_main_test.go index 21689cab..e0d13dbb 100644 --- a/proxyapi/grpc_main_test.go +++ b/proxyapi/grpc_main_test.go @@ -121,7 +121,7 @@ func runGRPCServerWithClient(apiServer seqproxyapi.SeqProxyApiServer) (seqproxya server.Stop() } conn, err := grpc.NewClient( - "", + "localhost", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return lis.Dial() }), diff --git a/tests/integration_tests/sub_search_test.go b/tests/integration_tests/sub_search_test.go index 44bda4e9..c03c81c4 100644 --- a/tests/integration_tests/sub_search_test.go +++ b/tests/integration_tests/sub_search_test.go @@ -32,9 +32,11 @@ func (s *IntegrationTestSuite) ingestData(env *setup.TestingEnv, from, to time.T step := fractionTo.Sub(fractionFrom) / time.Duration(docsPerFraction) - for i, ts := 0, fractionFrom; i < docsPerFraction; ts, i = ts.Add(step), i+1 { - origDocs = append(origDocs, fmt.Sprintf(`{"service":"x%d", "ts":%q}`, i, ts.Format(time.RFC3339))) - docsTimes = append(docsTimes, ts) + for i, ts := 0, fractionFrom; i < docsPerFraction; i, ts = i+1, ts.Add(step) { + // using truncate, since RFC3339 is sub-second accurate + t := ts.Truncate(time.Second) + origDocs = append(origDocs, fmt.Sprintf(`{"service":"x%d", "ts":%q}`, i, t.Format(time.RFC3339))) + docsTimes = append(docsTimes, t) } setup.Bulk(s.T(), env.IngestorBulkAddr(), origDocs)