Skip to content

Commit

Permalink
Rule Concurrency: Split into batches only when loading groups
Browse files Browse the repository at this point in the history
Follow-up to prometheus/prometheus#15681
Rather than doing the split on every evaluation, do it only once, when loading the groups.
This ensures that the performance of the "splitting" implementation won't affect the overall evaluation performance.

Signed-off-by: Julien Duchesne <[email protected]>
  • Loading branch information
julienduchesne committed Jan 9, 2025
1 parent a27b86b commit 8def43e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
23 changes: 21 additions & 2 deletions rules/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Group struct {
lastEvaluation time.Time // Wall-clock time of most recent evaluation.
lastEvalTimestamp time.Time // Time slot used for most recent evaluation.

// If set, rules will be evaluated concurrently.
// This is calculated when the group is loaded by the RuleConcurrencyController.
concurrencyBatches []ConcurrentRules

shouldRestore bool

markStale bool
Expand Down Expand Up @@ -650,12 +654,27 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}

var wg sync.WaitGroup
ctrl := g.opts.RuleConcurrencyController
concurrencyBatches := g.concurrencyBatches
if ctrl == nil {
g.logger.With("group", g.Name()).Warn("Rule concurrency controller not set, but concurrencyBatches are, this shouldn't happen. Evaluating rules sequentially.")
ctrl = sequentialRuleEvalController{}
concurrencyBatches = nil
}

// Evaluate rules sequentially.
if len(concurrencyBatches) == 0 {
for i, rule := range g.rules {
eval(i, rule, nil)
}
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
g.cleanupStaleSeries(ctx, ts)
return
}
for _, batch := range ctrl.SplitGroupIntoBatches(ctx, g) {

// Evaluate rules concurrently.
var wg sync.WaitGroup
for _, batch := range g.concurrencyBatches {
for _, ruleIndex := range batch {
select {
case <-g.done:
Expand Down
16 changes: 10 additions & 6 deletions rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (m *Manager) LoadGroups(
// Check dependencies between rules and store it on the Rule itself.
m.opts.RuleDependencyController.AnalyseRules(rules)

groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
g := NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Expand All @@ -385,6 +385,14 @@ func (m *Manager) LoadGroups(
EvalIterationFunc: groupEvalIterationFunc,
AlignEvaluationTimeOnInterval: rg.AlignEvaluationTimeOnInterval,
})

// Calculate the group's concurrency batches.
if ctrl := m.opts.RuleConcurrencyController; ctrl != nil {
g.concurrencyBatches = ctrl.SplitGroupIntoBatches(m.opts.Context, g)
m.logger.Info("calculated concurrency batches", "group", g.name, "rules", len(g.rules), "batches", len(g.concurrencyBatches))
}

groups[GroupKey(fn, rg.Name)] = g
}
}

Expand Down Expand Up @@ -579,11 +587,7 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule)
}

func (c sequentialRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules {
order := make([]ConcurrentRules, len(g.rules))
for i := range g.rules {
order[i] = []int{i}
}
return order
return nil // When a zero-value slice is used, the rules are evaluated sequentially.
}

func (c sequentialRuleEvalController) Done(_ context.Context) {}
Expand Down
20 changes: 5 additions & 15 deletions rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2225,13 +2225,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
DefaultEvalIterationFunc(ctx, group, start)

// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0},
{1},
{2},
{3},
}, order)
require.Nil(t, group.concurrencyBatches)

// Never expect more than 1 inflight query at a time.
require.EqualValues(t, 1, maxInflight.Load())
Expand Down Expand Up @@ -2312,10 +2306,9 @@ func TestAsyncRuleEvaluation(t *testing.T) {
DefaultEvalIterationFunc(ctx, group, start)

// Expected evaluation order (isn't affected by concurrency settings)
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5},
}, order)
}, group.concurrencyBatches)

// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
Expand Down Expand Up @@ -2357,10 +2350,9 @@ func TestAsyncRuleEvaluation(t *testing.T) {
DefaultEvalIterationFunc(ctx, group, start)

// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1, 2, 3, 4, 5},
}, order)
}, group.concurrencyBatches)

// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
Expand Down Expand Up @@ -2442,11 +2434,10 @@ func TestAsyncRuleEvaluation(t *testing.T) {
start := time.Now()

// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 4},
{1, 2, 3, 5, 6, 7},
}, order)
}, group.concurrencyBatches)

group.Eval(ctx, start)

Expand Down Expand Up @@ -2488,13 +2479,12 @@ func TestAsyncRuleEvaluation(t *testing.T) {
start := time.Now()

// Expected evaluation order
order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
require.Equal(t, []ConcurrentRules{
{0, 1},
{2},
{3},
{4, 5, 6},
}, order)
}, group.concurrencyBatches)

group.Eval(ctx, start)

Expand Down

0 comments on commit 8def43e

Please sign in to comment.