Skip to content

Commit bf4cf80

Browse files
committed
refactor(fracmanager): move name generation and sealing logic to FractionProvider
1 parent 2daa556 commit bf4cf80

File tree

6 files changed

+109
-68
lines changed

6 files changed

+109
-68
lines changed

fracmanager/fracmanager.go

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@ package fracmanager
33
import (
44
"context"
55
"errors"
6-
"io"
7-
"math/rand"
86
"os"
97
"path/filepath"
108
"sync"
119
"time"
1210

13-
"github.com/oklog/ulid/v2"
1411
"github.com/prometheus/client_golang/prometheus"
1512
"github.com/prometheus/client_golang/prometheus/promauto"
1613
"go.uber.org/atomic"
@@ -44,6 +41,7 @@ type FracManager struct {
4441
remoteFracs []*frac.Remote
4542
active activeRef
4643

44+
indexer *frac.ActiveIndexer
4745
fracProvider *fractionProvider
4846

4947
oldestCTLocal atomic.Uint64
@@ -56,8 +54,6 @@ type FracManager struct {
5654
cacheWG *sync.WaitGroup
5755

5856
s3cli *s3.Client
59-
60-
ulidEntropy io.Reader
6157
}
6258

6359
type fracRef struct {
@@ -100,25 +96,24 @@ func NewFracManager(ctx context.Context, cfg *Config, s3cli *s3.Client) *FracMan
10096
ChangeGenerations: metric.CacheChangeGenerations,
10197
})
10298

99+
readLimiter := storage.NewReadLimiter(config.ReaderWorkers, storeBytesRead)
100+
indexer := frac.NewActiveIndexer(config.IndexWorkers, config.IndexWorkers)
101+
indexer.Start()
102+
103103
fracManager := &FracManager{
104104
config: cfg,
105105
ctx: ctx,
106106
s3cli: s3cli,
107107
mature: atomic.Bool{},
108108
cacheMaintainer: cacheMaintainer,
109-
fracProvider: newFractionProvider(&cfg.Fraction, s3cli, cacheMaintainer, config.ReaderWorkers, config.IndexWorkers),
110-
ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0),
109+
indexer: indexer,
110+
fracProvider: newFractionProvider(cfg, s3cli, cacheMaintainer, readLimiter, indexer),
111111
fracCache: NewSealedFracCache(filepath.Join(cfg.DataDir, consts.FracCacheFileSuffix)),
112112
}
113113

114114
return fracManager
115115
}
116116

117-
// This method is not thread safe. Use consciously to avoid race
118-
func (fm *FracManager) nextFractionID() string {
119-
return ulid.MustNew(ulid.Timestamp(time.Now()), fm.ulidEntropy).String()
120-
}
121-
122117
func (fm *FracManager) maintenance(sealWg, cleanupWg *sync.WaitGroup) {
123118
logger.Debug("maintenance started")
124119

@@ -289,7 +284,7 @@ func (fm *FracManager) cleanupFractions(cleanupWg *sync.WaitGroup) {
289284
}
290285

291286
offloadStart := time.Now()
292-
mustBeOffloaded, err := outsider.Offload(fm.ctx, s3.NewUploader(fm.s3cli))
287+
remote, err := fm.fracProvider.Offload(fm.ctx, outsider)
293288
if err != nil {
294289
metric.OffloadingTotal.WithLabelValues("failure").Inc()
295290
metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds()))
@@ -307,7 +302,7 @@ func (fm *FracManager) cleanupFractions(cleanupWg *sync.WaitGroup) {
307302
return
308303
}
309304

310-
if !mustBeOffloaded {
305+
if remote == nil {
311306
fm.fracCache.RemoveFraction(info.Name())
312307
outsider.Suicide()
313308
return
@@ -322,8 +317,6 @@ func (fm *FracManager) cleanupFractions(cleanupWg *sync.WaitGroup) {
322317
zap.String("took", time.Since(offloadStart).String()),
323318
)
324319

325-
remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info)
326-
327320
fm.fracMu.Lock()
328321
// FIXME(dkharms): We had previously shifted fraction from local fracs list (in [fm.determineOutsiders] via [fm.shiftFirstFrac])
329322
// and therefore excluded it from search queries.
@@ -565,7 +558,7 @@ func (fm *FracManager) seal(activeRef activeRef) {
565558
sealsDoneSeconds.Observe(time.Since(now).Seconds())
566559
}()
567560

568-
sealed, err := activeRef.frac.Seal(fm.config.SealParams)
561+
sealed, err := activeRef.frac.Seal()
569562
if err != nil {
570563
if errors.Is(err, ErrSealingFractionSuicided) {
571564
// the faction is suicided, this means that it has already pushed out of the list of factions,
@@ -584,18 +577,16 @@ func (fm *FracManager) seal(activeRef activeRef) {
584577
}
585578

586579
func (fm *FracManager) rotate() activeRef {
587-
filePath := fileBasePattern + fm.nextFractionID()
588-
baseFilePath := filepath.Join(fm.config.DataDir, filePath)
589-
logger.Info("creating new fraction", zap.String("filepath", baseFilePath))
590-
591-
next := fm.newActiveRef(fm.fracProvider.NewActive(baseFilePath))
580+
next := fm.newActiveRef(fm.fracProvider.CreateActive())
592581

593582
fm.fracMu.Lock()
594583
prev := fm.active
595584
fm.active = next
596585
fm.localFracs = append(fm.localFracs, fm.active.ref)
597586
fm.fracMu.Unlock()
598587

588+
logger.Info("new fraction created", zap.String("filepath", next.frac.active.BaseFileName))
589+
599590
return prev
600591
}
601592

@@ -604,7 +595,7 @@ func (fm *FracManager) minFracSizeToSeal() uint64 {
604595
}
605596

606597
func (fm *FracManager) Stop() {
607-
fm.fracProvider.Stop()
598+
fm.indexer.Stop()
608599
fm.stopFn()
609600

610601
fm.statWG.Wait()

fracmanager/fracmanager_test.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestMatureMode(t *testing.T) {
110110

111111
checkFn(fm)
112112

113-
fm.fracProvider.Stop()
113+
fm.indexer.Stop()
114114
}
115115

116116
id := 1
@@ -151,15 +151,6 @@ func TestMatureMode(t *testing.T) {
151151

152152
}
153153

154-
func TestNewULID(t *testing.T) {
155-
fm := NewFracManager(context.Background(), &Config{}, nil)
156-
ulid1 := fm.nextFractionID()
157-
ulid2 := fm.nextFractionID()
158-
assert.NotEqual(t, ulid1, ulid2, "ULIDs should be different")
159-
assert.Equal(t, 26, len(ulid1), "ULID should have length 26")
160-
assert.Greater(t, ulid2, ulid1)
161-
}
162-
163154
func TestOldestCT(t *testing.T) {
164155
const fracCount = 10
165156

fracmanager/fraction_provider.go

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ package fracmanager
22

33
import (
44
"context"
5+
"io"
6+
"math/rand"
7+
"path/filepath"
8+
"time"
59

10+
"github.com/oklog/ulid/v2"
611
"github.com/prometheus/client_golang/prometheus"
712
"github.com/prometheus/client_golang/prometheus/promauto"
813

914
"github.com/ozontech/seq-db/frac"
1015
"github.com/ozontech/seq-db/frac/common"
1116
"github.com/ozontech/seq-db/frac/sealed"
17+
"github.com/ozontech/seq-db/frac/sealed/sealing"
1218
"github.com/ozontech/seq-db/storage"
1319
"github.com/ozontech/seq-db/storage/s3"
1420
)
@@ -19,27 +25,28 @@ var storeBytesRead = promauto.NewCounter(prometheus.CounterOpts{
1925
Name: "bytes_read",
2026
})
2127

28+
// fractionProvider is a factory for creating different types of fractions
29+
// Contains all necessary dependencies for creating and managing fractions
2230
type fractionProvider struct {
23-
s3cli *s3.Client
24-
config *frac.Config
25-
cacheProvider *CacheMaintainer
26-
activeIndexer *frac.ActiveIndexer
27-
readLimiter *storage.ReadLimiter
31+
s3cli *s3.Client // Client for S3 storage operations
32+
config *Config // Fraction manager configuration
33+
cacheProvider *CacheMaintainer // Cache provider for data access optimization
34+
activeIndexer *frac.ActiveIndexer // Indexer for active fractions
35+
readLimiter *storage.ReadLimiter // Read rate limiter
36+
ulidEntropy io.Reader // Entropy source for ULID generation
2837
}
2938

3039
func newFractionProvider(
31-
c *frac.Config, s3cli *s3.Client, cp *CacheMaintainer,
32-
readerWorkers, indexWorkers int,
40+
cfg *Config, s3cli *s3.Client, cp *CacheMaintainer,
41+
readLimiter *storage.ReadLimiter, indexer *frac.ActiveIndexer,
3342
) *fractionProvider {
34-
ai := frac.NewActiveIndexer(indexWorkers, indexWorkers)
35-
ai.Start() // first start indexWorkers to allow active frac replaying
36-
3743
return &fractionProvider{
3844
s3cli: s3cli,
39-
config: c,
45+
config: cfg,
4046
cacheProvider: cp,
41-
activeIndexer: ai,
42-
readLimiter: storage.NewReadLimiter(readerWorkers, storeBytesRead),
47+
activeIndexer: indexer,
48+
readLimiter: readLimiter,
49+
ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0),
4350
}
4451
}
4552

@@ -50,7 +57,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active {
5057
fp.readLimiter,
5158
fp.cacheProvider.CreateDocBlockCache(),
5259
fp.cacheProvider.CreateSortDocsCache(),
53-
fp.config,
60+
&fp.config.Fraction,
5461
)
5562
}
5663

@@ -60,37 +67,75 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *fra
6067
fp.readLimiter,
6168
fp.cacheProvider.CreateIndexCache(),
6269
fp.cacheProvider.CreateDocBlockCache(),
63-
cachedInfo,
64-
fp.config,
70+
cachedInfo, // Preloaded meta information
71+
&fp.config.Fraction,
6572
)
6673
}
6774

6875
func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *sealed.PreloadedData) *frac.Sealed {
6976
return frac.NewSealedPreloaded(
7077
name,
71-
preloadedData,
78+
preloadedData, // Data already loaded into memory
7279
fp.readLimiter,
7380
fp.cacheProvider.CreateIndexCache(),
7481
fp.cacheProvider.CreateDocBlockCache(),
75-
fp.config,
82+
&fp.config.Fraction,
7683
)
7784
}
7885

79-
func (fp *fractionProvider) NewRemote(
80-
ctx context.Context, name string, cachedInfo *common.Info,
81-
) *frac.Remote {
86+
func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedInfo *common.Info) *frac.Remote {
8287
return frac.NewRemote(
8388
ctx,
8489
name,
8590
fp.readLimiter,
8691
fp.cacheProvider.CreateIndexCache(),
8792
fp.cacheProvider.CreateDocBlockCache(),
8893
cachedInfo,
89-
fp.config,
94+
&fp.config.Fraction,
9095
fp.s3cli,
9196
)
9297
}
9398

94-
func (fp *fractionProvider) Stop() {
95-
fp.activeIndexer.Stop()
99+
// nextFractionID generates a unique identifier for a new fraction
100+
// IMPORTANT: This method is not thread-safe. When used in concurrent environments,
101+
// external synchronization must be provided to avoid ID collisions
102+
func (fp *fractionProvider) nextFractionID() string {
103+
return ulid.MustNew(ulid.Timestamp(time.Now()), fp.ulidEntropy).String()
104+
}
105+
106+
// CreateActive creates a new active fraction with auto-generated filename
107+
// Filename pattern: base_pattern + ULID
108+
func (fp *fractionProvider) CreateActive() *frac.Active {
109+
filePath := fileBasePattern + fp.nextFractionID()
110+
baseFilePath := filepath.Join(fp.config.DataDir, filePath)
111+
return fp.NewActive(baseFilePath)
112+
}
113+
114+
// Seal converts an active fraction to a sealed one
115+
// Process includes sorting, indexing, and data optimization for reading
116+
func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
117+
src, err := frac.NewActiveSealingSource(active, fp.config.SealParams)
118+
if err != nil {
119+
return nil, err
120+
}
121+
preloaded, err := sealing.Seal(src, fp.config.SealParams)
122+
if err != nil {
123+
return nil, err
124+
}
125+
126+
return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil
127+
}
128+
129+
// Offload uploads fraction to S3 storage and returns a remote fraction
130+
// IMPORTANT: context controls timeouts and operation cancellation
131+
func (fp *fractionProvider) Offload(ctx context.Context, f frac.Fraction) (*frac.Remote, error) {
132+
mustBeOffloaded, err := f.Offload(ctx, s3.NewUploader(fp.s3cli))
133+
if err != nil {
134+
return nil, err
135+
}
136+
if !mustBeOffloaded {
137+
return nil, nil
138+
}
139+
info := f.Info()
140+
return fp.NewRemote(ctx, info.Path, info), nil
96141
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package fracmanager
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestFractionID(t *testing.T) {
10+
fp := newFractionProvider(nil, nil, nil, nil, nil)
11+
ulid1 := fp.nextFractionID()
12+
ulid2 := fp.nextFractionID()
13+
assert.NotEqual(t, ulid1, ulid2, "ULIDs should be different")
14+
assert.Equal(t, 26, len(ulid1), "ULID should have length 26")
15+
assert.Greater(t, ulid2, ulid1)
16+
}

fracmanager/proxy_frac.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/ozontech/seq-db/frac"
1313
"github.com/ozontech/seq-db/frac/common"
14-
"github.com/ozontech/seq-db/frac/sealed/sealing"
1514
"github.com/ozontech/seq-db/logger"
1615
"github.com/ozontech/seq-db/metric"
1716
"github.com/ozontech/seq-db/seq"
@@ -118,7 +117,7 @@ func (f *proxyFrac) WaitWriteIdle() {
118117
logger.Info("write is stopped", zap.String("name", f.name), zap.Float64("time_wait_s", waitTime))
119118
}
120119

121-
func (f *proxyFrac) Seal(params common.SealParams) (*frac.Sealed, error) {
120+
func (f *proxyFrac) Seal() (*frac.Sealed, error) {
122121
f.useMu.Lock()
123122
if f.isSuicidedState() {
124123
f.useMu.Unlock()
@@ -135,16 +134,10 @@ func (f *proxyFrac) Seal(params common.SealParams) (*frac.Sealed, error) {
135134

136135
f.WaitWriteIdle()
137136

138-
src, err := frac.NewActiveSealingSource(active, params)
137+
sealed, err := f.fp.Seal(active)
139138
if err != nil {
140139
return nil, err
141140
}
142-
preloaded, err := sealing.Seal(src, params)
143-
if err != nil {
144-
return nil, err
145-
}
146-
147-
sealed := f.fp.NewSealedPreloaded(active.BaseFileName, preloaded)
148141

149142
f.useMu.Lock()
150143
f.sealed = sealed

fracmanager/sealer_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/ozontech/seq-db/frac/sealed"
2323
"github.com/ozontech/seq-db/frac/sealed/sealing"
2424
"github.com/ozontech/seq-db/seq"
25+
"github.com/ozontech/seq-db/storage"
2526
testscommon "github.com/ozontech/seq-db/tests/common"
2627
)
2728

@@ -107,15 +108,19 @@ func BenchmarkSealing_WithSort(b *testing.B) {
107108

108109
func runSealingBench(b *testing.B, cfg *frac.Config) {
109110
cm := NewCacheMaintainer(uint64(units.MiB)*64, uint64(units.MiB)*64, nil)
110-
fp := newFractionProvider(cfg, nil, cm, 1, 1)
111+
112+
idx := frac.NewActiveIndexer(1, 1)
113+
rl := storage.NewReadLimiter(1, storeBytesRead)
114+
fp := newFractionProvider(&Config{Fraction: *cfg}, nil, cm, rl, idx)
115+
idx.Start()
111116

112117
dataDir := filepath.Join(b.TempDir(), "BenchmarkSealing")
113118
testscommon.RecreateDir(dataDir)
114119

115120
active := fp.NewActive(filepath.Join(dataDir, "test"))
116121
err := fillActiveFraction(active)
117122
assert.NoError(b, err)
118-
fp.Stop()
123+
idx.Stop()
119124

120125
seal := func(active *frac.Active, params common.SealParams) (*sealed.PreloadedData, error) {
121126
src, err := frac.NewActiveSealingSource(active, params)

0 commit comments

Comments
 (0)