Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/stress-search/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/alecthomas/units"

Expand Down Expand Up @@ -59,10 +60,9 @@ func main() {
cancel()
}()

conn, err := grpc.DialContext(
ctx,
conn, err := grpc.NewClient(
*endpoint,
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(&tracing.ClientHandler{}),
)

Expand Down
2 changes: 1 addition & 1 deletion frac/active_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (si *activeTokenIndex) GetValByTID(tid uint32) []byte {
}

func (si *activeTokenIndex) GetTIDsByTokenExpr(t parser.Token) ([]uint32, error) {
return si.tokenList.FindPattern(si.ctx, t, nil)
return si.tokenList.FindPattern(si.ctx, t)
}

func (si *activeTokenIndex) GetLIDsFromTIDs(tids []uint32, _ lids.Counter, minLID, maxLID uint32, order seq.DocsOrder) []node.Node {
Expand Down
2 changes: 1 addition & 1 deletion frac/active_token_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (tl *TokenList) getTokenProvider(field string) *activeTokenProvider {
}
}

func (tl *TokenList) FindPattern(ctx context.Context, t parser.Token, tids []uint32) ([]uint32, error) {
func (tl *TokenList) FindPattern(ctx context.Context, t parser.Token) ([]uint32, error) {
field := parser.GetField(t)
tp := tl.getTokenProvider(field)
tids, err := pattern.Search(ctx, t, tp)
Expand Down
43 changes: 0 additions & 43 deletions logger/util.go

This file was deleted.

3 changes: 1 addition & 2 deletions prepare/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
_ "net/http/pprof"
"os"
"path/filepath"
Expand Down Expand Up @@ -94,7 +93,7 @@ func main() {
logger.Fatal("can't create dir for batches", zap.Error(err))
}
for i, batch := range batches {
err := ioutil.WriteFile(fmt.Sprintf("%s/%d.json", batchesDir, i), batch.Bytes(), 0o777)
err := os.WriteFile(fmt.Sprintf("%s/%d.json", batchesDir, i), batch.Bytes(), 0o777)
if err != nil {
logger.Fatal("can't write batch",
zap.Int("i", i),
Expand Down
1 change: 0 additions & 1 deletion proxyapi/grpc_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func TestGrpcV1_FetchLive(t *testing.T) {
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
r := require.New(t)
Expand Down
6 changes: 2 additions & 4 deletions proxyapi/grpc_main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func TestMain(m *testing.M) {
}

func runGRPCServerWithClient(apiServer seqproxyapi.SeqProxyApiServer) (seqproxyapi.SeqProxyApiClient, func()) {
ctx := context.Background()
lis := bufconn.Listen(10 * 1024 * 1024)
server := grpc.NewServer(
grpc.StatsHandler(&tracing.ServerHandler{}),
Expand All @@ -121,9 +120,8 @@ func runGRPCServerWithClient(apiServer seqproxyapi.SeqProxyApiServer) (seqproxya
}
server.Stop()
}
conn, err := grpc.DialContext(
ctx,
"",
conn, err := grpc.NewClient(
"localhost",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
Expand Down
2 changes: 1 addition & 1 deletion proxyapi/http_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (r *esBulkDocReader) readDoc() ([]byte, bool, error) {

// If this is a line that exceeds maxDocumentSize, skip it.
for isPrefix {
doc, isPrefix, err = r.r.ReadLine()
_, isPrefix, err = r.r.ReadLine()
if err != nil {
return nil, true, err
}
Expand Down
17 changes: 8 additions & 9 deletions proxyapi/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,38 +47,37 @@ type Ingestor struct {
isStopped atomic.Bool
}

func clientsFromConfig(ctx context.Context, config search.Config) (map[string]storeapi.StoreApiClient, error) {
func clientsFromConfig(config search.Config) (map[string]storeapi.StoreApiClient, error) {
clients := map[string]storeapi.StoreApiClient{}
if err := appendClients(ctx, clients, config.HotStores.Shards); err != nil {
if err := appendClients(clients, config.HotStores.Shards); err != nil {
return nil, err
}
if config.HotReadStores != nil {
if err := appendClients(ctx, clients, config.HotReadStores.Shards); err != nil {
if err := appendClients(clients, config.HotReadStores.Shards); err != nil {
return nil, err
}
}
if config.WriteStores != nil {
if err := appendClients(ctx, clients, config.WriteStores.Shards); err != nil {
if err := appendClients(clients, config.WriteStores.Shards); err != nil {
return nil, err
}
}
if config.ReadStores != nil {
if err := appendClients(ctx, clients, config.ReadStores.Shards); err != nil {
if err := appendClients(clients, config.ReadStores.Shards); err != nil {
return nil, err
}
}
return clients, nil
}

func appendClients(ctx context.Context, clients map[string]storeapi.StoreApiClient, shards [][]string) error {
func appendClients(clients map[string]storeapi.StoreApiClient, shards [][]string) error {
for _, shard := range shards {
for _, replica := range shard {
if _, has := clients[replica]; has {
continue
}
// this doesn't block, and if store is down, it will try to reconnect in background
conn, err := grpc.DialContext(
ctx,
conn, err := grpc.NewClient(
replica,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(&tracing.ClientHandler{}),
Expand Down Expand Up @@ -132,7 +131,7 @@ func NewIngestor(config IngestorConfig, store *storeapiclient.Store) (*Ingestor,
config.Bulk.HotStores = stores.NewStoresFromString("memory", 1)
config.Search.HotStores = stores.NewStoresFromString("memory", 1)
} else {
clients, err = clientsFromConfig(ctx, config.Search)
clients, err = clientsFromConfig(config.Search)
if err != nil {
cancel()
return nil, fmt.Errorf("initialize clients: %s", err)
Expand Down
33 changes: 0 additions & 33 deletions tests/common/byte_string.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
package common

func ToBytesSlice(s []string) [][]byte {
if s == nil {
return nil
}
res := make([][]byte, 0, len(s))
for _, v := range s {
res = append(res, []byte(v))
}
return res
}

func ToStringSlice(s [][]byte) []string {
if s == nil {
return nil
Expand All @@ -21,25 +10,3 @@ func ToStringSlice(s [][]byte) []string {
}
return res
}

func ToBytesSlice2d(s [][]string) [][][]byte {
if s == nil {
return nil
}
res := make([][][]byte, 0, len(s))
for _, v := range s {
res = append(res, ToBytesSlice(v))
}
return res
}

func ToStringSlice2d(s [][][]byte) [][]string {
if s == nil {
return nil
}
res := make([][]string, 0, len(s))
for _, v := range s {
res = append(res, ToStringSlice(v))
}
return res
}
30 changes: 0 additions & 30 deletions tests/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,11 @@ import (
"sync"
"testing"

"github.com/prometheus/client_golang/prometheus"
"lukechampine.com/frand"

"github.com/ozontech/seq-db/seq"
)

var baseTmpDir string

func MakeTouchTotal() *prometheus.CounterVec {
return prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "seq_db_store",
Subsystem: "cache",
Name: "touch_total",
Help: "",
}, []string{"layer"})
}

func MakeHitsTotal() *prometheus.CounterVec {
return prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "seq_db_store",
Subsystem: "cache",
Name: "hits_total",
Help: "",
}, []string{"layer"})
}

func CreateDir(path string) {
err := os.MkdirAll(path, 0o777)
if err != nil {
Expand Down Expand Up @@ -70,15 +49,6 @@ func GetTestTmpDir(t *testing.T) string {
return filepath.Join(CreateTempDir(), t.Name())
}

func IDs(ids ...int) []seq.ID {
r := make([]seq.ID, 0)
for _, n := range ids {
r = append(r, seq.SimpleID(n))
}

return r
}

func RandomString(minLen, maxLen int) string {
size := frand.Intn(maxLen-minLen+1) + minLen
res := make([]byte, size)
Expand Down
11 changes: 5 additions & 6 deletions tests/integration_tests/sub_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ func (s *IntegrationTestSuite) ingestData(env *setup.TestingEnv, from, to time.T

step := fractionTo.Sub(fractionFrom) / time.Duration(docsPerFraction)

for i, ts := 0, fractionFrom; i < docsPerFraction; i++ {
strTs := ts.Format(time.RFC3339)
origDocs = append(origDocs, fmt.Sprintf(`{"service":"x%d", "ts":%q}`, i, strTs))
ts, _ = time.Parse(time.RFC3339, strTs)
docsTimes = append(docsTimes, ts)
ts.Add(step)
for i, ts := 0, fractionFrom; i < docsPerFraction; i, ts = i+1, ts.Add(step) {
// using truncate, since RFC3339 is sub-second accurate
t := ts.Truncate(time.Second)
origDocs = append(origDocs, fmt.Sprintf(`{"service":"x%d", "ts":%q}`, i, t.Format(time.RFC3339)))
docsTimes = append(docsTimes, t)
}

setup.Bulk(s.T(), env.IngestorBulkAddr(), origDocs)
Expand Down
21 changes: 0 additions & 21 deletions tests/setup/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@ func RandomWord() string {
return randomStringFromPool(words)
}

var fields = []string{
"service", "message", "trace_id", "source",
"zone", "level", "timestamp", "requestObject",
"", // will be random
}

func RandomField() string {
return randomStringFromPool(fields)
}

func RandomSymbol() byte {
symbols := "!@#$%^&*(){}[]:?/\\~;"
return symbols[frand.Intn(len(symbols))]
Expand Down Expand Up @@ -214,17 +204,6 @@ func DocsToStrings(docs []ExampleDoc) []string {
return docStr
}

func DocsFromStrings(docStr []string) []ExampleDoc {
docs := make([]ExampleDoc, len(docStr))
for i, doc := range docStr {
err := json.Unmarshal([]byte(doc), &docs[i])
if err != nil {
panic(err)
}
}
return docs
}

func splitRange(size int, callback func(from int, to int)) {
routines := max(1, min(size/10000, runtime.NumCPU())) // do parallel for large arrays

Expand Down
13 changes: 0 additions & 13 deletions tests/setup/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,6 @@ func GenBuffer(docs []string) *bytes.Buffer {
return b
}

func GenBufferBytesReuse(docs [][]byte, b *bytes.Buffer) *bytes.Buffer {
for _, doc := range docs {
_, _ = b.WriteString(`{"index":"seq-db"}` + "\n")
_, _ = b.Write(doc)
_, _ = b.WriteString("\n")
}
return b
}

func GenBufferBytes(docs [][]byte) *bytes.Buffer {
return GenBufferBytesReuse(docs, bytes.NewBuffer(nil))
}

func BulkBuffer(t *testing.T, addr string, b *bytes.Buffer) {
r, err := http.Post(addr, "", b)
require.NoError(t, err, "should be no errors")
Expand Down
16 changes: 0 additions & 16 deletions util/err.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package util

import (
"errors"
"fmt"
"runtime"
"runtime/debug"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,20 +21,6 @@ func (p *panicWrapper) Error() string {
return p.e.Error()
}

func IsRecoveredPanicError(e error) bool {
if e == nil {
return false
}

var p *panicWrapper
if errors.As(e, &p) {
return true
}

var re runtime.Error
return errors.As(e, &re)
}

func Recover(metric prometheus.Counter, err error) {
metric.Inc()
logger.Error(err.Error())
Expand Down
4 changes: 0 additions & 4 deletions util/realloc_solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ type ReallocSolver struct {

type ReallocSolverOpts func(*ReallocSolver)

func ReallocSolverSize(statSize int) ReallocSolverOpts {
return func(r *ReallocSolver) { r.statSize = statSize }
}

func ReallocSolverLabel(label string) ReallocSolverOpts {
return func(r *ReallocSolver) { r.label = label }
}
Expand Down
Loading