Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Jan 7, 2025
1 parent 9e94f1c commit 5bf11af
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 301 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250102152619-93fa7617c041
// TODO: This references a temporary branch (https://github.com/prometheus/prometheus/pull/15788), fix it before merging.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250106214327-c8fa41a84f3e

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1283,8 +1283,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20250102152619-93fa7617c041 h1:tZFQRbiyOW630aJ7r+p+N3kUWhMVeWLRGSjZsK9KA0s=
github.com/grafana/mimir-prometheus v0.0.0-20250102152619-93fa7617c041/go.mod h1:a5LEa2Vy87wOp0Vu6sLmEIR1V59fqH3QosOSiErAr30=
github.com/grafana/mimir-prometheus v0.0.0-20250106214327-c8fa41a84f3e h1:IXOhnRUbzAtaXzMj4oeFFXY0lHdP8//0mHPHDe57aGA=
github.com/grafana/mimir-prometheus v0.0.0-20250106214327-c8fa41a84f3e/go.mod h1:a5LEa2Vy87wOp0Vu6sLmEIR1V59fqH3QosOSiErAr30=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw=
Expand Down
151 changes: 122 additions & 29 deletions pkg/ruler/rule_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ func (ds *DynamicSemaphore) Release() {
}

// MultiTenantConcurrencyControllerMetrics bundles the Prometheus metric vectors
// reported by the multi-tenant rule concurrency controller.
type MultiTenantConcurrencyControllerMetrics struct {
SlotsInUse *prometheus.GaugeVec
AttemptsStartedTotal *prometheus.CounterVec
AttemptsIncompleteTotal *prometheus.CounterVec
AttemptsCompletedTotal *prometheus.CounterVec
SlotsInUse *prometheus.GaugeVec
AttemptsStartedTotal *prometheus.CounterVec
AttemptsIncompleteTotal *prometheus.CounterVec
AttemptsCompletedTotal *prometheus.CounterVec
// Counters for attempts to split a rule group into concurrent batches.
BatchAttemptsStartedTotal *prometheus.CounterVec
BatchAttemptsIncompleteTotal *prometheus.CounterVec
BatchAttemptsCompletedTotal *prometheus.CounterVec
}

func newMultiTenantConcurrencyControllerMetrics(reg prometheus.Registerer) *MultiTenantConcurrencyControllerMetrics {
Expand All @@ -86,6 +89,18 @@ func newMultiTenantConcurrencyControllerMetrics(reg prometheus.Registerer) *Mult
Name: "cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total",
Help: "Total number of concurrency slots we're done using across all tenants",
}, []string{"user"}),
BatchAttemptsStartedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_independent_rule_evaluation_batch_attempts_started_total",
Help: "Total number of started attempts to split rules into concurrent batches across all tenants",
}, []string{"user"}),
BatchAttemptsIncompleteTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_independent_rule_evaluation_batch_attempts_incomplete_total",
Help: "Total number of incomplete attempts to split rules into concurrent batches across all tenants",
}, []string{"user", "group", "reason"}),
BatchAttemptsCompletedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total",
Help: "Total number of completed attempts to split rules into concurrent batches across all tenants",
}, []string{"user"}),
}

return m
Expand Down Expand Up @@ -115,10 +130,13 @@ func NewMultiTenantConcurrencyController(logger log.Logger, maxGlobalConcurrency
// NewTenantConcurrencyControllerFor returns a new rules.RuleConcurrencyController to use for the input tenantID.
func (c *MultiTenantConcurrencyController) NewTenantConcurrencyControllerFor(tenantID string) rules.RuleConcurrencyController {
return &TenantConcurrencyController{
slotsInUse: c.metrics.SlotsInUse.WithLabelValues(tenantID),
attemptsStartedTotal: c.metrics.AttemptsStartedTotal.WithLabelValues(tenantID),
attemptsIncompleteTotal: c.metrics.AttemptsIncompleteTotal.WithLabelValues(tenantID),
attemptsCompletedTotal: c.metrics.AttemptsCompletedTotal.WithLabelValues(tenantID),
slotsInUse: c.metrics.SlotsInUse.WithLabelValues(tenantID),
attemptsStartedTotal: c.metrics.AttemptsStartedTotal.WithLabelValues(tenantID),
attemptsIncompleteTotal: c.metrics.AttemptsIncompleteTotal.WithLabelValues(tenantID),
attemptsCompletedTotal: c.metrics.AttemptsCompletedTotal.WithLabelValues(tenantID),
batchAttemptsStartedTotal: c.metrics.BatchAttemptsStartedTotal.WithLabelValues(tenantID),
batchAttemptsIncompleteTotal: c.metrics.BatchAttemptsIncompleteTotal.MustCurryWith(prometheus.Labels{"user": tenantID}),
batchAttemptsCompletedTotal: c.metrics.BatchAttemptsCompletedTotal.WithLabelValues(tenantID),

tenantID: tenantID,
thresholdRuleConcurrency: c.thresholdRuleConcurrency,
Expand All @@ -136,10 +154,13 @@ type TenantConcurrencyController struct {
thresholdRuleConcurrency float64 // Percentage of the rule interval at which we consider the rule group at risk of missing its evaluation.

// Metrics with the tenant label already in them. This avoids having to call WithLabelValues on every metric change.
slotsInUse prometheus.Gauge
attemptsStartedTotal prometheus.Counter
attemptsIncompleteTotal prometheus.Counter
attemptsCompletedTotal prometheus.Counter
slotsInUse prometheus.Gauge
attemptsStartedTotal prometheus.Counter
attemptsIncompleteTotal prometheus.Counter
attemptsCompletedTotal prometheus.Counter
batchAttemptsStartedTotal prometheus.Counter
batchAttemptsIncompleteTotal *prometheus.CounterVec
batchAttemptsCompletedTotal prometheus.Counter

globalConcurrency *semaphore.Weighted
tenantConcurrency *DynamicSemaphore
Expand All @@ -156,18 +177,6 @@ func (c *TenantConcurrencyController) Done(_ context.Context) {

// Allow tries to acquire a slot from the concurrency controller.
func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Group, rule rules.Rule) bool {
// To allow a rule to be executed concurrently, we need 3 conditions:
// 1. The rule group must be at risk of missing its evaluation.
// 2. The rule must not have any rules that depend on it.
// 3. The rule itself must not depend on any other rules.
if !c.isGroupAtRisk(group) {
return false
}

if !isRuleIndependent(rule) {
return false
}

// Next, try to acquire a global concurrency slot.
c.attemptsStartedTotal.Inc()
if !c.globalConcurrency.TryAcquire(1) {
Expand All @@ -187,6 +196,84 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou
return false
}

// SplitGroupIntoBatches partitions the rules of g into an ordered list of
// batches. Every batch holds indices into g.Rules().
//
// The first batch contains the rules that report NoDependencyRules(). Each
// following batch contains the rules whose recorded dependencies have all been
// placed in an earlier batch. Within a batch, indices are listed in group
// order so that the result is reproducible from one call to the next.
// NOTE(review): presumably the caller evaluates batches in order and the rules
// of one batch concurrently - confirm against the rules.Group evaluation loop.
//
// The batch "started" counter is incremented on every call. When batching is
// abandoned, the "incomplete" counter is incremented with the group name and
// one of the reasons below, and the result of sequentialOrdering(g) is
// returned instead:
//   - group_not_at_risk: isGroupAtRisk(g) is false.
//   - no_rules_without_dependencies: no rule reports NoDependencyRules().
//   - cyclic_dependencies: some rules remain but none of them can be unblocked.
//   - all_rules_depend_on_each_other: there are as many batches as rules, so
//     batching brings nothing over one rule per batch.
//
// Otherwise the "completed" counter is incremented and the batches are returned.
func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
	c.batchAttemptsStartedTotal.Inc()

	// incomplete records why batching was abandoned and falls back to one rule per batch.
	incomplete := func(reason string) []rules.ConcurrentRules {
		c.batchAttemptsIncompleteTotal.With(prometheus.Labels{"group": g.Name(), "reason": reason}).Inc()
		return sequentialOrdering(g)
	}

	if !c.isGroupAtRisk(g) {
		return incomplete("group_not_at_risk")
	}

	// Fetch the rules once: every index stored in a batch refers to this slice.
	groupRules := g.Rules()

	type ruleInfo struct {
		ruleIdx                 int
		unevaluatedDependencies map[rules.Rule]struct{}
	}
	remainingRules := make(map[rules.Rule]ruleInfo, len(groupRules))
	firstBatch := rules.ConcurrentRules{}
	for i, r := range groupRules {
		if r.NoDependencyRules() {
			firstBatch = append(firstBatch, i)
			continue
		}
		// Initialize the rule info with the rule's dependencies.
		// Use a copy of the dependencies to avoid mutating the rule.
		deps := r.DependencyRules()
		info := ruleInfo{ruleIdx: i, unevaluatedDependencies: make(map[rules.Rule]struct{}, len(deps))}
		for _, dep := range deps {
			info.unevaluatedDependencies[dep] = struct{}{}
		}
		remainingRules[r] = info
	}
	if len(firstBatch) == 0 {
		// There are no rules without dependencies.
		// Fall back to sequential evaluation.
		return incomplete("no_rules_without_dependencies")
	}
	order := []rules.ConcurrentRules{firstBatch}

	// Build the order of rules to evaluate based on dependencies.
	for len(remainingRules) > 0 {
		previousBatch := order[len(order)-1]
		// Remove the batch's rules from the dependencies of its dependents.
		for _, idx := range previousBatch {
			rule := groupRules[idx]
			for _, dependent := range rule.DependentRules() {
				// A dependent that is not tracked yields a zero ruleInfo with a nil map;
				// deleting from a nil map is a no-op, so no presence check is needed.
				delete(remainingRules[dependent].unevaluatedDependencies, rule)
			}
		}

		var batch rules.ConcurrentRules
		// Find rules that have no remaining dependencies. Walk the rules in group
		// order instead of ranging over the map, whose iteration order is
		// randomized, so that the indices inside a batch come out in a stable order.
		for _, r := range groupRules {
			info, tracked := remainingRules[r]
			if !tracked || len(info.unevaluatedDependencies) > 0 {
				continue
			}
			batch = append(batch, info.ruleIdx)
			delete(remainingRules, r)
		}

		if len(batch) == 0 {
			// There is a cycle in the rules' dependencies.
			// We can't evaluate them concurrently.
			// Fall back to sequential evaluation.
			return incomplete("cyclic_dependencies")
		}

		order = append(order, batch)
	}

	if len(order) == len(groupRules) {
		// All rules depend on each other, this is equivalent to sequential evaluation.
		return incomplete("all_rules_depend_on_each_other")
	}

	c.batchAttemptsCompletedTotal.Inc()
	return order
}

// isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold.
func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
interval := group.Interval().Seconds()
Expand All @@ -205,11 +292,6 @@ func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
return false
}

// isRuleIndependent reports whether rule is isolated from every other rule:
// it has no dependent rules and no dependency rules.
func isRuleIndependent(rule rules.Rule) bool {
	if !rule.NoDependentRules() {
		return false
	}
	return rule.NoDependencyRules()
}

// NoopMultiTenantConcurrencyController is a concurrency controller that does not allow for concurrency.
// It has no fields, so the zero value is ready to use.
type NoopMultiTenantConcurrencyController struct{}

Expand All @@ -221,6 +303,17 @@ func (n *NoopMultiTenantConcurrencyController) NewTenantConcurrencyControllerFor
// NoopTenantConcurrencyController is a field-less tenant-level controller that
// never grants a concurrency slot.
type NoopTenantConcurrencyController struct{}

// Done does nothing.
func (n *NoopTenantConcurrencyController) Done(_ context.Context) {}
// SplitGroupIntoBatches returns the result of sequentialOrdering for g, i.e.
// one single-rule batch per rule of the group.
func (n *NoopTenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
return sequentialOrdering(g)
}
// Allow always returns false, whatever the group and rule.
func (n *NoopTenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
return false
}

// sequentialOrdering returns one batch per rule of g, in rule order, where the
// batch at position i contains only the index i. No two rules share a batch.
func sequentialOrdering(g *rules.Group) []rules.ConcurrentRules {
	batches := make([]rules.ConcurrentRules, len(g.Rules()))
	for idx := range batches {
		batches[idx] = []int{idx}
	}
	return batches
}
113 changes: 47 additions & 66 deletions pkg/ruler/rule_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func TestMultiTenantConcurrencyController(t *testing.T) {
exp, err := parser.ParseExpr("vector(1)")
require.NoError(t, err)
rule1 := rules.NewRecordingRule("test", exp, labels.Labels{})
rule1.SetNoDependencyRules(true)
rule1.SetNoDependentRules(true)
rule1.SetDependencyRules([]rules.Rule{})
rule1.SetDependentRules([]rules.Rule{})

globalController := NewMultiTenantConcurrencyController(logger, 3, 50.0, reg, limits)
user1Controller := globalController.NewTenantConcurrencyControllerFor("user1")
Expand All @@ -123,6 +123,14 @@ func TestMultiTenantConcurrencyController(t *testing.T) {

// Let's check the metrics up until this point.
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total Total number of completed attempts to split rules into concurrent batches across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total counter
cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total{user="user1"} 0
cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total{user="user2"} 0
# HELP cortex_ruler_independent_rule_evaluation_batch_attempts_started_total Total number of started attempts to split rules into concurrent batches across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_batch_attempts_started_total counter
cortex_ruler_independent_rule_evaluation_batch_attempts_started_total{user="user1"} 0
cortex_ruler_independent_rule_evaluation_batch_attempts_started_total{user="user2"} 0
# HELP cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total Total number of concurrency slots we're done using across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total counter
cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{user="user1"} 0
Expand All @@ -148,6 +156,14 @@ cortex_ruler_independent_rule_evaluation_concurrency_slots_in_use{user="user2"}

// Let's look at the metrics again.
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total Total number of completed attempts to split rules into concurrent batches across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total counter
cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total{user="user1"} 0
cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total{user="user2"} 0
# HELP cortex_ruler_independent_rule_evaluation_batch_attempts_started_total Total number of started attempts to split rules into concurrent batches across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_batch_attempts_started_total counter
cortex_ruler_independent_rule_evaluation_batch_attempts_started_total{user="user1"} 0
cortex_ruler_independent_rule_evaluation_batch_attempts_started_total{user="user2"} 0
# HELP cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total Total number of incomplete attempts to acquire concurrency slots across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total counter
cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total{user="user1"} 2
Expand All @@ -171,19 +187,27 @@ cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{us
user2Controller.Done(ctx)
user2Controller.Done(ctx)

// Finally, let's try a few edge cases.
rg2 := rules.NewGroup(rules.GroupOptions{
File: "test.rules",
Name: "test",
Interval: 1 * time.Minute, // group not at risk.
Opts: &rules.ManagerOptions{},
})
require.False(t, user1Controller.Allow(ctx, rg2, rule1)) // Should not be allowed with a group that is not at risk.
rule1.SetNoDependencyRules(false)
require.False(t, user1Controller.Allow(ctx, rg, rule1)) // Should not be allowed as the rule is no longer independent.
// // Finally, let's try a few edge cases.
// rg2 := rules.NewGroup(rules.GroupOptions{
// File: "test.rules",
// Name: "test",
// Interval: 1 * time.Minute, // group not at risk.
// Opts: &rules.ManagerOptions{},
// })
// require.False(t, user1Controller.Allow(ctx, rg2, rule1)) // Should not be allowed with a group that is not at risk.
// rule1.SetDependencyRules([]rules.Rule{rules.NewRecordingRule("test2", nil, labels.Labels{})})
// require.False(t, user1Controller.Allow(ctx, rg, rule1)) // Should not be allowed as the rule is no longer independent.

// Check the metrics one final time to ensure there are no active slots in use.
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total Total number of completed attempts to split rules into concurrent batches across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total counter
cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total{user="user1"} 0
cortex_ruler_independent_rule_evaluation_batch_attempts_completed_total{user="user2"} 0
# HELP cortex_ruler_independent_rule_evaluation_batch_attempts_started_total Total number of started attempts to split rules into concurrent batches across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_batch_attempts_started_total counter
cortex_ruler_independent_rule_evaluation_batch_attempts_started_total{user="user1"} 0
cortex_ruler_independent_rule_evaluation_batch_attempts_started_total{user="user2"} 0
# HELP cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total Total number of incomplete attempts to acquire concurrency slots across all tenants
# TYPE cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total counter
cortex_ruler_independent_rule_evaluation_concurrency_attempts_incomplete_total{user="user1"} 2
Expand All @@ -203,7 +227,7 @@ cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{us
`)))

// Make the rule independent again.
rule1.SetNoDependencyRules(true)
rule1.SetDependencyRules([]rules.Rule{})

// Now let's test having a controller two times for the same tenant.
user3Controller := globalController.NewTenantConcurrencyControllerFor("user3")
Expand All @@ -215,57 +239,6 @@ cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{us
require.True(t, user3ControllerTwo.Allow(ctx, rg, rule1))
}

// TestIsRuleIndependent checks that isRuleIndependent only returns true for a
// rule flagged as having neither dependent rules nor dependency rules.
func TestIsRuleIndependent(t *testing.T) {
	// newRule builds a recording rule carrying the given dependency flags.
	newRule := func(noDependents, noDependencies bool) rules.Rule {
		r := rules.NewRecordingRule("test", nil, labels.Labels{})
		r.SetNoDependentRules(noDependents)
		r.SetNoDependencyRules(noDependencies)
		return r
	}

	type testCase struct {
		rule     rules.Rule
		expected bool
	}
	cases := map[string]testCase{
		"rule has neither dependencies nor dependents": {rule: newRule(true, true), expected: true},
		"rule has both dependencies and dependents":    {rule: newRule(false, false), expected: false},
		"rule has dependents":                          {rule: newRule(false, true), expected: false},
		"rule has dependencies":                        {rule: newRule(true, false), expected: false},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			require.Equal(t, tc.expected, isRuleIndependent(tc.rule))
		})
	}
}

func TestGroupAtRisk(t *testing.T) {
createAndEvalTestGroup := func(interval time.Duration, evalConcurrently bool) *rules.Group {
st := teststorage.New(t)
Expand All @@ -279,8 +252,8 @@ func TestGroupAtRisk(t *testing.T) {
q, err := parser.ParseExpr("vector(1)")
require.NoError(t, err)
rule := rules.NewRecordingRule(fmt.Sprintf("test_rule%d", i), q, labels.Labels{})
rule.SetNoDependencyRules(true)
rule.SetNoDependentRules(true)
rule.SetDependencyRules([]rules.Rule{})
rule.SetDependentRules([]rules.Rule{})
createdRules = append(createdRules, rule)
}

Expand Down Expand Up @@ -371,4 +344,12 @@ func (a *allowAllConcurrencyController) Allow(_ context.Context, _ *rules.Group,
return true
}

// SplitGroupIntoBatches puts the index of every rule of g, in rule order, into
// a single batch and returns that batch as the only element of the result.
func (a *allowAllConcurrencyController) SplitGroupIntoBatches(_ context.Context, g *rules.Group) []rules.ConcurrentRules {
	all := make([]int, len(g.Rules()))
	for idx := range all {
		all[idx] = idx
	}
	return []rules.ConcurrentRules{all}
}

// Done does nothing.
func (a *allowAllConcurrencyController) Done(_ context.Context) {}
Loading

0 comments on commit 5bf11af

Please sign in to comment.