Skip to content

Commit b3e6781

Browse files
committed
Add concurrency to block-builder wal conversion and flushing
1 parent b7fbc75 commit b3e6781

File tree

2 files changed

+71
-30
lines changed

2 files changed

+71
-30
lines changed

modules/blockbuilder/blockbuilder_test.go

Lines changed: 16 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/go-kit/log"
1011
"github.com/grafana/dskit/flagext"
1112
"github.com/grafana/dskit/ring"
1213
"github.com/grafana/dskit/services"
@@ -328,7 +329,12 @@ type ownEverythingSharder struct{}
328329
func (o *ownEverythingSharder) Owns(string) bool { return true }
329330

330331
func newStore(ctx context.Context, t testing.TB) storage.Store {
332+
return newStoreWithLogger(ctx, t, test.NewTestingLogger(t))
333+
}
334+
335+
func newStoreWithLogger(ctx context.Context, t testing.TB, log log.Logger) storage.Store {
331336
tmpDir := t.TempDir()
337+
332338
s, err := storage.NewStore(storage.Config{
333339
Trace: tempodb.Config{
334340
Backend: backend.Local,
@@ -348,7 +354,7 @@ func newStore(ctx context.Context, t testing.TB) storage.Store {
348354
},
349355
BlocklistPoll: 5 * time.Second,
350356
},
351-
}, nil, test.NewTestingLogger(t))
357+
}, nil, log)
352358
require.NoError(t, err)
353359

354360
s.EnablePolling(ctx, &ownEverythingSharder{})
@@ -483,14 +489,22 @@ func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Clie
483489
func BenchmarkBlockBuilder(b *testing.B) {
484490
var (
485491
ctx = context.Background()
492+
logger = log.NewNopLogger()
486493
_, address = testkafka.CreateCluster(b, 1, testTopic)
487-
store = newStore(ctx, b)
494+
store = newStoreWithLogger(ctx, b, logger)
488495
cfg = blockbuilderConfig(b, address)
489496
client = newKafkaClient(b, cfg.IngestStorageConfig.Kafka)
490497
)
491498

492499
cfg.ConsumeCycleDuration = 1 * time.Hour
493500

501+
bb := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
502+
defer bb.stopping(nil)
503+
504+
// Startup (without starting the background consume cycle)
505+
err := bb.starting(ctx)
506+
require.NoError(b, err)
507+
494508
b.ResetTimer()
495509

496510
for i := 0; i < b.N; i++ {
@@ -508,12 +522,6 @@ func BenchmarkBlockBuilder(b *testing.B) {
508522

509523
b.ResetTimer()
510524

511-
bb := New(cfg, test.NewTestingLogger(b), newPartitionRingReader(), &mockOverrides{}, store)
512-
513-
// Startup (without starting the background consume cycle)
514-
err := bb.starting(ctx)
515-
require.NoError(b, err)
516-
517525
err = bb.consume(ctx)
518526
require.NoError(b, err)
519527

modules/blockbuilder/tenant_store.go

Lines changed: 55 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/google/uuid"
1313
"github.com/grafana/tempo/modules/blockbuilder/util"
1414
"github.com/grafana/tempo/modules/overrides"
15+
"github.com/grafana/tempo/pkg/boundedwaitgroup"
1516
"github.com/grafana/tempo/pkg/livetraces"
1617
"github.com/grafana/tempo/pkg/model"
1718
"github.com/grafana/tempo/pkg/tempopb"
@@ -23,6 +24,7 @@ import (
2324
"github.com/grafana/tempo/tempodb/wal"
2425
"github.com/prometheus/client_golang/prometheus"
2526
"github.com/prometheus/client_golang/prometheus/promauto"
27+
"go.uber.org/atomic"
2628
)
2729

2830
var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
@@ -33,7 +35,10 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
3335
}, []string{"tenant"},
3436
)
3537

36-
const reasonTraceTooLarge = "trace_too_large"
38+
const (
39+
reasonTraceTooLarge = "trace_too_large"
40+
flushConcurrency = 4
41+
)
3742

3843
// TODO - This needs locking
3944
type tenantStore struct {
@@ -192,34 +197,62 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
192197
s.blocksMtx.Lock()
193198
defer s.blocksMtx.Unlock()
194199

195-
completeBlocks := make([]tempodb.WriteableBlock, 0, len(s.walBlocks))
196-
// Write all blocks
197-
for _, block := range s.walBlocks {
198-
completeBlock, err := s.buildWriteableBlock(ctx, block)
199-
if err != nil {
200-
return err
201-
}
200+
var (
201+
completeBlocks = make([]tempodb.WriteableBlock, len(s.walBlocks))
202+
jobErr = atomic.NewError(nil)
203+
wg = boundedwaitgroup.New(flushConcurrency)
204+
)
205+
206+
// Convert WALs to backend blocks
207+
for i, block := range s.walBlocks {
208+
wg.Add(1)
209+
go func(i int, block common.WALBlock) {
210+
defer wg.Done()
211+
212+
completeBlock, err := s.buildWriteableBlock(ctx, block)
213+
if err != nil {
214+
jobErr.Store(err)
215+
return
216+
}
202217

203-
err = block.Clear()
204-
if err != nil {
205-
return err
206-
}
218+
err = block.Clear()
219+
if err != nil {
220+
jobErr.Store(err)
221+
return
222+
}
207223

208-
completeBlocks = append(completeBlocks, completeBlock)
224+
completeBlocks[i] = completeBlock
225+
}(i, block)
226+
}
227+
228+
wg.Wait()
229+
if err := jobErr.Load(); err != nil {
230+
return err
209231
}
210232

211-
level.Info(s.logger).Log("msg", "writing blocks to storage", "num_blocks", len(completeBlocks))
212233
// Write all blocks to the store
234+
level.Info(s.logger).Log("msg", "writing blocks to storage", "num_blocks", len(completeBlocks))
213235
for _, block := range completeBlocks {
214-
level.Info(s.logger).Log("msg", "writing block to storage", "block_id", block.BlockMeta().BlockID.String())
215-
if err := store.WriteBlock(ctx, block); err != nil {
216-
return err
217-
}
218-
metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc()
236+
wg.Add(1)
237+
go func(block tempodb.WriteableBlock) {
238+
defer wg.Done()
239+
level.Info(s.logger).Log("msg", "writing block to storage", "block_id", block.BlockMeta().BlockID.String())
240+
if err := store.WriteBlock(ctx, block); err != nil {
241+
jobErr.Store(err)
242+
return
243+
}
219244

220-
if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil {
221-
return err
222-
}
245+
metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc()
246+
247+
if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil {
248+
jobErr.Store(err)
249+
}
250+
}(block)
251+
}
252+
253+
wg.Wait()
254+
if err := jobErr.Load(); err != nil {
255+
return err
223256
}
224257

225258
// Clear the blocks

0 commit comments

Comments
 (0)