r321: Backport ruler fixes #10267

Closed
wants to merge 2 commits

6 changes: 3 additions & 3 deletions go.mod
@@ -36,7 +36,7 @@ require (
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.1
github.com/prometheus/common v0.61.0
github.com/prometheus/prometheus v1.99.0
github.com/segmentio/fasthash v1.0.3
github.com/sirupsen/logrus v1.9.3
@@ -49,7 +49,7 @@ require (
golang.org/x/net v0.32.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.8.0
google.golang.org/grpc v1.67.1
google.golang.org/grpc v1.68.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -285,7 +285,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241210170917-0a0a41616520
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241216154007-6ce3249dcbb8

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
16 changes: 8 additions & 8 deletions go.sum
@@ -931,8 +931,8 @@ github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20 h1:N+3sFI5GUjRKBi+i0TxYVST9h4Ie192jJWpHvthBBgg=
github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8ETbOasdwEV+avkR75ZzsVV9WI=
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 h1:FpZSn61BWXbtyH68+uSv416veEswX1M2HRyQfdHnOyQ=
github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
@@ -1279,8 +1279,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20241210170917-0a0a41616520 h1:FADazl5oVYBARbfVMtLkPQ9IfIwhiE9lrPrKNPOHBV4=
github.com/grafana/mimir-prometheus v0.0.0-20241210170917-0a0a41616520/go.mod h1:NpYc1U0eC7m6xUh3t3Pq565KxaIc08Oaquiu71dEMi8=
github.com/grafana/mimir-prometheus v0.0.0-20241216154007-6ce3249dcbb8 h1:wTPjkFjHVU0weI66T4qnVkS6241zgIZuLu5Nasg0wAY=
github.com/grafana/mimir-prometheus v0.0.0-20241216154007-6ce3249dcbb8/go.mod h1:a5LEa2Vy87wOp0Vu6sLmEIR1V59fqH3QosOSiErAr30=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw=
@@ -1390,8 +1390,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ionos-cloud/sdk-go/v6 v6.2.1 h1:mxxN+frNVmbFrmmFfXnBC3g2USYJrl6mc1LW2iNYbFY=
github.com/ionos-cloud/sdk-go/v6 v6.2.1/go.mod h1:SXrO9OGyWjd2rZhAhEpdYN6VUAODzzqRdqA9BCviQtI=
github.com/ionos-cloud/sdk-go/v6 v6.3.0 h1:/lTieTH9Mo/CWm3cTlFLnK10jgxjUGkAqRffGqvPteY=
github.com/ionos-cloud/sdk-go/v6 v6.3.0/go.mod h1:SXrO9OGyWjd2rZhAhEpdYN6VUAODzzqRdqA9BCviQtI=
github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
@@ -1611,8 +1611,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPAaSc=
github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw=
github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ=
github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
github.com/prometheus/exporter-toolkit v0.13.1 h1:Evsh0gWQo2bdOHlnz9+0Nm7/OFfIwhE2Ws4A2jIlR04=
6 changes: 6 additions & 0 deletions pkg/frontend/querymiddleware/request_validation.go
@@ -7,8 +7,14 @@ import (
"net/http"

"github.com/grafana/dskit/cancellation"
"github.com/prometheus/common/model"
)

func init() {
// Mimir doesn't support Prometheus' UTF-8 metric/label name scheme yet.
model.NameValidationScheme = model.LegacyValidation
}

const requestValidationFailedFmt = "request validation failed for "

var errMetricsQueryRequestValidationFailed = cancellation.NewErrorf(
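The new init block pins Mimir's query path to Prometheus' classic name validation, since newer prometheus/common releases default to the UTF-8 naming scheme introduced for Prometheus 3.0. The following sketch is not part of this PR; it only illustrates the effect of flipping the global scheme, assuming the model package from the bumped dependency.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// With the UTF-8 scheme, almost any non-empty UTF-8 string is accepted
	// as a metric or label name.
	model.NameValidationScheme = model.UTF8Validation
	fmt.Println(model.LabelName("my.label").IsValid()) // true

	// With the legacy scheme (what the init block above selects), label names
	// must match [a-zA-Z_][a-zA-Z0-9_]*, so the dot is rejected.
	model.NameValidationScheme = model.LegacyValidation
	fmt.Println(model.LabelName("my.label").IsValid()) // false
}
```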
2 changes: 1 addition & 1 deletion pkg/querier/distributor_queryable_streaming_test.go
@@ -128,7 +128,7 @@ func TestStreamingChunkSeries_CreateIteratorTwice(t *testing.T) {

iterator = series.Iterator(iterator)
require.NotNil(t, iterator)
require.EqualError(t, iterator.Err(), `can't create iterator multiple times for the one streaming series ({the-name="the-value"})`)
require.EqualError(t, iterator.Err(), `can't create iterator multiple times for the one streaming series ({"the-name"="the-value"})`)
}

func createTestChunk(t *testing.T, time int64, value float64) client.Chunk {
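The only change in this test is the expected error text: the newer vendored Prometheus quotes label names that are not valid legacy identifiers when rendering a label set, and "the-name" contains a dash. A minimal sketch of that rendering follows; it is an illustration rather than Mimir code, and the exact output depends on the vendored Prometheus revision.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// "the-name" is not a legacy-valid identifier (it contains a dash), so the
	// updated Labels.String() is expected to quote it, producing
	// {"the-name"="the-value"} instead of {the-name="the-value"}.
	ls := labels.FromStrings("the-name", "the-value")
	fmt.Println(ls.String())
}
```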
9 changes: 9 additions & 0 deletions pkg/ruler/manager_metrics.go
@@ -25,6 +25,7 @@ type ManagerMetrics struct {
GroupInterval *prometheus.Desc
GroupLastEvalTime *prometheus.Desc
GroupLastDuration *prometheus.Desc
GroupLastRuleDurationSum *prometheus.Desc
GroupLastRestoreDuration *prometheus.Desc
GroupRules *prometheus.Desc
GroupLastEvalSamples *prometheus.Desc
@@ -89,6 +90,12 @@ func NewManagerMetrics(logger log.Logger) *ManagerMetrics {
[]string{"user", "rule_group"},
nil,
),
GroupLastRuleDurationSum: prometheus.NewDesc(
"cortex_prometheus_rule_group_last_rule_duration_sum_seconds",
"The sum of time in seconds it took to evaluate each rule in the group regardless of concurrency. This should be higher than the group duration if rules are evaluated concurrently.",
[]string{"user", "rule_group"},
nil,
),
GroupLastRestoreDuration: prometheus.NewDesc(
"cortex_prometheus_rule_group_last_restore_duration_seconds",
"The duration of the last alert rules alerts restoration using the `ALERTS_FOR_STATE` series across all rule groups.",
@@ -131,6 +138,7 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) {
out <- m.GroupInterval
out <- m.GroupLastEvalTime
out <- m.GroupLastDuration
out <- m.GroupLastRuleDurationSum
out <- m.GroupLastRestoreDuration
out <- m.GroupRules
out <- m.GroupLastEvalSamples
@@ -156,6 +164,7 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfGaugesPerTenant(out, m.GroupInterval, "prometheus_rule_group_interval_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalTime, "prometheus_rule_group_last_evaluation_timestamp_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastDuration, "prometheus_rule_group_last_duration_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastRuleDurationSum, "prometheus_rule_group_last_rule_duration_sum_seconds", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupRules, "prometheus_rule_group_rules", dskit_metrics.WithLabels("rule_group"))
data.SendSumOfGaugesPerTenant(out, m.GroupLastEvalSamples, "prometheus_rule_group_last_evaluation_samples", dskit_metrics.WithLabels("rule_group"))
}
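The new GroupLastRuleDurationSum descriptor follows the same pattern as the other per-group gauges: declared with prometheus.NewDesc, announced in Describe, and filled in Collect by summing the underlying prometheus_rule_group_last_rule_duration_sum_seconds gauge per tenant and rule group. Below is a stripped-down, illustrative sketch of that pattern, with made-up label values; it is not the Mimir implementation.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// Illustrative stand-in for the new descriptor, using the same name, help text
// and variable labels as the one added in manager_metrics.go.
var groupLastRuleDurationSum = prometheus.NewDesc(
	"cortex_prometheus_rule_group_last_rule_duration_sum_seconds",
	"The sum of time in seconds it took to evaluate each rule in the group regardless of concurrency.",
	[]string{"user", "rule_group"},
	nil,
)

func main() {
	// In Mimir this value is aggregated from the per-tenant ruler managers by
	// SendSumOfGaugesPerTenant; here a single sample is fabricated to show the
	// shape of the exported series. Label values are hypothetical.
	m := prometheus.MustNewConstMetric(
		groupLastRuleDurationSum,
		prometheus.GaugeValue,
		0.123, // sum of per-rule evaluation times, in seconds
		"tenant-1", "rules.yaml;my-group",
	)

	var out dto.Metric
	if err := m.Write(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.GetLabel(), out.GetGauge().GetValue())
}
```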
14 changes: 12 additions & 2 deletions pkg/ruler/rule_concurrency.go
@@ -190,9 +190,19 @@ func (c *TenantConcurrencyController) Allow(_ context.Context, group *rules.Grou
// isGroupAtRisk checks if the rule group's last evaluation time is within the risk threshold.
func (c *TenantConcurrencyController) isGroupAtRisk(group *rules.Group) bool {
interval := group.Interval().Seconds()
lastEvaluation := group.GetEvaluationTime().Seconds()
runtimeThreshold := interval * c.thresholdRuleConcurrency / 100

return lastEvaluation >= interval*c.thresholdRuleConcurrency/100
// If the group's last evaluation time is at or above the threshold, the group is at risk.
if group.GetEvaluationTime().Seconds() >= runtimeThreshold {
return true
}

// If the sum of the individual rule evaluation times is at or above the threshold, the group is at risk.
if group.GetRuleEvaluationTimeSum().Seconds() >= runtimeThreshold {
return true
}

return false
}

// isRuleIndependent checks if the rule is independent of other rules.
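Previously a group counted as at risk only when its last wall-clock evaluation time crossed the threshold; a group whose rules already run concurrently could stay under that threshold even though the rules together need more time than the interval allows. The updated check therefore also considers the sum of per-rule evaluation times. The standalone sketch below shows the arithmetic; the 50% threshold is a hypothetical value, and the real one comes from the ruler configuration.

```go
package main

import (
	"fmt"
	"time"
)

// isGroupAtRisk mirrors the updated check: a group is at risk if either its
// wall-clock evaluation time or the sum of its per-rule evaluation times
// reaches interval * threshold / 100.
func isGroupAtRisk(interval, groupEvalTime, ruleEvalTimeSum time.Duration, thresholdPercent float64) bool {
	runtimeThreshold := interval.Seconds() * thresholdPercent / 100

	if groupEvalTime.Seconds() >= runtimeThreshold {
		return true
	}
	return ruleEvalTimeSum.Seconds() >= runtimeThreshold
}

func main() {
	interval := time.Minute
	threshold := 50.0 // hypothetical: 50% of the interval, i.e. a 30s threshold

	// Concurrency kept the group's wall-clock time at 20s, but the rules
	// together needed 45s of work, so the group is still flagged at risk.
	fmt.Println(isGroupAtRisk(interval, 20*time.Second, 45*time.Second, threshold)) // true

	// Both measures are comfortably below 30s: not at risk.
	fmt.Println(isGroupAtRisk(interval, 10*time.Second, 12*time.Second, threshold)) // false
}
```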
113 changes: 80 additions & 33 deletions pkg/ruler/rule_concurrency_test.go
@@ -5,15 +5,18 @@ package ruler
import (
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
@@ -264,11 +267,51 @@ func TestIsRuleIndependent(t *testing.T) {
}

func TestGroupAtRisk(t *testing.T) {
exp, err := parser.ParseExpr("vector(1)")
require.NoError(t, err)
rule1 := rules.NewRecordingRule("test", exp, labels.Labels{})
rule1.SetNoDependencyRules(true)
rule1.SetNoDependentRules(true)
createAndEvalTestGroup := func(interval time.Duration, evalConcurrently bool) *rules.Group {
st := teststorage.New(t)
defer st.Close()

// Create 100 rules that all take 1ms to evaluate.
var createdRules []rules.Rule
ruleCt := 100
ruleWaitTime := 1 * time.Millisecond
for i := 0; i < ruleCt; i++ {
q, err := parser.ParseExpr("vector(1)")
require.NoError(t, err)
rule := rules.NewRecordingRule(fmt.Sprintf("test_rule%d", i), q, labels.Labels{})
rule.SetNoDependencyRules(true)
rule.SetNoDependentRules(true)
createdRules = append(createdRules, rule)
}

// Create the group and evaluate it
opts := rules.GroupOptions{
Interval: interval,
Opts: &rules.ManagerOptions{
Appendable: st,
QueryFunc: func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) {
time.Sleep(ruleWaitTime)
return promql.Vector{}, nil
},
},
Rules: createdRules,
}
if evalConcurrently {
opts.Opts.RuleConcurrencyController = &allowAllConcurrencyController{}
}
g := rules.NewGroup(opts)
rules.DefaultEvalIterationFunc(context.Background(), g, time.Now())

// Sanity check that we're actually running the rules concurrently.
// The group should take less time than the sum of all rules if we're running them concurrently, more otherwise.
if evalConcurrently {
require.Less(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime)
} else {
require.Greater(t, g.GetEvaluationTime(), time.Duration(ruleCt)*ruleWaitTime)
}

return g
}

m := newMultiTenantConcurrencyControllerMetrics(prometheus.NewPedanticRegistry())
controller := &TenantConcurrencyController{
@@ -284,44 +327,48 @@
}

tc := map[string]struct {
group *rules.Group
expected bool
groupInterval time.Duration
evalConcurrently bool
expected bool
}{
"group last evaluation greater than interval": {
group: func() *rules.Group {
g := rules.NewGroup(rules.GroupOptions{
Interval: -1 * time.Minute,
Opts: &rules.ManagerOptions{},
})
return g
}(),
expected: true,
// Total runtime: 100x1ms ~ 100ms (run sequentially), > 1ms interval -> At risk
groupInterval: 1 * time.Millisecond,
evalConcurrently: false,
expected: true,
},
"group last evaluation less than interval": {
group: func() *rules.Group {
g := rules.NewGroup(rules.GroupOptions{
Interval: 1 * time.Minute,
Opts: &rules.ManagerOptions{},
})
return g
}(),
expected: false,
// Total runtime: 100x1ms ~ 100ms (run sequentially), < 1s -> Not at risk
groupInterval: 1 * time.Second,
evalConcurrently: false,
expected: false,
},
"group last evaluation exactly at concurrency trigger threshold": {
group: func() *rules.Group {
g := rules.NewGroup(rules.GroupOptions{
Interval: 0 * time.Minute,
Opts: &rules.ManagerOptions{},
})
return g
}(),
expected: true,
"group total rule evaluation duration of last evaluation greater than threshold": {
// Total runtime: 100x1ms ~ 100ms, > 50ms -> Group isn't at risk for its runtime, but it is for the sum of all rules.
groupInterval: 50 * time.Millisecond,
evalConcurrently: true,
expected: true,
},
"group total rule evaluation duration of last evaluation less than threshold": {
// Total runtime: 100x1ms ~ 100ms, < 1s -> Not at risk
groupInterval: 1 * time.Second,
evalConcurrently: true,
expected: false,
},
}

for name, tt := range tc {
t.Run(name, func(t *testing.T) {
require.Equal(t, tt.expected, controller.isGroupAtRisk(tt.group))
group := createAndEvalTestGroup(tt.groupInterval, tt.evalConcurrently)
require.Equal(t, tt.expected, controller.isGroupAtRisk(group))
})
}
}

type allowAllConcurrencyController struct{}

func (a *allowAllConcurrencyController) Allow(_ context.Context, _ *rules.Group, _ rules.Rule) bool {
return true
}

func (a *allowAllConcurrencyController) Done(_ context.Context) {}
35 changes: 35 additions & 0 deletions pkg/streamingpromql/testdata/upstream/functions.test
@@ -265,6 +265,9 @@ clear
load 5m
testcounter_reset_middle_total 0+10x4 0+10x5
http_requests_total{job="app-server", instance="1", group="canary"} 0+80x10
testcounter_reset_middle_mix 0+10x4 0+10x5 {{schema:0 sum:1 count:1}} {{schema:1 sum:2 count:2}}
http_requests_mix{job="app-server", instance="1", group="canary"} 0+80x10 {{schema:0 sum:1 count:1}}
http_requests_histogram{job="app-server", instance="1", group="canary"} {{schema:0 sum:1 count:2}}x10

# deriv should return the same as rate in simple cases.
eval instant at 50m rate(http_requests_total{group="canary", instance="1", job="app-server"}[50m])
@@ -277,6 +280,16 @@ eval instant at 50m deriv(http_requests_total{group="canary", instance="1", job=
eval instant at 50m deriv(testcounter_reset_middle_total[100m])
{} 0.010606060606060607

# deriv should ignore histograms.
eval instant at 110m deriv(http_requests_mix{group="canary", instance="1", job="app-server"}[110m])
{group="canary", instance="1", job="app-server"} 0.26666666666666666

eval instant at 100m deriv(testcounter_reset_middle_mix[110m])
{} 0.010606060606060607

eval instant at 50m deriv(http_requests_histogram[60m])
#empty

# predict_linear should return correct result.
# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000]
# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50]
@@ -1205,13 +1218,20 @@ clear
# Don't return anything when there's something there.
load 5m
http_requests{job="api-server", instance="0", group="production"} 0+10x10
http_requests_histogram{job="api-server", instance="0", group="production"} {{schema:0 sum:1 count:1}}x11

# Unsupported by streaming engine.
# eval instant at 50m absent(http_requests)

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(http_requests))

# Unsupported by streaming engine.
# eval instant at 50m absent(http_requests_histogram)

# Unsupported by streaming engine.
# eval instant at 50m absent(sum(http_requests_histogram))

clear

# Unsupported by streaming engine.
@@ -1271,6 +1291,7 @@ load 1m
httpd_handshake_failures_total{instance="127.0.0.1",job="node"} 1+1x15
httpd_log_lines_total{instance="127.0.0.1",job="node"} 1
ssl_certificate_expiry_seconds{job="ingress"} NaN NaN NaN NaN NaN
http_requests_histogram{path="/foo",instance="127.0.0.1",job="httpd"} {{schema:0 sum:1 count:1}}x11

# Unsupported by streaming engine.
# eval instant at 5m absent_over_time(http_requests_total[5m])
@@ -1331,6 +1352,20 @@ load 1m
# eval instant at 10m absent_over_time({job="ingress"}[4m])
# {job="ingress"} 1

# Unsupported by streaming engine.
# eval instant at 10m absent_over_time(http_requests_histogram[5m])

# Unsupported by streaming engine.
# eval instant at 10m absent_over_time(rate(http_requests_histogram[5m])[5m:1m])

# Unsupported by streaming engine.
# eval instant at 20m absent_over_time(http_requests_histogram[5m])
# {} 1

# Unsupported by streaming engine.
# eval instant at 20m absent_over_time(rate(http_requests_histogram[5m])[5m:1m])
# {} 1

clear

# Testdata for present_over_time()