diff --git a/pkg/ruler/rule_concurrency.go b/pkg/ruler/rule_concurrency.go index 3b26f20e95..09e721ff08 100644 --- a/pkg/ruler/rule_concurrency.go +++ b/pkg/ruler/rule_concurrency.go @@ -4,6 +4,7 @@ package ruler import ( "context" + "fmt" "sync" "github.com/go-kit/log" @@ -178,6 +179,16 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ return false } +// stringableConcurrentRules is a type that allows us to print a slice of rules.ConcurrentRules. +// This prevents premature evaluation, it will only be evaluated when the logger needs to print it. +type stringableConcurrentRules []rules.ConcurrentRules + +func (p stringableConcurrentRules) String() string { + return fmt.Sprintf("%v", []rules.ConcurrentRules(p)) +} + +var _ fmt.Stringer = stringableConcurrentRules{} + // SplitGroupIntoBatches splits the group into batches of rules that can be evaluated concurrently. // It tries to batch rules that have no dependencies together and rules that have dependencies in separate batches. // Returning no batches or nil means that the group should be evaluated sequentially. @@ -251,7 +262,7 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g } level.Info(logger).Log("msg", "Batched rules into concurrent blocks", "rules", len(g.Rules()), "batches", len(result)) - level.Debug(logger).Log("msg", "Batched rules into concurrent blocks", "batches", result) + level.Debug(logger).Log("msg", "Batched rules into concurrent blocks", "batches", stringableConcurrentRules(result)) return result } diff --git a/pkg/ruler/rule_concurrency_test.go b/pkg/ruler/rule_concurrency_test.go index e7c485db30..36a88c1d30 100644 --- a/pkg/ruler/rule_concurrency_test.go +++ b/pkg/ruler/rule_concurrency_test.go @@ -183,6 +183,40 @@ cortex_ruler_independent_rule_evaluation_concurrency_attempts_completed_total{us require.True(t, user3ControllerTwo.Allow(ctx, rg, rule1)) } +var splitToBatchesTestCases = map[string]struct { + inputFile string + expectedGroups []rules.ConcurrentRules +}{ + "chained": { + inputFile: "fixtures/rules_chain.yaml", + expectedGroups: []rules.ConcurrentRules{ + {0, 1}, + {2}, + {3, 4}, + {5, 6}, + }, + }, + "indeterminates": { + inputFile: "fixtures/rules_indeterminates.yaml", + expectedGroups: nil, + }, + "all independent": { + inputFile: "fixtures/rules_multiple_independent.yaml", + expectedGroups: []rules.ConcurrentRules{ + {0, 1, 2, 3, 4, 5}, + }, + }, + "topological sort": { + inputFile: "fixtures/rules_topological_sort_needed.json", + expectedGroups: []rules.ConcurrentRules{ + {0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 37, 38, 58}, + {1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22, 25, 26, 29, 30, 33, 34, 39, 40, 41, 42, 45, 46, 51, 52, 55, 56}, + {3, 7, 11, 15, 19, 23, 27, 31, 35}, + {43, 44, 47, 48, 49, 50, 53, 54, 57}, + }, + }, +} + func TestSplitGroupIntoBatches(t *testing.T) { limits := validation.MockOverrides(func(_ *validation.Limits, tenantLimits map[string]*validation.Limits) { tenantLimits["user1"] = validation.MockDefaultLimits() @@ -196,41 +230,7 @@ func TestSplitGroupIntoBatches(t *testing.T) { RuleConcurrencyController: controller, }) - tests := map[string]struct { - inputFile string - expectedGroups []rules.ConcurrentRules - }{ - "chained": { - inputFile: "fixtures/rules_chain.yaml", - expectedGroups: []rules.ConcurrentRules{ - {0, 1}, - {2}, - {3, 4}, - {5, 6}, - }, - }, - "indeterminates": { - inputFile: "fixtures/rules_indeterminates.yaml", - expectedGroups: nil, - }, - "all independent": { - inputFile: "fixtures/rules_multiple_independent.yaml", - expectedGroups: []rules.ConcurrentRules{ - {0, 1, 2, 3, 4, 5}, - }, - }, - "topological sort": { - inputFile: "fixtures/rules_topological_sort_needed.json", - expectedGroups: []rules.ConcurrentRules{ - {0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 37, 38, 58}, - {1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22, 25, 26, 29, 30, 33, 34, 39, 40, 41, 42, 45, 46, 51, 52, 55, 56}, - {3, 7, 11, 15, 19, 23, 27, 31, 35}, - {43, 44, 47, 48, 49, 50, 53, 54, 57}, - }, - }, - } - - for name, tc := range tests { + for name, tc := range splitToBatchesTestCases { t.Run(name, func(t *testing.T) { // Load group with a -1 interval so it's always at risk. groups, errs := ruleManager.LoadGroups(-1*time.Second, labels.EmptyLabels(), "", nil, []string{tc.inputFile}...) @@ -252,6 +252,39 @@ func TestSplitGroupIntoBatches(t *testing.T) { } } +func BenchmarkSplitGroupIntoBatches(b *testing.B) { + limits := validation.MockOverrides(func(_ *validation.Limits, tenantLimits map[string]*validation.Limits) { + tenantLimits["user1"] = validation.MockDefaultLimits() + tenantLimits["user1"].RulerMaxIndependentRuleEvaluationConcurrencyPerTenant = 2 + }) + + mtController := NewMultiTenantConcurrencyController(log.NewNopLogger(), 3, 50.0, prometheus.NewPedanticRegistry(), limits) + controller := mtController.NewTenantConcurrencyControllerFor("user1") + + ruleManager := rules.NewManager(&rules.ManagerOptions{ + RuleConcurrencyController: controller, + }) + + for name, tc := range splitToBatchesTestCases { + b.Run(name, func(b *testing.B) { + // Load group with a -1 interval so it's always at risk. + groups, errs := ruleManager.LoadGroups(-1*time.Second, labels.EmptyLabels(), "", nil, []string{tc.inputFile}...) + require.Empty(b, errs) + require.Len(b, groups, 1) + + var group *rules.Group + for _, g := range groups { + group = g + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = controller.SplitGroupIntoBatches(context.Background(), group) + } + }) + } +} + func requireConcurrentRulesEqual(t *testing.T, expected, actual []rules.ConcurrentRules) { t.Helper()