1 change: 1 addition & 0 deletions cmd/seq-db/seq-db.go
@@ -257,6 +257,7 @@ func startStore(
 	SortCacheSize:     uint64(cfg.Resources.SortDocsCacheSize),
 	FracLoadLimit:     0,
 	ShouldReplay:      true,
+	ReplayWorkers:     cfg.Resources.ReplayWorkers,
 	MaintenanceDelay:  0,
 	CacheGCDelay:      0,
 	CacheCleanupDelay: 0,
3 changes: 3 additions & 0 deletions config/config.go
@@ -169,6 +169,9 @@ type Config struct {
 	// SearchWorkers specifies number of workers for searchers pool.
 	// By default this setting is equal to [runtime.GOMAXPROCS].
 	SearchWorkers int `config:"search_workers"`
+	// ReplayWorkers specifies the number of workers used to replay active fractions on startup.
+	// By default this setting is equal to 2.
+	ReplayWorkers int `config:"replay_workers" default:"2"`
 	// CacheSize specifies maximum size of cache.
 	// By default this setting is equal to 30% of available RAM.
 	CacheSize Bytes `config:"cache_size"`
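For reference, here is how the new knob might look in a config file. This is a hypothetical excerpt: the nesting under resources is inferred from the validation keys in config/validation.go below, and the actual file layout may differ.

resources:
  # Workers used to replay active fractions on startup.
  # Optional; defaults to 2, must be greater than 0.
  replay_workers: 4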
1 change: 1 addition & 0 deletions config/validation.go
@@ -62,6 +62,7 @@ func (c *Config) storeValidations() []validateFn {

greaterThan("resources.reader_workers", 0, c.Resources.ReaderWorkers),
greaterThan("resources.search_workers", 0, c.Resources.SearchWorkers),
greaterThan("resources.replay_workers", 0, c.Resources.ReplayWorkers),
greaterThan("resources.cache_size", 0, c.Resources.CacheSize),

inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel),
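The greaterThan helper itself is not part of this diff. Below is a minimal sketch of what it plausibly looks like, inferred from its call sites above; the validateFn signature and the generic constraint are assumptions, not the actual seq-db implementation.

package config

import (
	"cmp"
	"fmt"
)

// validateFn is assumed here to be a deferred check returning an error.
type validateFn func() error

// greaterThan rejects values that are not strictly greater than min.
// Hypothetical reconstruction; the real helper may differ.
func greaterThan[T cmp.Ordered](name string, min, value T) validateFn {
	return func() error {
		if value <= min {
			return fmt.Errorf("%s: must be greater than %v, got %v", name, min, value)
		}
		return nil
	}
}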
2 changes: 2 additions & 0 deletions consts/consts.go
@@ -46,6 +46,8 @@ const (

 	IngestorMaxInflightBulks = 32
 
+	DefaultReplayWorkers = 2
+
 	// known extensions
 	MetaFileSuffix = ".meta"

3 changes: 2 additions & 1 deletion frac/active.go
@@ -137,7 +137,7 @@ func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) {
 }
 
 func (f *Active) Replay(ctx context.Context) error {
-	logger.Info("start replaying...")
+	logger.Info("start replaying...", zap.String("name", f.info.Name()))
 
 	t := time.Now()
 
@@ -169,6 +169,7 @@ out:
 		next += step
 		progress := float64(offset) / float64(f.info.MetaOnDisk) * 100
 		logger.Info("replaying batch, meta",
+			zap.String("name", f.info.Name()),
 			zap.Uint64("from", offset),
 			zap.Uint64("to", offset+metaSize),
 			zap.Uint64("target", f.info.MetaOnDisk),
4 changes: 4 additions & 0 deletions fracmanager/config.go
@@ -21,6 +21,7 @@ type Config struct {

 	FracLoadLimit     uint64 // how many sealed fractions should fracmanager load, if 0 then loads all
 	ShouldReplay      bool
+	ReplayWorkers     int
 	MaintenanceDelay  time.Duration
 	CacheCleanupDelay time.Duration
 	CacheGCDelay      time.Duration
@@ -62,6 +63,9 @@ func FillConfigWithDefault(config *Config) *Config {
 	if config.SealParams.TokenTableZstdLevel == 0 {
 		config.SealParams.TokenTableZstdLevel = zstdDefaultLevel
 	}
+	if config.ReplayWorkers == 0 {
+		config.ReplayWorkers = consts.DefaultReplayWorkers
+	}
 
 	if config.SortCacheSize == 0 {
 		const (
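FillConfigWithDefault treats a zero ReplayWorkers as unset, so code that builds a fracmanager.Config directly (bypassing the config loader and its default:"2" tag) still ends up with a sane worker count. A small usage sketch, assuming only the exported API visible in this diff:

package main

import (
	"fmt"

	"github.com/ozontech/seq-db/fracmanager"
)

func main() {
	// ReplayWorkers is left at its zero value on purpose.
	cfg := fracmanager.FillConfigWithDefault(&fracmanager.Config{ShouldReplay: true})
	fmt.Println(cfg.ReplayWorkers) // 2, i.e. consts.DefaultReplayWorkers
}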
59 changes: 44 additions & 15 deletions fracmanager/fracmanager.go
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
@@ -494,27 +495,55 @@ func (fm *FracManager) Load(ctx context.Context) error {
 }
 
 func (fm *FracManager) replayAll(ctx context.Context, actives []*frac.Active) error {
-	for i, a := range actives {
-		if err := a.Replay(ctx); err != nil {
-			return err
-		}
+	if len(actives) == 0 {
+		return nil
+	}
 
-		if a.Info().DocsTotal == 0 {
-			a.Suicide() // remove empty
-			continue
-		}
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(fm.config.ReplayWorkers)
 
-		r := fm.newActiveRef(a)
-		fm.localFracs = append(fm.localFracs, r.ref)
+	// goroutines access different indices, no need for lock protection for fracRefs
+	var fracRefs = make([]*fracRef, len(actives))
+	var newActiveRef *activeRef
 
-		if i == len(actives)-1 { // last and not empty
-			fm.active = r
-			continue
-		}
+	for i, f := range actives {
+		g.Go(func() error {
+			if err := f.Replay(ctx); err != nil {
+				return err
+			}
+
+			if f.Info().DocsTotal == 0 {
+				f.Suicide() // remove empty
+				return nil
+			}
+
-		fm.seal(r)
+			ref := fm.newActiveRef(f)
+			fracRefs[i] = ref.ref
+
+			if i != len(actives)-1 {
+				fm.seal(ref)
+			} else {
+				// last frac stays active and is not sealed
+				newActiveRef = &ref
+			}
+
+			return nil
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return err
+	}
+
+	for _, ref := range fracRefs {
+		if ref != nil {
+			fm.localFracs = append(fm.localFracs, ref)
+		}
+	}
+
+	if newActiveRef != nil {
+		fm.active = *newActiveRef
+	}
 	return nil
 }

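The heart of the change is the bounded worker pool: errgroup.SetLimit caps the number of concurrent replays at ReplayWorkers, each goroutine writes only its own slice index so no mutex is needed, and the first error cancels the group's context. A self-contained sketch of the same pattern outside seq-db (names like frac-0 are made up for illustration):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	fracs := []string{"frac-0", "frac-1", "frac-2", "frac-3", "frac-4"}
	results := make([]string, len(fracs)) // one slot per task: no locking required

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(2) // at most two replays in flight, like ReplayWorkers

	for i, name := range fracs {
		g.Go(func() error { // Go 1.22+: i and name are per-iteration copies
			if err := ctx.Err(); err != nil {
				return err // another worker failed; skip remaining work
			}
			results[i] = "replayed " + name
			return nil
		})
	}

	if err := g.Wait(); err != nil { // returns the first non-nil error
		fmt.Println("replay failed:", err)
		return
	}
	fmt.Println(results)
}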