Skip to content

Commit

Permalink
Add conditional read-after–write support to rules evaluation (#7142)
Browse files Browse the repository at this point in the history
* Added QueryFunc and Queryable wrappers

Signed-off-by: Marco Pracucci <[email protected]>

* Inject header in RemoteQuerier

Signed-off-by: Marco Pracucci <[email protected]>

* Improve test config files

Signed-off-by: Marco Pracucci <[email protected]>

* Improvements to local dev env

Signed-off-by: Marco Pracucci <[email protected]>

* Added DefaultManagerFactory unit test

Signed-off-by: Marco Pracucci <[email protected]>

* Fix linter

Signed-off-by: Marco Pracucci <[email protected]>

* Added TestDefaultManagerFactory_ShouldInjectStrongReadConsistencyToContextWhenQueryingAlertsForStateMetric

Signed-off-by: Marco Pracucci <[email protected]>

* Improved TestQueryFrontendWithIngestStorageViaFlagsAndWithQuerySchedulerAndQueryStatsEnabled

Signed-off-by: Marco Pracucci <[email protected]>

* Added TestRulerRemoteEvaluation_ShouldEnforceStrongReadConsistencyForDependentRulesWhenUsingTheIngestStorage

Signed-off-by: Marco Pracucci <[email protected]>

* Swap mimir-prometheus implementation to a simpler one

Signed-off-by: Marco Pracucci <[email protected]>

* Re-vendored mimir-prometheus

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed TestDefaultManagerFactory_ShouldInjectReadConsistencyToContextBasedOnRuleDetail

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Jan 22, 2024
1 parent 9262e96 commit 094b110
Show file tree
Hide file tree
Showing 19 changed files with 1,007 additions and 119 deletions.
11 changes: 10 additions & 1 deletion development/mimir-ingest-storage/config/example-rules.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
# Example rules file to load to Mimir via the ruler API.
groups:
- name: alerts
# Frequently evaluate rules.
interval: 10s
rules:
# The following recording rule is independent.
- record: cortex_build_info:sum
expr: sum(cortex_build_info)
# The following recording rule is used by the AlwaysFiring alert.
- record: up:count
expr: count(up)
- alert: AlwaysFiring
expr: count(up) >= 0
expr: up:count >= 0
for: 10s
labels:
severity: page
annotations:
Expand Down
5 changes: 5 additions & 0 deletions development/mimir-ingest-storage/config/mimir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ ruler:
rule_path: /data/ruler
# Each ruler is configured to route alerts to the Alertmanager running within the same component.
alertmanager_url: http://localhost:8080/alertmanager
# Force the ruler to restore the state of any alert with a "for" period longer than 1s.
for_grace_period: 1s
# Evaluate rules via query-frontend (remote rule evaluation).
query_frontend:
address: dns:///mimir-read-1:9095

ruler_storage:
s3:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240122165117-baa5d82b5bc8

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft
github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801 h1:65eoE+Cwgi8PS+TBmdBn3xtS/JFeuTImzQI4GNDrhTQ=
github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801/go.mod h1:W4s/zaz2ypTeyg7h7HDJ4/g0+p5tXBWJ6ToK3g0a5zs=
github.com/grafana/mimir-prometheus v0.0.0-20240122165117-baa5d82b5bc8 h1:nicadoSO2KafJRExlss8+PZkgH5OaCAujWSUV7EIB7E=
github.com/grafana/mimir-prometheus v0.0.0-20240122165117-baa5d82b5bc8/go.mod h1:W4s/zaz2ypTeyg7h7HDJ4/g0+p5tXBWJ6ToK3g0a5zs=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
Expand Down
25 changes: 18 additions & 7 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,18 +394,21 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {

wg.Wait()

extra := float64(2)
// Compute the expected number of queries.
expectedQueriesCount := float64(numUsers*numQueriesPerUser) + 2
expectedIngesterQueriesCount := float64(numUsers * numQueriesPerUser) // The "time()" query and the query with time range < "query ingesters within" are not pushed down to ingesters.
if cfg.queryStatsEnabled {
extra++
expectedQueriesCount++
expectedIngesterQueriesCount++
}

require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(expectedQueriesCount), "cortex_query_frontend_queries_total"))

// The number of received request is greater then the query requests because include
// The number of received requests may be greater than the query requests because include
// requests to /metrics and /ready.
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(expectedQueriesCount), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(expectedQueriesCount), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(expectedQueriesCount), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))

// Ensure query stats metrics are tracked only when enabled.
if cfg.queryStatsEnabled {
Expand All @@ -417,6 +420,14 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
}

// When the ingest storage is used, we expect that each query issued by this test was processed
// with strong read consistency by the ingester.
if flags["-ingest-storage.enabled"] == "true" {
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(expectedIngesterQueriesCount), "cortex_ingest_storage_strong_consistency_requests_total"))
} else {
require.NoError(t, ingester.WaitRemovedMetric("cortex_ingest_storage_strong_consistency_requests_total"))
}

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester)
Expand Down
193 changes: 153 additions & 40 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (

"github.com/grafana/mimir/integration/ca"
"github.com/grafana/mimir/integration/e2emimir"
"github.com/grafana/mimir/pkg/querier/api"
)

func TestRulerAPI(t *testing.T) {
var (
namespaceOne = "test_/encoded_+namespace/?"
namespaceTwo = "test_/encoded_+namespace/?/two"
ruleGroup = createTestRuleGroup(t)
ruleGroup = createTestRuleGroup()
)

s, err := e2e.NewScenario(networkName)
Expand Down Expand Up @@ -404,7 +405,7 @@ func TestRulerSharding(t *testing.T) {

func TestRulerAlertmanager(t *testing.T) {
var namespaceOne = "test_/encoded_+namespace/?"
ruleGroup := createTestRuleGroup(t)
ruleGroup := createTestRuleGroup()

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -458,7 +459,7 @@ func TestRulerAlertmanager(t *testing.T) {

func TestRulerAlertmanagerTLS(t *testing.T) {
var namespaceOne = "test_/encoded_+namespace/?"
ruleGroup := createTestRuleGroup(t)
ruleGroup := createTestRuleGroup()

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -1094,6 +1095,128 @@ func TestRulerRemoteEvaluation(t *testing.T) {
}
}

func TestRulerRemoteEvaluation_ShouldEnforceStrongReadConsistencyForDependentRulesWhenUsingTheIngestStorage(t *testing.T) {
const (
ruleGroupNamespace = "test"
ruleGroupName = "test"
)

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
t.Cleanup(s.Close)

flags := mergeFlags(
CommonStorageBackendFlags(),
RulerFlags(),
BlocksStorageFlags(),
IngestStorageFlags(),
map[string]string{
"-ingester.ring.replication-factor": "1",

// No strong read consistency by default for this test. We want the ruler to enforce the strong
// consistency when required.
"-ingest-storage.read-consistency": api.ReadConsistencyEventual,
},
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, mimirBucketName)
kafka := e2edb.NewKafka()
require.NoError(t, s.StartAndWaitReady(minio, consul, kafka))

// Start the query-frontend.
queryFrontend := e2emimir.NewQueryFrontend("query-frontend", flags)
require.NoError(t, s.Start(queryFrontend))
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()

// Use query-frontend for rule evaluation.
flags["-ruler.query-frontend.address"] = fmt.Sprintf("dns:///%s", queryFrontend.NetworkGRPCEndpoint())

// Start up services
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester-0", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)

require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
require.NoError(t, s.WaitReady(queryFrontend))

// Wait until the distributor is ready.
// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push a test series.
now := time.Now()
series, _, _ := generateFloatSeries("series_1", now)

res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

t.Run("evaluation of independent rules should not require strong consistency", func(t *testing.T) {
ruler := e2emimir.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(ruler))
t.Cleanup(func() {
require.NoError(t, s.Stop(ruler))
})

rulerClient, err := e2emimir.NewClient("", "", "", ruler.HTTPEndpoint(), userID)
require.NoError(t, err)

// Create a rule group containing 2 independent rules.
group := ruleGroupWithRules(ruleGroupName, time.Second,
recordingRule("series_1:count", "count(series_1)"),
recordingRule("series_1:sum", "sum(series_1)"),
)
require.NoError(t, rulerClient.SetRuleGroup(group, ruleGroupNamespace))

// Cleanup the ruler config when the test will end, so that it doesn't interfere with other test cases.
t.Cleanup(func() {
require.NoError(t, rulerClient.DeleteRuleNamespace(ruleGroupNamespace))
})

// Wait until the rules are evaluated.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(len(group.Rules))), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics))

// The rules have been evaluated at least once. We expect the rule queries
// have run with eventual consistency because they are independent.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingest_storage_strong_consistency_requests_total"))
})

t.Run("evaluation of dependent rules should require strong consistency", func(t *testing.T) {
ruler := e2emimir.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(ruler))
t.Cleanup(func() {
require.NoError(t, s.Stop(ruler))
})

rulerClient, err := e2emimir.NewClient("", "", "", ruler.HTTPEndpoint(), userID)
require.NoError(t, err)

// Create a rule group containing 2 rules: the 2nd one depends on the 1st one.
group := ruleGroupWithRules(ruleGroupName, time.Second,
recordingRule("series_1:count", "count(series_1)"),
recordingRule("series_1:count:sum", "sum(series_1:count)"),
)
require.NoError(t, rulerClient.SetRuleGroup(group, ruleGroupNamespace))

// Cleanup the ruler config when the test will end, so that it doesn't interfere with other test cases.
t.Cleanup(func() {
require.NoError(t, rulerClient.DeleteRuleNamespace(ruleGroupNamespace))
})

// Wait until the rules are evaluated.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(len(group.Rules))), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics))

// The rules have been evaluated at least once. We expect the 2nd rule query
// has run with strong consistency because it depends on the 1st one.
require.NoError(t, ingester.WaitSumMetrics(e2e.GreaterOrEqual(1), "cortex_ingest_storage_strong_consistency_requests_total"))
})
}

func TestRuler_RestoreWithLongForPeriod(t *testing.T) {
const (
forGracePeriod = 5 * time.Second
Expand Down Expand Up @@ -1304,58 +1427,48 @@ func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
}

func ruleGroupWithRecordingRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}
return ruleGroupWithRules(groupName, 10, recordingRule(ruleName, expression))
}

recordNode.SetString(ruleName)
exprNode.SetString(expression)
func ruleGroupWithAlertingRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
return ruleGroupWithRules(groupName, 10, alertingRule(ruleName, expression))
}

func ruleGroupWithRules(groupName string, interval time.Duration, rules ...rulefmt.RuleNode) rulefmt.RuleGroup {
return rulefmt.RuleGroup{
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
Record: recordNode,
Expr: exprNode,
}},
Interval: model.Duration(interval),
Rules: rules,
}
}

func ruleGroupWithAlertingRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
func createTestRuleGroup() rulefmt.RuleGroup {
return ruleGroupWithRules("test_encoded_+\"+group_name/?", 100, recordingRule("test_rule", "up"))
}

func recordingRule(record, expr string) rulefmt.RuleNode {
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

recordNode.SetString(ruleName)
exprNode.SetString(expression)
recordNode.SetString(record)
exprNode.SetString(expr)

return rulefmt.RuleGroup{
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
Alert: recordNode,
Expr: exprNode,
For: 30,
}},
return rulefmt.RuleNode{
Record: recordNode,
Expr: exprNode,
}
}

func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
t.Helper()
func alertingRule(alert, expr string) rulefmt.RuleNode {
var alertNode = yaml.Node{}
var exprNode = yaml.Node{}

var (
recordNode = yaml.Node{}
exprNode = yaml.Node{}
)
alertNode.SetString(alert)
exprNode.SetString(expr)

recordNode.SetString("test_rule")
exprNode.SetString("up")
return rulefmt.RuleGroup{
Name: "test_encoded_+\"+group_name/?",
Interval: 100,
Rules: []rulefmt.RuleNode{
{
Record: recordNode,
Expr: exprNode,
},
},
return rulefmt.RuleNode{
Alert: alertNode,
Expr: exprNode,
For: 30,
}
}
12 changes: 8 additions & 4 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.
func DefaultTenantManagerFactory(
cfg Config,
p Pusher,
embeddedQueryable storage.Queryable,
queryable storage.Queryable,
queryFunc rules.QueryFunc,
overrides RulesLimits,
reg prometheus.Registerer,
Expand Down Expand Up @@ -304,14 +304,18 @@ func DefaultTenantManagerFactory(
queryTime = rulerQuerySeconds.WithLabelValues(userID)
zeroFetchedSeriesCount = zeroFetchedSeriesQueries.WithLabelValues(userID)
}
var wrappedQueryFunc rules.QueryFunc

wrappedQueryFunc = MetricsQueryFunc(queryFunc, totalQueries, failedQueries)
// Wrap the query function with our custom logic.
wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc)
wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, totalQueries, failedQueries)
wrappedQueryFunc = RecordAndReportRuleQueryMetrics(wrappedQueryFunc, queryTime, zeroFetchedSeriesCount, logger)

// Wrap the queryable with our custom logic.
wrappedQueryable := WrapQueryableWithReadConsistency(queryable)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
Queryable: embeddedQueryable,
Queryable: wrappedQueryable,
QueryFunc: wrappedQueryFunc,
Context: user.InjectOrgID(ctx, userID),
GroupEvaluationContextFunc: FederatedGroupContextFunc,
Expand Down
Loading

0 comments on commit 094b110

Please sign in to comment.