Block-builder: pull jobs from scheduler (#10118)
This adds a "pull" mode to block-builder: if it is configured with a scheduler address at startup, it will live its life in pull mode, obtaining and completing jobs from a block-builder-scheduler service.

Pull-mode tests are largely duplicated from the existing tests. This is temporary: the prior tests will be deleted once we're relying on the scheduler.
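
For context, a condensed sketch of the job loop this change adds (simplified from runningPullMode in pkg/blockbuilder/blockbuilder.go below; logging, error handling, and the context-cancellation checks are trimmed):

    // One job at a time: fetch from the scheduler, consume, report completion.
    for ctx.Err() == nil {
        key, spec, err := b.scheduler.GetJob(ctx)
        if err != nil {
            continue // the real loop logs the error and retries
        }
        if _, err := b.consumeJob(ctx, key, spec); err != nil {
            continue // job is not reported complete; the error is logged
        }
        _ = b.scheduler.CompleteJob(key) // failures here are logged, not retried
        b.jobIteration.Inc()
    }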
seizethedave authored Jan 18, 2025
1 parent 55d90fc commit 49bc44d
Showing 7 changed files with 946 additions and 93 deletions.
6 changes: 6 additions & 0 deletions development/mimir-ingest-storage/config/mimir.yaml
@@ -27,6 +27,12 @@ ingester:
min_partition_owners_duration: 10s
delete_inactive_partition_after: 1m

block_builder:
  scheduler_config:
    address: mimir-block-builder-scheduler-1:9095
    update_interval: 5s
    max_update_age: 30m

blocks_storage:
  s3:
    bucket_name: mimir-blocks
199 changes: 176 additions & 23 deletions pkg/blockbuilder/blockbuilder.go
@@ -15,11 +15,16 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/runutil"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/mimir/pkg/blockbuilder/schedulerpb"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/ingest"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
@@ -31,12 +36,18 @@ import (
type BlockBuilder struct {
services.Service

cfg Config
logger log.Logger
register prometheus.Registerer
limits *validation.Overrides
kafkaClient *kgo.Client
bucket objstore.Bucket
cfg Config
logger log.Logger
register prometheus.Registerer
limits *validation.Overrides
kafkaClient *kgo.Client
bucket objstore.Bucket
scheduler schedulerpb.SchedulerClient
schedulerConn *grpc.ClientConn
committer stateCommitter

// the current job iteration number. For tests.
jobIteration atomic.Int64

assignedPartitionIDs []int32
// fallbackOffsetMillis is the milliseconds timestamp after which a partition that doesn't have a commit will be consumed from.
@@ -51,6 +62,17 @@ func New(
logger log.Logger,
reg prometheus.Registerer,
limits *validation.Overrides,
) (*BlockBuilder, error) {
return newWithSchedulerClient(cfg, logger, reg, limits, nil)
}

// newWithSchedulerClient creates a new BlockBuilder with the given scheduler client.
func newWithSchedulerClient(
cfg Config,
logger log.Logger,
reg prometheus.Registerer,
limits *validation.Overrides,
schedulerClient schedulerpb.SchedulerClient,
) (*BlockBuilder, error) {
b := &BlockBuilder{
cfg: cfg,
@@ -61,23 +83,75 @@ func New(
tsdbBuilderMetrics: newTSDBBBuilderMetrics(reg),
}

b.assignedPartitionIDs = b.cfg.PartitionAssignment[b.cfg.InstanceID]
if len(b.assignedPartitionIDs) == 0 {
// This is just an assertion check. The config validation prevents this from happening.
return nil, fmt.Errorf("no partitions assigned to instance %s", b.cfg.InstanceID)
}

bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorage.Bucket, "block-builder", logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create the bucket client: %w", err)
}
b.bucket = bucketClient

b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
var runningFunc services.RunningFn
var stoppingFunc services.StoppingFn

if cfg.SchedulerConfig.Address != "" {
// Pull mode: we learn about jobs from a block-builder-scheduler.

if schedulerClient != nil {
b.scheduler = schedulerClient
} else {
var err error
if b.scheduler, b.schedulerConn, err = b.makeSchedulerClient(); err != nil {
return nil, fmt.Errorf("make scheduler client: %w", err)
}
}

runningFunc = b.runningPullMode
stoppingFunc = b.stoppingPullMode
b.committer = &noOpCommitter{}
} else {
// Standalone mode: we consume from statically assigned partitions.
b.assignedPartitionIDs = b.cfg.PartitionAssignment[b.cfg.InstanceID]
if len(b.assignedPartitionIDs) == 0 {
// This is just an assertion check. The config validation prevents this from happening.
return nil, fmt.Errorf("no partitions assigned to instance %s", b.cfg.InstanceID)
}

runningFunc = b.runningStandaloneMode
stoppingFunc = b.stoppingStandaloneMode
b.committer = &kafkaCommitter{}
}

b.Service = services.NewBasicService(b.starting, runningFunc, stoppingFunc)
return b, nil
}

func (b *BlockBuilder) makeSchedulerClient() (schedulerpb.SchedulerClient, *grpc.ClientConn, error) {
dialOpts, err := b.cfg.SchedulerConfig.GRPCClientConfig.DialOption(
[]grpc.UnaryClientInterceptor{otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())},
nil)
if err != nil {
return nil, nil, err
}

// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.Dial(b.cfg.SchedulerConfig.Address, dialOpts...)
if err != nil {
return nil, nil, err
}

client, err := schedulerpb.NewSchedulerClient(
b.cfg.InstanceID,
schedulerpb.NewBlockBuilderSchedulerClient(conn),
b.logger,
b.cfg.SchedulerConfig.UpdateInterval,
b.cfg.SchedulerConfig.MaxUpdateAge,
)
if err != nil {
return nil, nil, err
}

return client, conn, nil
}

func (b *BlockBuilder) starting(context.Context) (err error) {
// Empty any previous artifacts.
if err := os.RemoveAll(b.cfg.DataDir); err != nil {
@@ -102,13 +176,77 @@ func (b *BlockBuilder) starting(context.Context) (err error) {
return nil
}

func (b *BlockBuilder) stopping(_ error) error {
func (b *BlockBuilder) stoppingPullMode(_ error) error {
b.kafkaClient.Close()
b.scheduler.Close()

if b.schedulerConn != nil {
return b.schedulerConn.Close()
}
return nil
}

func (b *BlockBuilder) running(ctx context.Context) error {
// runningPullMode is a service `running` function for pull mode, where we learn
// about jobs from a block-builder-scheduler. We consume one job at a time.
func (b *BlockBuilder) runningPullMode(ctx context.Context) error {
// Kick off the scheduler's run loop.
go b.scheduler.Run(ctx)

for {
if err := ctx.Err(); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

key, spec, err := b.scheduler.GetJob(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
level.Error(b.logger).Log("msg", "failed to get job", "err", err)
continue
}

if _, err := b.consumeJob(ctx, key, spec); err != nil {
level.Error(b.logger).Log("msg", "failed to consume job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
continue
}

if err := b.scheduler.CompleteJob(key); err != nil {
level.Error(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
}

b.jobIteration.Inc()
}
}

// consumeJob performs block consumption from Kafka into object storage based on the given job spec.
func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, spec schedulerpb.JobSpec) (PartitionState, error) {
state := PartitionState{
Commit: kadm.Offset{
Topic: spec.Topic,
Partition: spec.Partition,
At: spec.StartOffset,
},
CommitRecordTimestamp: spec.CommitRecTs,
LastSeenOffset: spec.LastSeenOffset,
LastBlockEnd: spec.LastBlockEndTs,
}

logger := log.With(b.logger, "job_id", key.Id, "job_epoch", key.Epoch)
return b.consumePartition(ctx, spec.Partition, state, spec.CycleEndTs, spec.CycleEndOffset, logger)
}

func (b *BlockBuilder) stoppingStandaloneMode(_ error) error {
b.kafkaClient.Close()
return nil
}

// runningStandaloneMode is a service `running` function for standalone mode,
// where we consume from statically assigned partitions.
func (b *BlockBuilder) runningStandaloneMode(ctx context.Context) error {
// Do initial consumption on start using current time as the point up to which we are consuming.
// To avoid small blocks at startup, we consume until the <consume interval> boundary + buffer.
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
@@ -204,7 +342,7 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEndTime time.T
}

state := PartitionStateFromLag(b.logger, lag, b.fallbackOffsetMillis)
if err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
if _, err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset, b.logger); err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "partition", partition)
}
}
@@ -289,11 +427,11 @@ func PartitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackM

// consumePartition consumes records from the given partition until the cycleEnd timestamp.
// If the partition is lagging behind, it takes care of consuming it in sections.
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state PartitionState, cycleEndTime time.Time, cycleEndOffset int64) (err error) {
sp, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BlockBuilder.consumePartition")
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state PartitionState, cycleEndTime time.Time, cycleEndOffset int64, logger log.Logger) (finalState PartitionState, err error) {
sp, ctx := spanlogger.NewWithLogger(ctx, logger, "BlockBuilder.consumePartition")
defer sp.Finish()

logger := log.With(sp, "partition", partition, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset)
logger = log.With(sp, "partition", partition, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset)

builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics, b.cfg.ApplyMaxGlobalSeriesPerUserBelow)
defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")
@@ -317,12 +455,12 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, st
logger := log.With(logger, "section_end", sectionEndTime, "offset", state.Commit.At)
state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEndTime, cycleEndOffset)
if err != nil {
return fmt.Errorf("consume partition %d: %w", partition, err)
return PartitionState{}, fmt.Errorf("consume partition %d: %w", partition, err)
}
sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeInterval)
}

return nil
return state, nil
}

func (b *BlockBuilder) consumePartitionSection(
@@ -491,14 +629,20 @@ consumerLoop:
LastSeenOffset: lastSeenOffset,
LastBlockEnd: lastBlockEnd,
}
if err := b.commitState(ctx, logger, b.cfg.ConsumerGroup, newState); err != nil {
if err := b.committer.commitState(ctx, b, logger, b.cfg.ConsumerGroup, newState); err != nil {
return state, err
}

return newState, nil
}

func (b *BlockBuilder) commitState(ctx context.Context, logger log.Logger, group string, state PartitionState) error {
type stateCommitter interface {
commitState(context.Context, *BlockBuilder, log.Logger, string, PartitionState) error
}

type kafkaCommitter struct{}

func (c *kafkaCommitter) commitState(ctx context.Context, b *BlockBuilder, logger log.Logger, group string, state PartitionState) error {
offsets := make(kadm.Offsets)
offsets.Add(state.Commit)

@@ -520,10 +664,19 @@ func (b *BlockBuilder) commitState(ctx context.Context, logger log.Logger, group
}

level.Info(logger).Log("msg", "successfully committed offset to kafka", "offset", state.Commit.At)
return nil
}

var _ stateCommitter = &kafkaCommitter{}

type noOpCommitter struct{}

func (c *noOpCommitter) commitState(_ context.Context, _ *BlockBuilder, _ log.Logger, _ string, _ PartitionState) error {
return nil
}

var _ stateCommitter = &noOpCommitter{}

func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, blockIDs []string) error {
buc := bucket.NewUserBucketClient(tenantID, b.bucket, b.limits)
for _, bid := range blockIDs {
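As an aside on the newWithSchedulerClient seam: a minimal sketch of a fake scheduler client a test might inject. The fakeScheduler type, its jobs channel, and the literal field values are hypothetical; the method set (Run, GetJob, CompleteJob, Close) and the use of schedulerpb.JobKey/JobSpec are inferred from the calls visible in this diff.

    // fakeScheduler is a hypothetical stub satisfying schedulerpb.SchedulerClient.
    type fakeScheduler struct {
        jobs chan schedulerpb.JobSpec // canned jobs to hand out
    }

    func (f *fakeScheduler) Run(ctx context.Context) {}

    func (f *fakeScheduler) GetJob(ctx context.Context) (schedulerpb.JobKey, schedulerpb.JobSpec, error) {
        select {
        case spec := <-f.jobs:
            // Field values are illustrative; Id/Epoch are assumed from the logging calls above.
            return schedulerpb.JobKey{Id: "test-job", Epoch: 1}, spec, nil
        case <-ctx.Done():
            return schedulerpb.JobKey{}, schedulerpb.JobSpec{}, ctx.Err()
        }
    }

    func (f *fakeScheduler) CompleteJob(key schedulerpb.JobKey) error { return nil }

    func (f *fakeScheduler) Close() {}

    // Wired in via the test-only constructor:
    //   bb, err := newWithSchedulerClient(cfg, logger, reg, limits, &fakeScheduler{jobs: jobs})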