
Add conditional read after write for rules #584

Draft. Wants to merge 13 commits into base: main
32 changes: 21 additions & 11 deletions cmd/prometheus/main.go
@@ -136,6 +136,7 @@ type flagConfig struct {
forGracePeriod model.Duration
outageTolerance model.Duration
resendDelay model.Duration
maxConcurrentEvals int64
web web.Options
scrape scrape.Options
tsdb tsdbOptions
@@ -154,6 +155,7 @@ type flagConfig struct {
enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
enableConcurrentRuleEval bool

prometheusURL string
corsRegexString string
@@ -197,6 +199,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "auto-gomaxprocs":
c.enableAutoGOMAXPROCS = true
level.Info(logger).Log("msg", "Automatically set GOMAXPROCS to match Linux container CPU quota")
case "concurrent-rule-eval":
c.enableConcurrentRuleEval = true
level.Info(logger).Log("msg", "Experimental concurrent rule evaluation enabled.")
case "no-default-scrape-port":
c.scrape.NoDefaultPort = true
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
@@ -402,6 +407,9 @@ func main() {
serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").SetValue(&cfg.resendDelay)

serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules which can run concurrently.").
Default("4").Int64Var(&cfg.maxConcurrentEvals)

a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)

@@ -714,17 +722,19 @@ func main() {
queryEngine = promql.NewEngine(opts)

ruleManager = rules.NewManager(&rules.ManagerOptions{
-	Appendable:      fanoutStorage,
-	Queryable:       localStorage,
-	QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
-	NotifyFunc:      rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
-	Context:         ctxRule,
-	ExternalURL:     cfg.web.ExternalURL,
-	Registerer:      prometheus.DefaultRegisterer,
-	Logger:          log.With(logger, "component", "rule manager"),
-	OutageTolerance: time.Duration(cfg.outageTolerance),
-	ForGracePeriod:  time.Duration(cfg.forGracePeriod),
-	ResendDelay:     time.Duration(cfg.resendDelay),
+	Appendable:             fanoutStorage,
+	Queryable:              localStorage,
+	QueryFunc:              rules.EngineQueryFunc(queryEngine, fanoutStorage),
+	NotifyFunc:             rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
+	Context:                ctxRule,
+	ExternalURL:            cfg.web.ExternalURL,
+	Registerer:             prometheus.DefaultRegisterer,
+	Logger:                 log.With(logger, "component", "rule manager"),
+	OutageTolerance:        time.Duration(cfg.outageTolerance),
+	ForGracePeriod:         time.Duration(cfg.forGracePeriod),
+	ResendDelay:            time.Duration(cfg.resendDelay),
+	MaxConcurrentEvals:     cfg.maxConcurrentEvals,
+	ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
})
}

1 change: 1 addition & 0 deletions docs/command-line/prometheus.md
@@ -47,6 +47,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
| <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
| <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` |
| <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
10 changes: 10 additions & 0 deletions docs/feature_flags.md
@@ -204,3 +204,13 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val
Prometheus currently supports created timestamps only on the traditional Prometheus protobuf protocol (support for other protocols is a work in progress). As a result, when this feature is enabled, the Prometheus protobuf scrape protocol is prioritized (see the `scrape_config.scrape_protocols` setting for more details).

Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped.

## Concurrent evaluation of independent rules

`--enable-feature=concurrent-rule-eval`

Rule groups execute concurrently, but the rules within a group execute sequentially, because a rule can use the
output of a preceding rule as its input. However, when there is no detectable dependency between two rules, there is
no reason to run them sequentially. Evaluating independent rules concurrently can improve rule reliability at the
expense of additional concurrent query load. The global limit on concurrent rule evaluations is configured with
`--rules.max-concurrent-evals`, which defaults to `4`.
4 changes: 2 additions & 2 deletions rules/alerting.go
@@ -323,8 +323,8 @@ const resolvedRetention = 15 * time.Minute

// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
-func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
-	ctx = NewOriginContext(ctx, NewRuleDetail(r))
+func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int, independent bool) (promql.Vector, error) {
+	ctx = NewOriginContext(ctx, NewRuleDetail(r, independent))

res, err := query(ctx, r.vector.String(), ts.Add(-evalDelay))
if err != nil {
45 changes: 25 additions & 20 deletions rules/alerting_test.go
@@ -16,6 +16,7 @@ package rules
import (
"context"
"errors"
"fmt"
"testing"
"time"

@@ -166,7 +167,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
-res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -183,7 +184,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
require.Equal(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
-res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -251,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval(
-context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -265,7 +266,7 @@ }
}

res, err = ruleWithExternalLabels.Eval(
-context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -344,7 +345,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval(
-context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -358,7 +359,7 @@ }
}

res, err = ruleWithExternalURL.Eval(
-context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -413,7 +414,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval(
-context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -480,7 +481,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
close(getDoneCh)
}()
_, err = ruleWithQueryInTemplate.Eval(
-context.TODO(), 0, evalTime, slowQueryFunc, nil, 0,
+context.TODO(), 0, evalTime, slowQueryFunc, nil, 0, false,
)
require.NoError(t, err)
}
@@ -532,7 +533,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
"",
true, log.NewNopLogger(),
)
-_, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
+_, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0, false)
require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
}
@@ -580,7 +581,7 @@ func TestAlertingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0)

for _, test := range tests {
-switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
+switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit, false); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@@ -809,7 +810,7 @@ func TestKeepFiringFor(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
-res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -826,7 +827,7 @@ func TestKeepFiringFor(t *testing.T) {
require.Equal(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
-res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -863,7 +864,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {

baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime)
-res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
+res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)

require.Len(t, res, 2)
@@ -878,7 +879,7 @@ }
}

evalTime := baseTime.Add(time.Minute)
-res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -912,11 +913,15 @@ func TestAlertingEvalWithOrigin(t *testing.T) {
true, log.NewNopLogger(),
)

-_, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
-	detail = FromOriginContext(ctx)
-	return nil, nil
-}, nil, 0)
+for _, independent := range []bool{true, false} {
+	t.Run(fmt.Sprintf("independent = %t", independent), func(t *testing.T) {
+		_, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
+			detail = FromOriginContext(ctx)
+			return nil, nil
+		}, nil, 0, independent)

-require.NoError(t, err)
-require.Equal(t, detail, NewRuleDetail(rule))
+		require.NoError(t, err)
+		require.Equal(t, detail, NewRuleDetail(rule, independent))
+	})
+}
}
7 changes: 7 additions & 0 deletions rules/fixtures/rules_dependencies.yaml
@@ -0,0 +1,7 @@
groups:
- name: test
rules:
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: HighRequestRate
expr: job:http_requests:rate5m > 100
14 changes: 14 additions & 0 deletions rules/fixtures/rules_multiple.yaml
@@ -0,0 +1,14 @@
groups:
- name: test
rules:
# independents
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))

# dependents
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: TooManyRequests
expr: job:http_requests:rate15m > 100
28 changes: 28 additions & 0 deletions rules/fixtures/rules_multiple_groups.yaml
@@ -0,0 +1,28 @@
groups:
- name: http
rules:
# independents
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))

# dependents
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: TooManyHTTPRequests
expr: job:http_requests:rate15m > 100

- name: grpc
rules:
# independents
- record: job:grpc_requests:rate1m
expr: sum by (job)(rate(grpc_requests_total[1m]))
- record: job:grpc_requests:rate5m
expr: sum by (job)(rate(grpc_requests_total[5m]))

# dependents
- record: job:grpc_requests:rate15m
expr: sum by (job)(rate(grpc_requests_total[15m]))
- record: TooManyGRPCRequests
expr: job:grpc_requests:rate15m > 100
15 changes: 15 additions & 0 deletions rules/fixtures/rules_multiple_independent.yaml
@@ -0,0 +1,15 @@
groups:
- name: independents
rules:
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m]))
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m]))
- record: job:http_requests:rate15m
expr: sum by (job)(rate(http_requests_total[15m]))
- record: job:http_requests:rate30m
expr: sum by (job)(rate(http_requests_total[30m]))
- record: job:http_requests:rate1h
expr: sum by (job)(rate(http_requests_total[1h]))
- record: job:http_requests:rate2h
expr: sum by (job)(rate(http_requests_total[2h]))