From 8f0ac08fe31d3eb070eabc9c66bc2425b2a79866 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 22:31:26 +0200 Subject: [PATCH 01/10] Implementation NOTE: Rebased from main after refactor in #13014 Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 27 +- docs/command-line/prometheus.md | 1 + rules/fixtures/rules_dependencies.yaml | 7 + rules/fixtures/rules_multiple.yaml | 14 + rules/fixtures/rules_multiple_groups.yaml | 28 ++ rules/group.go | 134 +++++++- rules/manager.go | 29 +- rules/manager_test.go | 382 ++++++++++++++++++++++ 8 files changed, 594 insertions(+), 28 deletions(-) create mode 100644 rules/fixtures/rules_dependencies.yaml create mode 100644 rules/fixtures/rules_multiple.yaml create mode 100644 rules/fixtures/rules_multiple_groups.yaml diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 569eb56324..392c1ea254 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -136,6 +136,7 @@ type flagConfig struct { forGracePeriod model.Duration outageTolerance model.Duration resendDelay model.Duration + maxConcurrentEvals int64 web web.Options scrape scrape.Options tsdb tsdbOptions @@ -393,6 +394,9 @@ func main() { serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) + serverOnlyFlag(a, "rules.max-concurrent-rule-evals", "Global concurrency limit for independent rules which can run concurrently."). + Default("4").Int64Var(&cfg.maxConcurrentEvals) + a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). 
Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps) @@ -667,17 +671,18 @@ func main() { queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - Queryable: localStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - OutageTolerance: time.Duration(cfg.outageTolerance), - ForGracePeriod: time.Duration(cfg.forGracePeriod), - ResendDelay: time.Duration(cfg.resendDelay), + Appendable: fanoutStorage, + Queryable: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), + ResendDelay: time.Duration(cfg.resendDelay), + MaxConcurrentEvals: cfg.maxConcurrentEvals, }) } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 78ec205f24..0c702b4f7a 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -47,6 +47,7 @@ The Prometheus monitoring server | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` | | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` | | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. 
Use with server mode only. | `1m` | +| --rules.max-concurrent-rule-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | diff --git a/rules/fixtures/rules_dependencies.yaml b/rules/fixtures/rules_dependencies.yaml new file mode 100644 index 0000000000..31d2c61763 --- /dev/null +++ b/rules/fixtures/rules_dependencies.yaml @@ -0,0 +1,7 @@ +groups: + - name: test + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: HighRequestRate + expr: job:http_requests:rate5m > 100 diff --git a/rules/fixtures/rules_multiple.yaml b/rules/fixtures/rules_multiple.yaml new file mode 100644 index 0000000000..db57bede1b --- /dev/null +++ b/rules/fixtures/rules_multiple.yaml @@ -0,0 +1,14 @@ +groups: + - name: test + rules: + # independents + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + # dependents + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: TooManyRequests + expr: job:http_requests:rate15m > 100 diff --git a/rules/fixtures/rules_multiple_groups.yaml b/rules/fixtures/rules_multiple_groups.yaml new file mode 100644 index 0000000000..87f31a6ca5 --- /dev/null +++ b/rules/fixtures/rules_multiple_groups.yaml @@ -0,0 +1,28 @@ +groups: + - name: http + rules: + # independents + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - 
record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + # dependents + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: TooManyHTTPRequests + expr: job:http_requests:rate15m > 100 + + - name: grpc + rules: + # independents + - record: job:grpc_requests:rate1m + expr: sum by (job)(rate(grpc_requests_total[1m])) + - record: job:grpc_requests:rate5m + expr: sum by (job)(rate(grpc_requests_total[5m])) + + # dependents + - record: job:grpc_requests:rate15m + expr: sum by (job)(rate(grpc_requests_total[15m])) + - record: TooManyGRPCRequests + expr: job:grpc_requests:rate15m > 100 diff --git a/rules/group.go b/rules/group.go index 5eba20767f..c9abc8f9a7 100644 --- a/rules/group.go +++ b/rules/group.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/promql/parser" + "go.uber.org/atomic" "golang.org/x/exp/slices" "github.com/go-kit/log" @@ -68,6 +70,7 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc + dependencyMap dependencyMap } // GroupEvalIterationFunc is used to implement and extend rule group @@ -126,6 +129,7 @@ func NewGroup(o GroupOptions) *Group { logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), metrics: metrics, evalIterationFunc: evalIterationFunc, + dependencyMap: buildDependencyMap(o.Rules), } } @@ -421,7 +425,7 @@ func (g *Group) CopyState(from *Group) { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. 
func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal float64 + var samplesTotal atomic.Float64 for i, rule := range g.rules { select { case <-g.done: @@ -429,7 +433,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { default: } - func(i int, rule Rule) { + eval := func(i int, rule Rule, async bool) { + if async { + defer func() { + g.opts.ConcurrentEvalSema.Release(1) + }() + } ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) defer func(t time.Time) { @@ -460,7 +469,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } rule.SetHealth(HealthGood) rule.SetLastError(nil) - samplesTotal += float64(len(vector)) + samplesTotal.Add(float64(len(vector))) if ar, ok := rule.(*AlertingRule); ok { ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) @@ -549,10 +558,19 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - }(i, rule) + } + + // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. + // Try run concurrently if there are slots available. + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) { + go eval(i, rule, true) + } else { + eval(i, rule, false) + } } + if g.metrics != nil { - g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) } g.cleanupStaleSeries(ctx, ts) } @@ -861,3 +879,109 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { return m } + +// dependencyMap is a data-structure which contains the relationships between rules within a group. +// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the +// output metric produced by another recording rule in its expression (i.e. as its "input"). 
+type dependencyMap map[Rule][]Rule + +// dependents returns all rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) []Rule { + if len(m) == 0 { + return nil + } + + return m[r] +} + +// dependencies returns all the rules on which the given rule is dependent for input. +func (m dependencyMap) dependencies(r Rule) []Rule { + if len(m) == 0 { + return nil + } + + var parents []Rule + for parent, children := range m { + if len(children) == 0 { + continue + } + + for _, child := range children { + if child == r { + parents = append(parents, parent) + } + } + } + + return parents +} + +func (m dependencyMap) isIndependent(r Rule) bool { + if m == nil { + return false + } + + return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0 +} + +// buildDependencyMap builds a data-structure which contains the relationships between rules within a group. +func buildDependencyMap(rules []Rule) dependencyMap { + dependencies := make(dependencyMap) + + if len(rules) <= 1 { + // No relationships if group has 1 or fewer rules. + return nil + } + + inputs := make(map[string][]Rule, len(rules)) + outputs := make(map[string][]Rule, len(rules)) + + var indeterminate bool + + for _, rule := range rules { + rule := rule + + name := rule.Name() + outputs[name] = append(outputs[name], rule) + + parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error { + if n, ok := node.(*parser.VectorSelector); ok { + // A wildcard metric expression means we cannot reliably determine if this rule depends on any other, + // which means we cannot safely run any rules concurrently. + if n.Name == "" && len(n.LabelMatchers) > 0 { + indeterminate = true + return nil + } + + // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour + // if they run concurrently. 
+ if n.Name == alertMetricName || n.Name == alertForStateMetricName { + indeterminate = true + return nil + } + + inputs[n.Name] = append(inputs[n.Name], rule) + } + return nil + }) + } + + if indeterminate { + return nil + } + + if len(inputs) == 0 || len(outputs) == 0 { + // No relationships can be inferred. + return nil + } + + for output, outRules := range outputs { + for _, outRule := range outRules { + if rs, found := inputs[output]; found && len(rs) > 0 { + dependencies[outRule] = append(dependencies[outRule], rs...) + } + } + } + + return dependencies +} diff --git a/rules/manager.go b/rules/manager.go index eed314ade2..84af863359 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" + "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" @@ -103,18 +104,20 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. 
type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + MaxConcurrentEvals int64 + ConcurrentEvalSema *semaphore.Weighted + GroupLoader GroupLoader Metrics *Metrics } @@ -130,6 +133,8 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } + o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals) + m := &Manager{ groups: map[string]*Group{}, opts: o, diff --git a/rules/manager_test.go b/rules/manager_test.go index 75ee34919f..be0a3a3d7e 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -19,15 +19,18 @@ import ( "math" "os" "sort" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" + "golang.org/x/sync/semaphore" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/model/labels" @@ -1402,3 +1405,382 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { require.Equal(t, expHist, fh) require.Equal(t, chunkenc.ValNone, it.Next()) } + +func TestDependencyMap(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := 
NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("user:requests:rate1m <= 0") + require.NoError(t, err) + rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + require.Equal(t, []Rule{rule2}, group.dependencyMap.dependents(rule)) + require.Len(t, group.dependencyMap.dependencies(rule), 0) + require.False(t, group.dependencyMap.isIndependent(rule)) + + require.Len(t, group.dependencyMap.dependents(rule2), 0) + require.Equal(t, []Rule{rule}, group.dependencyMap.dependencies(rule2)) + require.False(t, group.dependencyMap.isIndependent(rule2)) +} + +func TestNoDependency(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule}, + Opts: opts, + }) + + // A group with only one rule cannot have dependencies. 
+ require.False(t, group.dependencyMap.isIndependent(rule)) +} + +func TestNoMetricSelector(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`count({user="bob"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore + // all rules are not considered independent. + require.False(t, group.dependencyMap.isIndependent(rule)) + require.False(t, group.dependencyMap.isIndependent(rule2)) +} + +func TestDependentRulesWithNonMetricExpression(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("user:requests:rate1m <= 0") + require.NoError(t, err) + rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + + expr, err = parser.ParseExpr("3") + require.NoError(t, err) + rule3 := NewRecordingRule("three", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2, rule3}, + Opts: opts, + }) + + require.False(t, group.dependencyMap.isIndependent(rule)) + require.False(t, group.dependencyMap.isIndependent(rule2)) + require.True(t, group.dependencyMap.isIndependent(rule3)) +} + +func 
TestRulesDependentOnMetaMetrics(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + // This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by + // the rule engine, and is therefore not independent. + expr, err := parser.ParseExpr("count(ALERTS)") + require.NoError(t, err) + rule := NewRecordingRule("alert_count", expr, labels.Labels{}) + + // Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules). + expr, err = parser.ParseExpr("1") + require.NoError(t, err) + rule2 := NewRecordingRule("one", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + require.False(t, group.dependencyMap.isIndependent(rule)) +} + +func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { + files := []string{"fixtures/rules.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + + ruleManager.start() + defer ruleManager.Stop() + + err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups") + + orig := make(map[string]dependencyMap, len(ruleManager.groups)) + for _, g := range ruleManager.groups { + // No dependency map is expected because there is only one rule in the group. + require.Empty(t, g.dependencyMap) + orig[g.Name()] = g.dependencyMap + } + + // Update once without changing groups. + err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + for h, g := range ruleManager.groups { + // Dependency maps are the same because of no updates. + require.Equal(t, orig[h], g.dependencyMap) + } + + // Groups will be recreated when updated. 
+ files[0] = "fixtures/rules_dependencies.yaml" + err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + + for h, g := range ruleManager.groups { + // Dependency maps must change because the groups would've been updated. + require.NotEqual(t, orig[h], g.dependencyMap) + // We expect there to be some dependencies since the new rule group contains a dependency. + require.Greater(t, len(g.dependencyMap), 0) + } +} + +func TestAsyncRuleEvaluation(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + const artificialDelay = time.Second + + var ( + inflightQueries atomic.Int32 + maxInflight atomic.Int32 + ) + + files := []string{"fixtures/rules_multiple.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + // Artificially delay all query executions to highly concurrent execution improvement. + time.Sleep(artificialDelay) + + // return a stub sample + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + }) + + // Evaluate groups manually to show the impact of async rule evaluations. + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + expectedRules := 4 + + t.Run("synchronous evaluation with independent rules", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + for _, group := range groups { + require.Len(t, group.rules, expectedRules) + + start := time.Now() + + // Never expect more than 1 inflight query at a time. 
+ go func() { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + }() + + group.Eval(ctx, start) + + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + } + + cancel() + }) + + t.Run("asynchronous evaluation with independent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + ctx, cancel := context.WithCancel(context.Background()) + + for _, group := range groups { + // Allow up to 2 concurrent rule evaluations. + group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2) + require.Len(t, group.rules, expectedRules) + + start := time.Now() + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + }() + + group.Eval(ctx, start) + + require.EqualValues(t, 3, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + // Each rule produces one vector. 
+ require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + } + + cancel() + }) +} + +func TestBoundedRuleEvalConcurrency(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + const artificialDelay = time.Millisecond * 100 + + var ( + inflightQueries atomic.Int32 + maxInflight atomic.Int32 + maxConcurrency int64 = 3 + groupCount = 2 + ) + + files := []string{"fixtures/rules_multiple_groups.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + MaxConcurrentEvals: maxConcurrency, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + // Artificially delay all query executions to highly concurrent execution improvement. + time.Sleep(artificialDelay) + + // return a stub sample + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + }) + + // Evaluate groups manually to show the impact of async rule evaluations. + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, groupCount) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + }() + + // Evaluate groups concurrently (like they normally do). 
+ var wg sync.WaitGroup + for _, group := range groups { + group := group + + wg.Add(1) + go func() { + group.Eval(ctx, time.Now()) + wg.Done() + }() + } + + wg.Wait() + cancel() + + // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations. + require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount)) +} From b4c3efbc8c9c76c7ccb06155e627b8021c7014ef Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 23:05:01 +0200 Subject: [PATCH 02/10] Add feature flag Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 4 ++++ docs/feature_flags.md | 12 +++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 392c1ea254..21322a09bd 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -155,6 +155,7 @@ type flagConfig struct { enableNewSDManager bool enablePerStepStats bool enableAutoGOMAXPROCS bool + enableConcurrentRuleEval bool prometheusURL string corsRegexString string @@ -198,6 +199,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "auto-gomaxprocs": c.enableAutoGOMAXPROCS = true level.Info(logger).Log("msg", "Automatically set GOMAXPROCS to match Linux container CPU quota") + case "concurrent-rule-eval": + c.enableConcurrentRuleEval = true + level.Info(logger).Log("msg", "Experimental concurrent rule evaluation enabled.") case "no-default-scrape-port": c.scrape.NoDefaultPort = true level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.") diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 1cf54c47f8..2e812a2ab8 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -133,4 +133,14 @@ notable exception of exemplars, which are always ingested. The OTLP receiver allows Prometheus to accept [OpenTelemetry](https://opentelemetry.io/) metrics writes. 
Prometheus is best used as a Pull based system, and staleness, `up` metric, and other Pull enabled features -won't work when you push OTLP metrics. \ No newline at end of file +won't work when you push OTLP metrics. + +## Concurrent evaluation of independent rules + +`--enable-feature=concurrent-rule-eval` + +Rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the +output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no +reason to run them sequentially. This can improve rule reliability at the expense of adding more concurrent query +load. The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals` which is set +to `4` by default. \ No newline at end of file From 31a65d14ee34d226881f23c0748dc54afd4d5544 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 23:05:25 +0200 Subject: [PATCH 03/10] Refactor concurrency control Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 25 ++++++++++--------- rules/group.go | 13 +++++----- rules/manager.go | 56 +++++++++++++++++++++++++++++++----------- rules/manager_test.go | 12 ++++----- 4 files changed, 67 insertions(+), 39 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 21322a09bd..d09c82be13 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -675,18 +675,19 @@ func main() { queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - Queryable: localStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - OutageTolerance: time.Duration(cfg.outageTolerance), - ForGracePeriod: 
time.Duration(cfg.forGracePeriod), - ResendDelay: time.Duration(cfg.resendDelay), - MaxConcurrentEvals: cfg.maxConcurrentEvals, + Appendable: fanoutStorage, + Queryable: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), + ResendDelay: time.Duration(cfg.resendDelay), + MaxConcurrentEvals: cfg.maxConcurrentEvals, + ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval, }) } diff --git a/rules/group.go b/rules/group.go index c9abc8f9a7..ee9755738b 100644 --- a/rules/group.go +++ b/rules/group.go @@ -434,11 +434,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } eval := func(i int, rule Rule, async bool) { - if async { - defer func() { - g.opts.ConcurrentEvalSema.Release(1) - }() - } + defer func() { + if async { + g.opts.ConcurrencyController.Done() + } + }() + ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) defer func(t time.Time) { @@ -562,7 +563,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. 
- if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) { + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) diff --git a/rules/manager.go b/rules/manager.go index 84af863359..3910be7cd6 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -104,20 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalSema *semaphore.Weighted - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrencyController ConcurrencyController + GroupLoader GroupLoader Metrics *Metrics } @@ -133,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals) + o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) m := &Manager{ groups: map[string]*Group{}, @@ -408,3 +409,28 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } } + +type ConcurrencyController struct { + enabled bool + sema *semaphore.Weighted +} + +func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController { + return 
ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +} + +func (c ConcurrencyController) Allow() bool { + if !c.enabled { + return false + } + + return c.sema.TryAcquire(1) +} + +func (c ConcurrencyController) Done() { + if !c.enabled { + return + } + + c.sema.Release(1) +} diff --git a/rules/manager_test.go b/rules/manager_test.go index be0a3a3d7e..3ea932f577 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" - "golang.org/x/sync/semaphore" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/model/labels" @@ -1672,7 +1671,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { for _, group := range groups { // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2) + group.opts.ConcurrencyController = NewConcurrencyController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() @@ -1722,10 +1721,11 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { files := []string{"fixtures/rules_multiple_groups.yaml"} ruleManager := NewManager(&ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - MaxConcurrentEvals: maxConcurrency, + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + ConcurrentEvalsEnabled: true, + MaxConcurrentEvals: maxConcurrency, QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { inflightQueries.Add(1) defer func() { From fe8b7ed0b648abaf12fc9d2f59930262161803a6 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 23:10:21 +0200 Subject: [PATCH 04/10] goimports Signed-off-by: Danny Kopping --- rules/group.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rules/group.go b/rules/group.go index ee9755738b..e4d4c9b8ec 100644 --- a/rules/group.go +++ b/rules/group.go @@ -21,10 +21,11 @@ import ( 
"sync" "time" - "github.com/prometheus/prometheus/promql/parser" "go.uber.org/atomic" "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/promql/parser" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" From af8f96580365504f5f944c00d6e9bdd158cecfce Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 11:25:12 +0200 Subject: [PATCH 05/10] Optimising dependencies/dependents funcs to not produce new slices each request Signed-off-by: Danny Kopping --- rules/group.go | 26 +++++++++++--------------- rules/manager_test.go | 27 ++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/rules/group.go b/rules/group.go index e4d4c9b8ec..62b9ebdc0c 100644 --- a/rules/group.go +++ b/rules/group.go @@ -887,35 +887,31 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { // output metric produced by another recording rule in its expression (i.e. as its "input"). type dependencyMap map[Rule][]Rule -// dependents returns all rules which use the output of the given rule as one of their inputs. -func (m dependencyMap) dependents(r Rule) []Rule { - if len(m) == 0 { - return nil - } - - return m[r] +// dependents returns the count of rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) int { + return len(m[r]) } -// dependencies returns all the rules on which the given rule is dependent for input. -func (m dependencyMap) dependencies(r Rule) []Rule { +// dependencies returns the count of rules on which the given rule is dependent for input. 
+func (m dependencyMap) dependencies(r Rule) int { if len(m) == 0 { - return nil + return 0 } - var parents []Rule - for parent, children := range m { + var count int + for _, children := range m { if len(children) == 0 { continue } for _, child := range children { if child == r { - parents = append(parents, parent) + count++ } } } - return parents + return count } func (m dependencyMap) isIndependent(r Rule) bool { @@ -923,7 +919,7 @@ func (m dependencyMap) isIndependent(r Rule) bool { return false } - return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0 + return m.dependents(r)+m.dependencies(r) == 0 } // buildDependencyMap builds a data-structure which contains the relationships between rules within a group. diff --git a/rules/manager_test.go b/rules/manager_test.go index 3ea932f577..728aaf187d 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1419,20 +1419,37 @@ func TestDependencyMap(t *testing.T) { expr, err = parser.ParseExpr("user:requests:rate1m <= 0") require.NoError(t, err) rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + + expr, err = parser.ParseExpr("sum by (user) (rate(requests[5m]))") + require.NoError(t, err) + rule3 := NewRecordingRule("user:requests:rate5m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("increase(user:requests:rate1m[1h])") + require.NoError(t, err) + rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{}) + group := NewGroup(GroupOptions{ Name: "rule_group", Interval: time.Second, - Rules: []Rule{rule, rule2}, + Rules: []Rule{rule, rule2, rule3, rule4}, Opts: opts, }) - require.Equal(t, []Rule{rule2}, group.dependencyMap.dependents(rule)) - require.Len(t, group.dependencyMap.dependencies(rule), 0) + require.Zero(t, group.dependencyMap.dependencies(rule)) + require.Equal(t, 2, group.dependencyMap.dependents(rule)) require.False(t, group.dependencyMap.isIndependent(rule)) - 
require.Len(t, group.dependencyMap.dependents(rule2), 0) - require.Equal(t, []Rule{rule}, group.dependencyMap.dependencies(rule2)) + require.Zero(t, group.dependencyMap.dependents(rule2)) + require.Equal(t, 1, group.dependencyMap.dependencies(rule2)) require.False(t, group.dependencyMap.isIndependent(rule2)) + + require.Zero(t, group.dependencyMap.dependents(rule3)) + require.Zero(t, group.dependencyMap.dependencies(rule3)) + require.True(t, group.dependencyMap.isIndependent(rule3)) + + require.Zero(t, group.dependencyMap.dependents(rule4)) + require.Equal(t, 1, group.dependencyMap.dependencies(rule4)) + require.False(t, group.dependencyMap.isIndependent(rule4)) } func TestNoDependency(t *testing.T) { From e5eac61c72812363e05ed7fa0fc91b4d8e5d7c6f Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 11:44:20 +0200 Subject: [PATCH 06/10] Refactoring Signed-off-by: Danny Kopping --- rules/group.go | 24 +++++++++----- rules/manager.go | 53 ++++++++++++++++++------------- rules/manager_test.go | 73 ++++++++++++++++++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 37 deletions(-) diff --git a/rules/group.go b/rules/group.go index 62b9ebdc0c..21ab0328df 100644 --- a/rules/group.go +++ b/rules/group.go @@ -437,7 +437,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { - g.opts.ConcurrencyController.Done() + g.opts.ConcurrentEvalsController.Done() } }() @@ -564,7 +564,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. 
- if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() { + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) @@ -883,8 +883,8 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { } // dependencyMap is a data-structure which contains the relationships between rules within a group. -// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the -// output metric produced by another recording rule in its expression (i.e. as its "input"). +// It is used to describe the dependency associations between rules in a group whereby one rule uses the +// output metric produced by another rule in its expression (i.e. as its "input"). type dependencyMap map[Rule][]Rule // dependents returns the count of rules which use the output of the given rule as one of their inputs. @@ -900,10 +900,6 @@ func (m dependencyMap) dependencies(r Rule) int { var count int for _, children := range m { - if len(children) == 0 { - continue - } - for _, child := range children { if child == r { count++ @@ -914,6 +910,8 @@ func (m dependencyMap) dependencies(r Rule) int { return count } +// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule +// dependent on its output. func (m dependencyMap) isIndependent(r Rule) bool { if m == nil { return false @@ -923,6 +921,16 @@ func (m dependencyMap) isIndependent(r Rule) bool { } // buildDependencyMap builds a data-structure which contains the relationships between rules within a group. +// +// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose +// output an Alert rule depends will not be able to run concurrently. 
+// +// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be +// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour: +// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector +// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE +// +// Rules which are independent can run concurrently with no side-effects. func buildDependencyMap(rules []Rule) dependencyMap { dependencies := make(dependencyMap) diff --git a/rules/manager.go b/rules/manager.go index 3910be7cd6..e9f85efad4 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -104,21 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalsEnabled bool - ConcurrencyController ConcurrencyController - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrentEvalsController ConcurrentRuleEvalController Metrics *Metrics } @@ -134,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, 
o.MaxConcurrentEvals) + o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) m := &Manager{ groups: map[string]*Group{}, @@ -410,16 +410,26 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -type ConcurrencyController struct { +// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount +// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load. +// Concurrency is controlled globally, not on a per-group basis. +type ConcurrentRuleEvalController interface { + Allow() bool + Done() +} + +// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. +type concurrentRuleEvalController struct { enabled bool sema *semaphore.Weighted } -func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController { - return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { + return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} } -func (c ConcurrencyController) Allow() bool { +// Allow determines whether any concurrency slots are available. +func (c concurrentRuleEvalController) Allow() bool { if !c.enabled { return false } @@ -427,7 +437,8 @@ func (c ConcurrencyController) Allow() bool { return c.sema.TryAcquire(1) } -func (c ConcurrencyController) Done() { +// Done releases a concurrent evaluation slot. +func (c concurrentRuleEvalController) Done() { if !c.enabled { return } diff --git a/rules/manager_test.go b/rules/manager_test.go index 728aaf187d..5949769aef 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1471,7 +1471,53 @@ func TestNoDependency(t *testing.T) { }) // A group with only one rule cannot have dependencies. 
- require.False(t, group.dependencyMap.isIndependent(rule)) + require.Empty(t, group.dependencyMap) +} + +func TestDependenciesEdgeCases(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + t.Run("empty group", func(t *testing.T) { + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{}, // empty group + Opts: opts, + }) + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + // A group with no rules has no dependency map, but doesn't panic if the map is queried. + require.Nil(t, group.dependencyMap) + require.False(t, group.dependencyMap.isIndependent(rule)) + }) + + t.Run("rules which reference no series", func(t *testing.T) { + expr, err := parser.ParseExpr("one") + require.NoError(t, err) + rule1 := NewRecordingRule("1", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("two") + require.NoError(t, err) + rule2 := NewRecordingRule("2", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + // A group with rules which reference no series will still produce a dependency map + require.True(t, group.dependencyMap.isIndependent(rule1)) + require.True(t, group.dependencyMap.isIndependent(rule2)) + }) } func TestNoMetricSelector(t *testing.T) { @@ -1596,10 +1642,23 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { require.NoError(t, err) for h, g := range ruleManager.groups { + const ruleName = "job:http_requests:rate5m" + var rr *RecordingRule + + for _, r := range g.rules { + if r.Name() == ruleName { + rr = r.(*RecordingRule) + } + } + + require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + // Dependency maps must change because the groups would've been updated. 
require.NotEqual(t, orig[h], g.dependencyMap) // We expect there to be some dependencies since the new rule group contains a dependency. require.Greater(t, len(g.dependencyMap), 0) + require.Equal(t, 1, g.dependencyMap.dependents(rr)) + require.Zero(t, g.dependencyMap.dependencies(rr)) } } @@ -1625,17 +1684,16 @@ func TestAsyncRuleEvaluation(t *testing.T) { inflightQueries.Add(-1) }() - // Artificially delay all query executions to highly concurrent execution improvement. + // Artificially delay all query executions to highlight concurrent execution improvement. time.Sleep(artificialDelay) - // return a stub sample + // Return a stub sample. return promql.Vector{ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, }) - // Evaluate groups manually to show the impact of async rule evaluations. groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, 1) @@ -1688,7 +1746,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { for _, group := range groups { // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrencyController = NewConcurrencyController(true, 2) + group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() @@ -1749,17 +1807,16 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { inflightQueries.Add(-1) }() - // Artificially delay all query executions to highly concurrent execution improvement. + // Artificially delay all query executions to highlight concurrent execution improvement. time.Sleep(artificialDelay) - // return a stub sample + // Return a stub sample. return promql.Vector{ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, }) - // Evaluate groups manually to show the impact of async rule evaluations. 
groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, groupCount) From 055d9ffcc233d5cd5cfd337cf318dc620232f399 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 13:25:06 +0200 Subject: [PATCH 07/10] Rename flag Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 2 +- docs/command-line/prometheus.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d09c82be13..ad4d0fc676 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -398,7 +398,7 @@ func main() { serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) - serverOnlyFlag(a, "rules.max-concurrent-rule-evals", "Global concurrency limit for independent rules which can run concurrently."). + serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules which can run concurrently."). Default("4").Int64Var(&cfg.maxConcurrentEvals) a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 0c702b4f7a..561f2c0712 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -47,7 +47,7 @@ The Prometheus monitoring server | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` | | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. 
Use with server mode only. | `10m` | | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | -| --rules.max-concurrent-rule-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | +| --rules.max-concurrent-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | From 7b45ee6f80243483833e7de19548e658ecc9df8e Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Thu, 2 Nov 2023 20:33:06 +0200 Subject: [PATCH 08/10] Refactoring for performance, and to allow controller to be overridden Signed-off-by: Danny Kopping --- rules/group.go | 7 +- rules/manager.go | 64 ++++++++++++++---- rules/manager_test.go | 148 ++++++++++++++++++++++-------------------- 3 files changed, 134 insertions(+), 85 deletions(-) diff --git a/rules/group.go b/rules/group.go index 21ab0328df..e51136ac88 100644 --- a/rules/group.go +++ b/rules/group.go @@ -71,7 +71,6 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. 
evalIterationFunc GroupEvalIterationFunc - dependencyMap dependencyMap } // GroupEvalIterationFunc is used to implement and extend rule group @@ -130,7 +129,6 @@ func NewGroup(o GroupOptions) *Group { logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), metrics: metrics, evalIterationFunc: evalIterationFunc, - dependencyMap: buildDependencyMap(o.Rules), } } @@ -437,7 +435,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { - g.opts.ConcurrentEvalsController.Done() + g.opts.RuleConcurrencyController.Done() } }() @@ -564,7 +562,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { + ctrl := g.opts.RuleConcurrencyController + if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) diff --git a/rules/manager.go b/rules/manager.go index e9f85efad4..90769d6ae0 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -118,7 +118,7 @@ type ManagerOptions struct { GroupLoader GroupLoader MaxConcurrentEvals int64 ConcurrentEvalsEnabled bool - ConcurrentEvalsController ConcurrentRuleEvalController + RuleConcurrencyController RuleConcurrencyController Metrics *Metrics } @@ -134,7 +134,9 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + if o.RuleConcurrencyController == nil { + o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + } m := &Manager{ groups: map[string]*Group{}, @@ -182,6 +184,10 @@ func (m *Manager) 
Update(interval time.Duration, files []string, externalLabels
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
 
+	if m.opts.RuleConcurrencyController != nil {
+		m.opts.RuleConcurrencyController.Invalidate()
+	}
+
 	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
 
 	if errs != nil {
@@ -410,16 +416,26 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
 	}
 }
 
-// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount
-// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load.
+// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount
+// of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load.
 // Concurrency is controlled globally, not on a per-group basis.
-type ConcurrentRuleEvalController interface {
+type RuleConcurrencyController interface {
+	// RuleEligible determines if a rule can be run concurrently.
+	RuleEligible(g *Group, r Rule) bool
+
+	// Allow determines whether any concurrent evaluation slots are available.
 	Allow() bool
+
+	// Done releases a concurrent evaluation slot.
 	Done()
+
+	// Invalidate instructs the controller to invalidate its state.
+	// This should be called when groups are modified (during a reload, for instance), because the controller may
+	// store some state about each group in order to more efficiently determine rule eligibility.
+	Invalidate()
+}
+
+func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController {
+	return &concurrentRuleEvalController{
+		enabled: enabled,
+		sema:    semaphore.NewWeighted(maxConcurrency),
+		depMaps: map[*Group]dependencyMap{},
+	}
 }
 
 // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct { + mu sync.Mutex enabled bool sema *semaphore.Weighted + depMaps map[*Group]dependencyMap } -func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { - return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { + c.mu.Lock() + defer c.mu.Unlock() + + depMap, found := c.depMaps[g] + if !found { + depMap = buildDependencyMap(g.rules) + c.depMaps[g] = depMap + } + + return depMap.isIndependent(r) } -// Allow determines whether any concurrency slots are available. -func (c concurrentRuleEvalController) Allow() bool { +func (c *concurrentRuleEvalController) Allow() bool { if !c.enabled { return false } @@ -437,11 +472,18 @@ func (c concurrentRuleEvalController) Allow() bool { return c.sema.TryAcquire(1) } -// Done releases a concurrent evaluation slot. -func (c concurrentRuleEvalController) Done() { +func (c *concurrentRuleEvalController) Done() { if !c.enabled { return } c.sema.Release(1) } + +func (c *concurrentRuleEvalController) Invalidate() { + c.mu.Lock() + defer c.mu.Unlock() + + // Clear out the memoized dependency maps because some or all groups may have been updated. 
+ c.depMaps = map[*Group]dependencyMap{} +} diff --git a/rules/manager_test.go b/rules/manager_test.go index 5949769aef..c3ac195ac8 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1435,21 +1435,23 @@ func TestDependencyMap(t *testing.T) { Opts: opts, }) - require.Zero(t, group.dependencyMap.dependencies(rule)) - require.Equal(t, 2, group.dependencyMap.dependents(rule)) - require.False(t, group.dependencyMap.isIndependent(rule)) + depMap := buildDependencyMap(group.rules) - require.Zero(t, group.dependencyMap.dependents(rule2)) - require.Equal(t, 1, group.dependencyMap.dependencies(rule2)) - require.False(t, group.dependencyMap.isIndependent(rule2)) + require.Zero(t, depMap.dependencies(rule)) + require.Equal(t, 2, depMap.dependents(rule)) + require.False(t, depMap.isIndependent(rule)) - require.Zero(t, group.dependencyMap.dependents(rule3)) - require.Zero(t, group.dependencyMap.dependencies(rule3)) - require.True(t, group.dependencyMap.isIndependent(rule3)) + require.Zero(t, depMap.dependents(rule2)) + require.Equal(t, 1, depMap.dependencies(rule2)) + require.False(t, depMap.isIndependent(rule2)) - require.Zero(t, group.dependencyMap.dependents(rule4)) - require.Equal(t, 1, group.dependencyMap.dependencies(rule4)) - require.False(t, group.dependencyMap.isIndependent(rule4)) + require.Zero(t, depMap.dependents(rule3)) + require.Zero(t, depMap.dependencies(rule3)) + require.True(t, depMap.isIndependent(rule3)) + + require.Zero(t, depMap.dependents(rule4)) + require.Equal(t, 1, depMap.dependencies(rule4)) + require.False(t, depMap.isIndependent(rule4)) } func TestNoDependency(t *testing.T) { @@ -1470,8 +1472,9 @@ func TestNoDependency(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A group with only one rule cannot have dependencies. 
- require.Empty(t, group.dependencyMap) + require.Empty(t, depMap) } func TestDependenciesEdgeCases(t *testing.T) { @@ -1493,9 +1496,10 @@ func TestDependenciesEdgeCases(t *testing.T) { require.NoError(t, err) rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + depMap := buildDependencyMap(group.rules) // A group with no rules has no dependency map, but doesn't panic if the map is queried. - require.Nil(t, group.dependencyMap) - require.False(t, group.dependencyMap.isIndependent(rule)) + require.Nil(t, depMap) + require.False(t, depMap.isIndependent(rule)) }) t.Run("rules which reference no series", func(t *testing.T) { @@ -1514,9 +1518,10 @@ func TestDependenciesEdgeCases(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A group with rules which reference no series will still produce a dependency map - require.True(t, group.dependencyMap.isIndependent(rule1)) - require.True(t, group.dependencyMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule1)) + require.True(t, depMap.isIndependent(rule2)) }) } @@ -1542,10 +1547,11 @@ func TestNoMetricSelector(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore // all rules are not considered independent. 
- require.False(t, group.dependencyMap.isIndependent(rule)) - require.False(t, group.dependencyMap.isIndependent(rule2)) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) } func TestDependentRulesWithNonMetricExpression(t *testing.T) { @@ -1574,9 +1580,10 @@ func TestDependentRulesWithNonMetricExpression(t *testing.T) { Opts: opts, }) - require.False(t, group.dependencyMap.isIndependent(rule)) - require.False(t, group.dependencyMap.isIndependent(rule2)) - require.True(t, group.dependencyMap.isIndependent(rule3)) + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule3)) } func TestRulesDependentOnMetaMetrics(t *testing.T) { @@ -1604,7 +1611,8 @@ func TestRulesDependentOnMetaMetrics(t *testing.T) { Opts: opts, }) - require.False(t, group.dependencyMap.isIndependent(rule)) + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) } func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { @@ -1623,17 +1631,19 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { orig := make(map[string]dependencyMap, len(ruleManager.groups)) for _, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) // No dependency map is expected because there is only one rule in the group. - require.Empty(t, g.dependencyMap) - orig[g.Name()] = g.dependencyMap + require.Empty(t, depMap) + orig[g.Name()] = depMap } // Update once without changing groups. err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) require.NoError(t, err) for h, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) // Dependency maps are the same because of no updates. - require.Equal(t, orig[h], g.dependencyMap) + require.Equal(t, orig[h], depMap) } // Groups will be recreated when updated. 
@@ -1653,12 +1663,13 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + depMap := buildDependencyMap(g.rules) // Dependency maps must change because the groups would've been updated. - require.NotEqual(t, orig[h], g.dependencyMap) + require.NotEqual(t, orig[h], depMap) // We expect there to be some dependencies since the new rule group contains a dependency. - require.Greater(t, len(g.dependencyMap), 0) - require.Equal(t, 1, g.dependencyMap.dependents(rr)) - require.Zero(t, g.dependencyMap.dependencies(rr)) + require.Greater(t, len(depMap), 0) + require.Equal(t, 1, depMap.dependents(rr)) + require.Zero(t, depMap.dependencies(rr)) } } @@ -1674,7 +1685,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { ) files := []string{"fixtures/rules_multiple.yaml"} - ruleManager := NewManager(&ManagerOptions{ + opts := &ManagerOptions{ Context: context.Background(), Logger: log.NewNopLogger(), Appendable: storage, @@ -1692,39 +1703,42 @@ func TestAsyncRuleEvaluation(t *testing.T) { promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, - }) + } - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) - require.Empty(t, errs) - require.Len(t, groups, 1) + inflightTracker := func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + } expectedRules := 4 t.Run("synchronous evaluation with independent rules", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + ruleManager := NewManager(opts) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + for _, group := range groups { require.Len(t, group.rules, expectedRules) start := time.Now() // Never expect more than 1 inflight query at a time. - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + go inflightTracker(ctx) group.Eval(ctx, start) @@ -1744,33 +1758,27 @@ func TestAsyncRuleEvaluation(t *testing.T) { maxInflight.Store(0) ctx, cancel := context.WithCancel(context.Background()) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + for _, group := range groups { - // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + go inflightTracker(ctx) group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) // Each rule produces one vector. 
From 5f0aa5f48e5d7cde4f496d355b37f572bee92d90 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 5 Jan 2024 22:48:30 +0200 Subject: [PATCH 09/10] Block until all rules, both sync & async, have completed evaluating Updated & added tests Signed-off-by: Danny Kopping --- .../fixtures/rules_multiple_independent.yaml | 15 ++ rules/group.go | 10 +- rules/manager_test.go | 147 ++++++++++++++---- 3 files changed, 139 insertions(+), 33 deletions(-) create mode 100644 rules/fixtures/rules_multiple_independent.yaml diff --git a/rules/fixtures/rules_multiple_independent.yaml b/rules/fixtures/rules_multiple_independent.yaml new file mode 100644 index 0000000000..e071be3eff --- /dev/null +++ b/rules/fixtures/rules_multiple_independent.yaml @@ -0,0 +1,15 @@ +groups: + - name: independents + rules: + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) diff --git a/rules/group.go b/rules/group.go index 8de0900d1a..7957d2f79f 100644 --- a/rules/group.go +++ b/rules/group.go @@ -423,8 +423,13 @@ func (g *Group) CopyState(from *Group) { } // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. +// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. 
func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal atomic.Float64 + var ( + samplesTotal atomic.Float64 + wg sync.WaitGroup + ) + for i, rule := range g.rules { select { case <-g.done: @@ -435,6 +440,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { + wg.Done() g.opts.RuleConcurrencyController.Done() } }() @@ -569,12 +575,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // Try run concurrently if there are slots available. ctrl := g.opts.RuleConcurrencyController if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { + wg.Add(1) go eval(i, rule, true) } else { eval(i, rule, false) } } + wg.Wait() if g.metrics != nil { g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) } diff --git a/rules/manager_test.go b/rules/manager_test.go index c23b9eca74..dc7fc83575 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1677,32 +1677,33 @@ func TestAsyncRuleEvaluation(t *testing.T) { storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) - const artificialDelay = time.Second + const artificialDelay = 500 * time.Millisecond var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 ) - files := []string{"fixtures/rules_multiple.yaml"} - opts := &ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { - inflightQueries.Add(1) - defer func() { - inflightQueries.Add(-1) - }() - - // Artificially delay all query executions to highlight concurrent execution improvement. - time.Sleep(artificialDelay) - - // Return a stub sample. 
- return promql.Vector{ - promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, - }, nil - }, + optsFactory := func() *ManagerOptions { + return &ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + // Artificially delay all query executions to highlight concurrent execution improvement. + time.Sleep(artificialDelay) + + // Return a stub sample. + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + } } inflightTracker := func(ctx context.Context) { @@ -1722,18 +1723,18 @@ func TestAsyncRuleEvaluation(t *testing.T) { } } - expectedRules := 4 - t.Run("synchronous evaluation with independent rules", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + ruleManager := NewManager(optsFactory()) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) require.Empty(t, errs) require.Len(t, groups, 1) + ruleCount := 4 + for _, group := range groups { - require.Len(t, group.rules, expectedRules) + require.Len(t, group.rules, ruleCount) start := time.Now() @@ -1744,32 +1745,74 @@ func TestAsyncRuleEvaluation(t *testing.T) { require.EqualValues(t, 1, maxInflight.Load()) // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. 
+ require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + + cancel() + }) + + t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + ctx, cancel := context.WithCancel(context.Background()) + + ruleCount := 4 + opts := optsFactory() + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + go inflightTracker(ctx) + + group.Eval(ctx, start) + + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. - require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } cancel() }) - t.Run("asynchronous evaluation with independent rules", func(t *testing.T) { + t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { // Reset. inflightQueries.Store(0) maxInflight.Store(0) ctx, cancel := context.WithCancel(context.Background()) + ruleCount := 6 + opts := optsFactory() + // Configure concurrency settings. 
opts.ConcurrentEvalsEnabled = true opts.MaxConcurrentEvals = 2 opts.RuleConcurrencyController = nil ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) require.Empty(t, errs) require.Len(t, groups, 1) for _, group := range groups { - require.Len(t, group.rules, expectedRules) + require.Len(t, group.rules, ruleCount) start := time.Now() @@ -1780,9 +1823,49 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + + } + + cancel() + }) + + t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + ctx, cancel := context.WithCancel(context.Background()) + + ruleCount := 6 + opts := optsFactory() + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) 
+ require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + go inflightTracker(ctx) + + group.Eval(ctx, start) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. - require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } cancel() From 34dc573613285ca36d2983f258bc5b89e031ad35 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 16 Jan 2024 15:38:05 +0100 Subject: [PATCH 10/10] Added RuleDetail.Independent Signed-off-by: Marco Pracucci --- rules/alerting.go | 4 +- rules/alerting_test.go | 45 ++++++++++++---------- rules/group.go | 13 ++++--- rules/manager.go | 83 ++++++++++++++++++++++++----------------- rules/manager_test.go | 62 +++++++++++++++++++++--------- rules/origin.go | 15 +++++--- rules/origin_test.go | 14 ++++++- rules/recording.go | 4 +- rules/recording_test.go | 27 ++++++++------ rules/rule.go | 2 +- web/api/v1/api_test.go | 2 + 11 files changed, 171 insertions(+), 100 deletions(-) diff --git a/rules/alerting.go b/rules/alerting.go index 6602d2dff3..90f8226689 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -323,8 +323,8 @@ const resolvedRetention = 15 * time.Minute // Eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. 
-func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) { - ctx = NewOriginContext(ctx, NewRuleDetail(r)) +func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int, independent bool) (promql.Vector, error) { + ctx = NewOriginContext(ctx, NewRuleDetail(r, independent)) res, err := query(ctx, r.vector.String(), ts.Add(-evalDelay)) if err != nil { diff --git a/rules/alerting_test.go b/rules/alerting_test.go index f13015f320..69fae14548 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -16,6 +16,7 @@ package rules import ( "context" "errors" + "fmt" "testing" "time" @@ -166,7 +167,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -183,7 +184,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { require.Equal(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) require.Empty(t, res) } @@ -251,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. 
res, err := ruleWithoutExternalLabels.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false, ) require.NoError(t, err) for _, smpl := range res { @@ -265,7 +266,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { } res, err = ruleWithExternalLabels.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false, ) require.NoError(t, err) for _, smpl := range res { @@ -344,7 +345,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalURL.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false, ) require.NoError(t, err) for _, smpl := range res { @@ -358,7 +359,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { } res, err = ruleWithExternalURL.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false, ) require.NoError(t, err) for _, smpl := range res { @@ -413,7 +414,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. 
res, err := rule.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false, ) require.NoError(t, err) for _, smpl := range res { @@ -480,7 +481,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; close(getDoneCh) }() _, err = ruleWithQueryInTemplate.Eval( - context.TODO(), 0, evalTime, slowQueryFunc, nil, 0, + context.TODO(), 0, evalTime, slowQueryFunc, nil, 0, false, ) require.NoError(t, err) } @@ -532,7 +533,7 @@ func TestAlertingRuleDuplicate(t *testing.T) { "", true, log.NewNopLogger(), ) - _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0) + _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0, false) require.Error(t, err) require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels") } @@ -580,7 +581,7 @@ func TestAlertingRuleLimit(t *testing.T) { evalTime := time.Unix(0, 0) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit, false); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": @@ -809,7 +810,7 @@ func TestKeepFiringFor(t *testing.T) { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. 
@@ -826,7 +827,7 @@ func TestKeepFiringFor(t *testing.T) { require.Equal(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) require.Empty(t, res) } @@ -863,7 +864,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) { baseTime := time.Unix(0, 0) result.T = timestamp.FromTime(baseTime) - res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) require.Len(t, res, 2) @@ -878,7 +879,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) { } evalTime := baseTime.Add(time.Minute) - res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) require.Empty(t, res) } @@ -912,11 +913,15 @@ func TestAlertingEvalWithOrigin(t *testing.T) { true, log.NewNopLogger(), ) - _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) { - detail = FromOriginContext(ctx) - return nil, nil - }, nil, 0) + for _, independent := range []bool{true, false} { + t.Run(fmt.Sprintf("independent = %t", independent), func(t *testing.T) { + _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) { + detail = FromOriginContext(ctx) + return nil, nil + }, nil, 0, independent) - require.NoError(t, err) - require.Equal(t, detail, NewRuleDetail(rule)) + require.NoError(t, err) + require.Equal(t, detail, NewRuleDetail(rule, independent)) + }) + } } diff --git a/rules/group.go b/rules/group.go index 
bad098af55..600715253f 100644 --- a/rules/group.go +++ b/rules/group.go @@ -455,7 +455,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { default: } - eval := func(i int, rule Rule, async bool) { + eval := func(i int, rule Rule, independent, async bool) { defer func() { if async { wg.Done() @@ -481,7 +481,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - vector, err := rule.Eval(ctx, evaluationDelay, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) + vector, err := rule.Eval(ctx, evaluationDelay, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit(), independent) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) @@ -590,13 +590,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. + independent := g.opts.RuleDependencyController.IsRuleIndependent(g, rule) + // Try run concurrently if there are slots available. 
- ctrl := g.opts.RuleConcurrencyController - if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { + if ctrl := g.opts.RuleConcurrencyController; independent && ctrl != nil && ctrl.Allow() { wg.Add(1) - go eval(i, rule, true) + go eval(i, rule, independent, true) } else { - eval(i, rule, false) + eval(i, rule, independent, false) } } diff --git a/rules/manager.go b/rules/manager.go index 7423d97099..b6e9385631 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -120,6 +120,7 @@ type ManagerOptions struct { GroupLoader GroupLoader MaxConcurrentEvals int64 ConcurrentEvalsEnabled bool + RuleDependencyController RuleDependencyController RuleConcurrencyController RuleConcurrencyController DefaultEvaluationDelay func() time.Duration @@ -146,6 +147,10 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } + if o.RuleDependencyController == nil { + o.RuleDependencyController = NewRuleDependencyController() + } + if o.RuleConcurrencyController == nil { o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) } @@ -196,8 +201,8 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() - if m.opts.RuleConcurrencyController != nil { - m.opts.RuleConcurrencyController.Invalidate() + if m.opts.RuleDependencyController != nil { + m.opts.RuleDependencyController.Invalidate() } groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) 
@@ -436,52 +441,70 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } +// TODO doc +type RuleDependencyController interface { + // TODO doc + IsRuleIndependent(g *Group, r Rule) bool + + // TODO doc + Invalidate() +} + +// TODO unit test +type ruleDependencyController struct { + depMapsMu sync.Mutex + depMaps map[*Group]dependencyMap +} + +func NewRuleDependencyController() RuleDependencyController { + return &ruleDependencyController{ + depMaps: map[*Group]dependencyMap{}, + } +} + +func (c *ruleDependencyController) IsRuleIndependent(g *Group, r Rule) bool { + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() + + depMap, found := c.depMaps[g] + if !found { + depMap = buildDependencyMap(g.rules) + c.depMaps[g] = depMap + } + + return depMap.isIndependent(r) +} + +func (c *ruleDependencyController) Invalidate() { + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() + + // Clear out the memoized dependency maps because some or all groups may have been updated. + c.depMaps = map[*Group]dependencyMap{} +} + // RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount // of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load. // Concurrency is controlled globally, not on a per-group basis. type RuleConcurrencyController interface { - // RuleEligible determines if a rule can be run concurrently. - RuleEligible(g *Group, r Rule) bool - // Allow determines whether any concurrent evaluation slots are available. Allow() bool // Done releases a concurrent evaluation slot. Done() - - // Invalidate instructs the controller to invalidate its state. - // This should be called when groups are modified (during a reload, for instance), because the controller may - // store some state about each group in order to more efficiently determine rule eligibility. 
- Invalidate() } func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController { return &concurrentRuleEvalController{ enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency), - depMaps: map[*Group]dependencyMap{}, } } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { - mu sync.Mutex enabled bool sema *semaphore.Weighted - depMaps map[*Group]dependencyMap -} - -func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { - c.mu.Lock() - defer c.mu.Unlock() - - depMap, found := c.depMaps[g] - if !found { - depMap = buildDependencyMap(g.rules) - c.depMaps[g] = depMap - } - - return depMap.isIndependent(r) } func (c *concurrentRuleEvalController) Allow() bool { @@ -499,11 +522,3 @@ func (c *concurrentRuleEvalController) Done() { c.sema.Release(1) } - -func (c *concurrentRuleEvalController) Invalidate() { - c.mu.Lock() - defer c.mu.Unlock() - - // Clear out the memoized dependency maps because some or all groups may have been updated. - c.depMaps = map[*Group]dependencyMap{} -} diff --git a/rules/manager_test.go b/rules/manager_test.go index ac0b4007de..6786ff8921 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -161,7 +161,7 @@ func TestAlertingRule(t *testing.T) { evalTime := baseTime.Add(test.time) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. 
@@ -309,7 +309,7 @@ func TestForStateAddSamples(t *testing.T) { forState = float64(value.StaleNaN) } - res, err := rule.Eval(context.TODO(), evalDelay, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalDelay, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS' samples. @@ -371,6 +371,8 @@ func TestForStateRestore(t *testing.T) { NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {}, OutageTolerance: 30 * time.Minute, ForGracePeriod: 10 * time.Minute, + + RuleDependencyController: NewRuleDependencyController(), } alertForDuration := 25 * time.Minute @@ -543,6 +545,8 @@ func TestStaleness(t *testing.T) { Queryable: st, Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), } expr, err := parser.ParseExpr("a + 1") @@ -735,6 +739,8 @@ func TestUpdate(t *testing.T) { QueryFunc: EngineQueryFunc(engine, st), Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), }) ruleManager.start() defer ruleManager.Stop() @@ -812,11 +818,12 @@ func TestUpdate_AlwaysRestore(t *testing.T) { defer st.Close() ruleManager := NewManager(&ManagerOptions{ - Appendable: st, - Queryable: st, - Context: context.Background(), - Logger: log.NewNopLogger(), - AlwaysRestoreAlertState: true, + Appendable: st, + Queryable: st, + Context: context.Background(), + Logger: log.NewNopLogger(), + AlwaysRestoreAlertState: true, + RuleDependencyController: NewRuleDependencyController(), }) ruleManager.start() defer ruleManager.Stop() @@ -844,11 +851,12 @@ func TestUpdate_AlwaysRestoreDoesntAffectUnchangedGroups(t *testing.T) { defer st.Close() ruleManager := NewManager(&ManagerOptions{ - Appendable: st, - Queryable: st, - Context: context.Background(), - Logger: log.NewNopLogger(), - AlwaysRestoreAlertState: true, + 
Appendable: st, + Queryable: st, + Context: context.Background(), + Logger: log.NewNopLogger(), + AlwaysRestoreAlertState: true, + RuleDependencyController: NewRuleDependencyController(), }) ruleManager.start() defer ruleManager.Stop() @@ -882,11 +890,12 @@ func TestUpdateSetsSourceTenants(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: st, - Queryable: st, - QueryFunc: EngineQueryFunc(engine, st), - Context: context.Background(), - Logger: log.NewNopLogger(), + Appendable: st, + Queryable: st, + QueryFunc: EngineQueryFunc(engine, st), + Context: context.Background(), + Logger: log.NewNopLogger(), + RuleDependencyController: NewRuleDependencyController(), }) ruleManager.start() defer ruleManager.Stop() @@ -929,6 +938,8 @@ func TestAlignEvaluationTimeOnInterval(t *testing.T) { QueryFunc: EngineQueryFunc(engine, st), Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), }) ruleManager.start() defer ruleManager.Stop() @@ -1002,6 +1013,7 @@ func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) { Context: context.Background(), Logger: log.NewNopLogger(), GroupEvaluationContextFunc: mockContextWrapFunc, + RuleDependencyController: NewRuleDependencyController(), }) rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml") @@ -1116,6 +1128,8 @@ func TestNotify(t *testing.T) { Logger: log.NewNopLogger(), NotifyFunc: notifyFunc, ResendDelay: 2 * time.Second, + + RuleDependencyController: NewRuleDependencyController(), } expr, err := parser.ParseExpr("a > 1") @@ -1186,6 +1200,8 @@ func TestMetricsUpdate(t *testing.T) { Context: context.Background(), Logger: log.NewNopLogger(), Registerer: registry, + + RuleDependencyController: NewRuleDependencyController(), }) ruleManager.start() defer ruleManager.Stop() @@ -1259,6 +1275,8 @@ func TestGroupStalenessOnRemoval(t *testing.T) { QueryFunc: EngineQueryFunc(engine, storage), 
Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), }) var stopped bool ruleManager.start() @@ -1336,6 +1354,8 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { QueryFunc: EngineQueryFunc(engine, storage), Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), }) var stopped bool ruleManager.start() @@ -1438,6 +1458,8 @@ func TestRuleHealthUpdates(t *testing.T) { Queryable: st, Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), } expr, err := parser.ParseExpr("a + 1") @@ -1753,6 +1775,8 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {}, OutageTolerance: 30 * time.Minute, ForGracePeriod: 10 * time.Minute, + + RuleDependencyController: NewRuleDependencyController(), } activeAlert := &Alert{ @@ -1833,6 +1857,8 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { Queryable: storage, Context: context.Background(), Logger: log.NewNopLogger(), + + RuleDependencyController: NewRuleDependencyController(), } expr, err := parser.ParseExpr("sum(histogram_metric)") @@ -2169,6 +2195,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, + RuleDependencyController: NewRuleDependencyController(), } } @@ -2372,6 +2399,7 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, + RuleDependencyController: NewRuleDependencyController(), }) groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) 
diff --git a/rules/origin.go b/rules/origin.go index 996538767d..9c1a125368 100644 --- a/rules/origin.go +++ b/rules/origin.go @@ -28,6 +28,10 @@ type RuleDetail struct { Query string Labels labels.Labels Kind string + + // Independent holds whether this rule depends on the result of other rules + // within the same rule group or not. + Independent bool } const ( @@ -36,7 +40,7 @@ const ( ) // NewRuleDetail creates a RuleDetail from a given Rule. -func NewRuleDetail(r Rule) RuleDetail { +func NewRuleDetail(r Rule, independent bool) RuleDetail { var kind string switch r.(type) { case *AlertingRule: @@ -48,10 +52,11 @@ func NewRuleDetail(r Rule) RuleDetail { } return RuleDetail{ - Name: r.Name(), - Query: r.Query().String(), - Labels: r.Labels(), - Kind: kind, + Name: r.Name(), + Query: r.Query().String(), + Labels: r.Labels(), + Kind: kind, + Independent: independent, } } diff --git a/rules/origin_test.go b/rules/origin_test.go index eda5f9247c..ce21b36072 100644 --- a/rules/origin_test.go +++ b/rules/origin_test.go @@ -30,7 +30,7 @@ type unknownRule struct{} func (u unknownRule) Name() string { return "" } func (u unknownRule) Labels() labels.Labels { return labels.EmptyLabels() } -func (u unknownRule) Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) { +func (u unknownRule) Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int, bool) (promql.Vector, error) { return nil, nil } func (u unknownRule) String() string { return "" } @@ -46,6 +46,16 @@ func (u unknownRule) GetEvaluationTimestamp() time.Time { return time.Time{} func TestNewRuleDetailPanics(t *testing.T) { require.PanicsWithValue(t, `unknown rule type "rules.unknownRule"`, func() { - NewRuleDetail(unknownRule{}) + NewRuleDetail(unknownRule{}, false) + }) +} + +func TestFromOriginContext(t *testing.T) { + t.Run("should return zero value if RuleDetail is missing in the context", func(t *testing.T) { + detail := 
FromOriginContext(context.Background()) + require.Zero(t, detail) + + // The zero value for the Independent rule must be the most conservative option. + require.False(t, detail.Independent) }) } diff --git a/rules/recording.go b/rules/recording.go index 8abf8fa5f2..739fa7fc9e 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -72,8 +72,8 @@ func (rule *RecordingRule) Labels() labels.Labels { } // Eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule *RecordingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) { - ctx = NewOriginContext(ctx, NewRuleDetail(rule)) +func (rule *RecordingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int, independent bool) (promql.Vector, error) { + ctx = NewOriginContext(ctx, NewRuleDetail(rule, independent)) vector, err := query(ctx, rule.vector.String(), ts.Add(-evalDelay)) if err != nil { diff --git a/rules/recording_test.go b/rules/recording_test.go index 9841513070..98d6091302 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -15,6 +15,7 @@ package rules import ( "context" + "fmt" "testing" "time" @@ -124,7 +125,7 @@ func TestRuleEval(t *testing.T) { for _, scenario := range ruleEvalTestScenarios { t.Run(scenario.name, func(t *testing.T) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) - result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0, false) require.NoError(t, err) require.Equal(t, scenario.expected, result) }) @@ -142,7 +143,7 @@ func BenchmarkRuleEval(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + _, err 
:= rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0, false) if err != nil { require.NoError(b, err) } @@ -171,7 +172,7 @@ func TestRuleEvalDuplicate(t *testing.T) { expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`) rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test")) - _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0) + _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0, false) require.Error(t, err) require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels") } @@ -213,7 +214,7 @@ func TestRecordingRuleLimit(t *testing.T) { evalTime := time.Unix(0, 0) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit, false); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": @@ -240,12 +241,16 @@ func TestRecordingEvalWithOrigin(t *testing.T) { expr, err := parser.ParseExpr(query) require.NoError(t, err) - rule := NewRecordingRule(name, expr, lbs) - _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) { - detail = FromOriginContext(ctx) - return nil, nil - }, nil, 0) + for _, independent := range []bool{true, false} { + t.Run(fmt.Sprintf("independent = %t", independent), func(t *testing.T) { + rule := NewRecordingRule(name, expr, lbs) + _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) { + detail = FromOriginContext(ctx) + return nil, nil + }, nil, 0, independent) - require.NoError(t, err) - require.Equal(t, detail, NewRuleDetail(rule)) + require.NoError(t, err) + require.Equal(t, detail, NewRuleDetail(rule, independent)) + }) + } } diff --git 
a/rules/rule.go b/rules/rule.go index 42e882918a..336605d8a2 100644 --- a/rules/rule.go +++ b/rules/rule.go @@ -41,7 +41,7 @@ type Rule interface { Labels() labels.Labels // Eval evaluates the rule, including any associated recording or alerting actions. // The duration passed is the evaluation delay. - Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) + Eval(_ context.Context, evalDelay time.Duration, ts time.Time, _ QueryFunc, externalURL *url.URL, limit int, independent bool) (promql.Vector, error) // String returns a human-readable string representation of the rule. String() string // Query returns the rule query expression. diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index c9ab84087e..59a841f40d 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -293,6 +293,8 @@ func (m *rulesRetrieverMock) CreateRuleGroups() { Context: context.Background(), Logger: log.NewNopLogger(), NotifyFunc: func(ctx context.Context, expr string, alerts ...*rules.Alert) {}, + + RuleDependencyController: rules.NewRuleDependencyController(), } var r []rules.Rule