Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 10, 2025
1 parent 014d6d1 commit 13b96f4
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions pkg/ruler/rule_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _
return false
}

// SplitGroupIntoBatches splits the group into batches of rules that can be evaluated concurrently.
// It tries to batch rules that have no dependencies together and rules that have dependencies in separate batches.
// Returning no batches or nil means that the group should be evaluated sequentially.
func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
if !c.isGroupAtRisk(g) {
// If the group is not at risk, we can evaluate the rules sequentially.
Expand All @@ -191,6 +194,8 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g
unevaluatedDependencies map[rules.Rule]struct{}
}
remainingRules := make(map[rules.Rule]ruleInfo)

// This batch holds the rules that have no dependencies and will be run first.
firstBatch := rules.ConcurrentRules{}
for i, r := range g.Rules() {
if r.NoDependencyRules() {
Expand All @@ -208,14 +213,14 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g
if len(firstBatch) == 0 {
// There are no rules without dependencies.
// Fall back to sequential evaluation.
level.Info(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation. This may be due to indeterminate rule dependencies.")
level.Info(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation.")
return nil
}
order := []rules.ConcurrentRules{firstBatch}
result := []rules.ConcurrentRules{firstBatch}

// Build the order of rules to evaluate based on dependencies.
for len(remainingRules) > 0 {
previousBatch := order[len(order)-1]
previousBatch := result[len(result)-1]
// Remove the batch's rules from the dependencies of its dependents.
for _, idx := range previousBatch {
rule := g.Rules()[idx]
Expand All @@ -242,12 +247,13 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g
return nil
}

order = append(order, batch)
result = append(result, batch)
}

level.Info(logger).Log("msg", "Batched rules into concurrent blocks", "rules", len(g.Rules()), "batches", len(order))
level.Info(logger).Log("msg", "Batched rules into concurrent blocks", "rules", len(g.Rules()), "batches", len(result))
level.Debug(logger).Log("msg", "Batched rules into concurrent blocks", "batches", result)

return order
return result
}

// isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold.
Expand Down

0 comments on commit 13b96f4

Please sign in to comment.