Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions frac/sealed/sealing/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@ import (
"iter"
"os"
"path/filepath"
"time"

"go.uber.org/zap"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/util"
)
Expand Down Expand Up @@ -44,7 +40,6 @@ type Source interface {
// - *sealed.PreloadedData: Preloaded data structures for initialization of sealed fraction
// - error: Any error encountered during the sealing process
func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
start := time.Now()
info := src.Info()

// Validate that we're not sealing an empty fraction
Expand Down Expand Up @@ -101,11 +96,5 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
},
}

// Log successful sealing operation
logger.Info(
"fraction sealed",
zap.String("fraction", filepath.Dir(info.Path)),
zap.Float64("time_spent_s", util.DurationToUnit(time.Since(start), "s")),
)
return &preloaded, nil
}
49 changes: 22 additions & 27 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ package fracmanager
import (
"context"
"errors"
"io"
"math/rand"
"os"
"path/filepath"
"sync"
"time"

"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
Expand Down Expand Up @@ -44,6 +41,7 @@ type FracManager struct {
remoteFracs []*frac.Remote
active activeRef

indexer *frac.ActiveIndexer
fracProvider *fractionProvider

oldestCTLocal atomic.Uint64
Expand All @@ -56,8 +54,6 @@ type FracManager struct {
cacheWG sync.WaitGroup

s3cli *s3.Client

ulidEntropy io.Reader
}

type fracRef struct {
Expand All @@ -82,25 +78,24 @@ func NewFracManager(ctx context.Context, cfg *Config, s3cli *s3.Client) *FracMan

cacheMaintainer := NewCacheMaintainer(cfg.CacheSize, cfg.SortCacheSize, newDefaultCacheMetrics())

readLimiter := storage.NewReadLimiter(config.ReaderWorkers, storeBytesRead)
indexer := frac.NewActiveIndexer(config.IndexWorkers, config.IndexWorkers)
indexer.Start()

fracManager := &FracManager{
config: cfg,
ctx: ctx,
s3cli: s3cli,
mature: atomic.Bool{},
cacheMaintainer: cacheMaintainer,
fracProvider: newFractionProvider(&cfg.Fraction, s3cli, cacheMaintainer, config.ReaderWorkers, config.IndexWorkers),
ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0),
indexer: indexer,
fracProvider: newFractionProvider(cfg, s3cli, cacheMaintainer, readLimiter, indexer),
fracCache: NewFracInfoCache(filepath.Join(cfg.DataDir, consts.FracCacheFileSuffix)),
}

return fracManager
}

// This method is not thread safe. Use consciously to avoid race
func (fm *FracManager) nextFractionID() string {
return ulid.MustNew(ulid.Timestamp(time.Now()), fm.ulidEntropy).String()
}

func (fm *FracManager) maintenance(sealWg, cleanupWg *sync.WaitGroup) {
logger.Debug("maintenance started")

Expand Down Expand Up @@ -271,7 +266,7 @@ func (fm *FracManager) cleanupFractions(cleanupWg *sync.WaitGroup) {
}

offloadStart := time.Now()
mustBeOffloaded, err := outsider.Offload(fm.ctx, s3.NewUploader(fm.s3cli))
remote, err := fm.fracProvider.Offload(fm.ctx, outsider)
if err != nil {
metric.OffloadingTotal.WithLabelValues("failure").Inc()
metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds()))
Expand All @@ -289,7 +284,7 @@ func (fm *FracManager) cleanupFractions(cleanupWg *sync.WaitGroup) {
return
}

if !mustBeOffloaded {
if remote == nil {
fm.fracCache.Remove(info.Name())
outsider.Suicide()
return
Expand All @@ -304,8 +299,6 @@ func (fm *FracManager) cleanupFractions(cleanupWg *sync.WaitGroup) {
zap.String("took", time.Since(offloadStart).String()),
)

remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info)

fm.fracMu.Lock()
// FIXME(dkharms): We had previously shifted fraction from local fracs list (in [fm.determineOutsiders] via [fm.shiftFirstFrac])
// and therefore excluded it from search queries.
Expand Down Expand Up @@ -556,11 +549,7 @@ var (
func (fm *FracManager) seal(activeRef activeRef) {
sealsTotal.Inc()
now := time.Now()
defer func() {
sealsDoneSeconds.Observe(time.Since(now).Seconds())
}()

sealed, err := activeRef.frac.Seal(fm.config.SealParams)
sealed, err := activeRef.frac.Seal()
if err != nil {
if errors.Is(err, ErrSealingFractionSuicided) {
// the fraction is suicided, this means that it has already been pushed out of the list of fractions,
Expand All @@ -569,6 +558,14 @@ func (fm *FracManager) seal(activeRef activeRef) {
}
logger.Fatal("sealing error", zap.Error(err))
}
sealingTime := time.Since(now)
sealsDoneSeconds.Observe(sealingTime.Seconds())

logger.Info(
"fraction sealed",
zap.String("fraction", filepath.Dir(sealed.Info().Path)),
zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")),
)

info := sealed.Info()
fm.fracCache.Add(info)
Expand All @@ -579,18 +576,16 @@ func (fm *FracManager) seal(activeRef activeRef) {
}

func (fm *FracManager) rotate() activeRef {
filePath := fileBasePattern + fm.nextFractionID()
baseFilePath := filepath.Join(fm.config.DataDir, filePath)
logger.Info("creating new fraction", zap.String("filepath", baseFilePath))

next := fm.newActiveRef(fm.fracProvider.NewActive(baseFilePath))
next := fm.newActiveRef(fm.fracProvider.CreateActive())

fm.fracMu.Lock()
prev := fm.active
fm.active = next
fm.localFracs = append(fm.localFracs, fm.active.ref)
fm.fracMu.Unlock()

logger.Info("new fraction created", zap.String("filepath", next.frac.active.BaseFileName))

return prev
}

Expand All @@ -599,7 +594,7 @@ func (fm *FracManager) minFracSizeToSeal() uint64 {
}

func (fm *FracManager) Stop() {
fm.fracProvider.Stop()
fm.indexer.Stop()
fm.stopFn()

fm.statWG.Wait()
Expand Down
11 changes: 1 addition & 10 deletions fracmanager/fracmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestMatureMode(t *testing.T) {

checkFn(fm)

fm.fracProvider.Stop()
fm.indexer.Stop()
}

id := 1
Expand Down Expand Up @@ -151,15 +151,6 @@ func TestMatureMode(t *testing.T) {

}

func TestNewULID(t *testing.T) {
fm := NewFracManager(context.Background(), &Config{}, nil)
ulid1 := fm.nextFractionID()
ulid2 := fm.nextFractionID()
assert.NotEqual(t, ulid1, ulid2, "ULIDs should be different")
assert.Equal(t, 26, len(ulid1), "ULID should have length 26")
assert.Greater(t, ulid2, ulid1)
}

func TestOldestCT(t *testing.T) {
const fracCount = 10

Expand Down
93 changes: 69 additions & 24 deletions fracmanager/fraction_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package fracmanager

import (
"context"
"io"
"math/rand"
"path/filepath"
"time"

"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/storage/s3"
)
Expand All @@ -19,27 +25,28 @@ var storeBytesRead = promauto.NewCounter(prometheus.CounterOpts{
Name: "bytes_read",
})

// fractionProvider is a factory for creating different types of fractions
// Contains all necessary dependencies for creating and managing fractions
type fractionProvider struct {
s3cli *s3.Client
config *frac.Config
cacheProvider *CacheMaintainer
activeIndexer *frac.ActiveIndexer
readLimiter *storage.ReadLimiter
s3cli *s3.Client // Client for S3 storage operations
config *Config // Fraction manager configuration
cacheProvider *CacheMaintainer // Cache provider for data access optimization
activeIndexer *frac.ActiveIndexer // Indexer for active fractions
readLimiter *storage.ReadLimiter // Read rate limiter
ulidEntropy io.Reader // Entropy source for ULID generation
}

func newFractionProvider(
c *frac.Config, s3cli *s3.Client, cp *CacheMaintainer,
readerWorkers, indexWorkers int,
cfg *Config, s3cli *s3.Client, cp *CacheMaintainer,
readLimiter *storage.ReadLimiter, indexer *frac.ActiveIndexer,
) *fractionProvider {
ai := frac.NewActiveIndexer(indexWorkers, indexWorkers)
ai.Start() // first start indexWorkers to allow active frac replaying

return &fractionProvider{
s3cli: s3cli,
config: c,
config: cfg,
cacheProvider: cp,
activeIndexer: ai,
readLimiter: storage.NewReadLimiter(readerWorkers, storeBytesRead),
activeIndexer: indexer,
readLimiter: readLimiter,
ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0),
}
}

Expand All @@ -50,7 +57,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active {
fp.readLimiter,
fp.cacheProvider.CreateDocBlockCache(),
fp.cacheProvider.CreateSortDocsCache(),
fp.config,
&fp.config.Fraction,
)
}

Expand All @@ -60,37 +67,75 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *fra
fp.readLimiter,
fp.cacheProvider.CreateIndexCache(),
fp.cacheProvider.CreateDocBlockCache(),
cachedInfo,
fp.config,
cachedInfo, // Preloaded meta information
&fp.config.Fraction,
)
}

func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *sealed.PreloadedData) *frac.Sealed {
return frac.NewSealedPreloaded(
name,
preloadedData,
preloadedData, // Data already loaded into memory
fp.readLimiter,
fp.cacheProvider.CreateIndexCache(),
fp.cacheProvider.CreateDocBlockCache(),
fp.config,
&fp.config.Fraction,
)
}

func (fp *fractionProvider) NewRemote(
ctx context.Context, name string, cachedInfo *common.Info,
) *frac.Remote {
func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedInfo *common.Info) *frac.Remote {
return frac.NewRemote(
ctx,
name,
fp.readLimiter,
fp.cacheProvider.CreateIndexCache(),
fp.cacheProvider.CreateDocBlockCache(),
cachedInfo,
fp.config,
&fp.config.Fraction,
fp.s3cli,
)
}

func (fp *fractionProvider) Stop() {
fp.activeIndexer.Stop()
// nextFractionID returns the string form of a fresh ULID built from the current
// wall-clock time and the provider's entropy source.
// NOTE: not safe for concurrent use — callers must serialize access themselves,
// otherwise the shared entropy reader may be used from several goroutines at once.
func (fp *fractionProvider) nextFractionID() string {
	ms := ulid.Timestamp(time.Now())
	id := ulid.MustNew(ms, fp.ulidEntropy)
	return id.String()
}

// CreateActive builds a new active fraction under the configured data directory.
// The file name is fileBasePattern followed by a freshly generated fraction ID.
func (fp *fractionProvider) CreateActive() *frac.Active {
	name := fileBasePattern + fp.nextFractionID()
	return fp.NewActive(filepath.Join(fp.config.DataDir, name))
}

// Seal turns the given active fraction into a sealed one.
// It builds a sealing source from the active fraction, runs sealing.Seal over it
// with the configured seal parameters, and wraps the preloaded result in a sealed
// fraction that keeps the active fraction's base file name.
// The first error encountered is returned as is, together with a nil fraction.
func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
	params := fp.config.SealParams

	source, err := frac.NewActiveSealingSource(active, params)
	if err != nil {
		return nil, err
	}

	data, err := sealing.Seal(source, params)
	if err != nil {
		return nil, err
	}

	return fp.NewSealedPreloaded(active.BaseFileName, data), nil
}

// Offload asks the fraction to offload itself through an S3 uploader built on the
// provider's client. The same ctx is forwarded to the fraction's Offload call and
// to the remote fraction constructor.
//
// Returns:
//   - (remote, nil) when the fraction reports that it has to be offloaded;
//   - (nil, nil) when the fraction reports that it does not have to be offloaded;
//   - (nil, err) when the fraction's Offload call fails.
func (fp *fractionProvider) Offload(ctx context.Context, f frac.Fraction) (*frac.Remote, error) {
	uploader := s3.NewUploader(fp.s3cli)

	needRemote, err := f.Offload(ctx, uploader)
	switch {
	case err != nil:
		return nil, err
	case !needRemote:
		return nil, nil
	}

	meta := f.Info()
	return fp.NewRemote(ctx, meta.Path, meta), nil
}
16 changes: 16 additions & 0 deletions fracmanager/fraction_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package fracmanager

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestFractionID verifies that two consecutively generated fraction IDs differ,
// are 26 characters long, and that the later one sorts after the earlier one.
func TestFractionID(t *testing.T) {
	provider := newFractionProvider(nil, nil, nil, nil, nil)

	first := provider.nextFractionID()
	second := provider.nextFractionID()

	assert.NotEqual(t, first, second, "ULIDs should be different")
	assert.Equal(t, 26, len(first), "ULID should have length 26")
	assert.Greater(t, second, first)
}
Loading
Loading