Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ruler): add flags for the query frontend retries #10259

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
* [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency #10189
* [ENHANCEMENT] Ruler: Add experimental flags to configure the retry behavior when forwarding queries to the query frontend: `-ruler.query-frontend.max-retries`, `-ruler.query-frontend.min-retry-backoff` and `-ruler.query-frontend.max-retry-backoff`. #10259
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Ruler: fix indeterminate rules being always run concurrently (instead of never) when `-ruler.max-independent-rule-evaluation-concurrency` is set. https://github.com/prometheus/prometheus/pull/15560 #10258
Expand Down
33 changes: 33 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -12956,6 +12956,39 @@
"fieldFlag": "ruler.query-frontend.address",
"fieldType": "string"
},
{
"kind": "field",
"name": "max_retries",
"required": false,
"desc": "Maximum number of retries for a single request.",
"fieldValue": null,
"fieldDefaultValue": 3,
"fieldFlag": "ruler.query-frontend.max-retries",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "min_retry_backoff",
"required": false,
"desc": "Minimum backoff duration for retries.",
"fieldValue": null,
"fieldDefaultValue": 100000000,
"fieldFlag": "ruler.query-frontend.min-retry-backoff",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_retry_backoff",
"required": false,
"desc": "Maximum backoff duration for retries.",
"fieldValue": null,
"fieldDefaultValue": 2000000000,
"fieldFlag": "ruler.query-frontend.max-retry-backoff",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "block",
"name": "grpc_client_config",
Expand Down
6 changes: 6 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2987,6 +2987,12 @@ Usage of ./cmd/mimir/mimir:
Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
-ruler.query-frontend.grpc-client-config.tls-server-name string
Override the expected name on the server certificate.
-ruler.query-frontend.max-retries int
[experimental] Maximum number of retries for a single request. (default 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually max attempts, right? So it includes the first try. That's my understanding from reading the dskit code. I think we should clarify the CLI flag then.

-ruler.query-frontend.max-retry-backoff duration
[experimental] Maximum backoff duration for retries. (default 2s)
-ruler.query-frontend.min-retry-backoff duration
[experimental] Minimum backoff duration for retries. (default 100ms)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.query-stats-enabled
Expand Down
12 changes: 12 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,18 @@ query_frontend:
# CLI flag: -ruler.query-frontend.address
[address: <string> | default = ""]

# (experimental) Maximum number of retries for a single request.
# CLI flag: -ruler.query-frontend.max-retries
[max_retries: <int> | default = 3]

# (experimental) Minimum backoff duration for retries.
# CLI flag: -ruler.query-frontend.min-retry-backoff
[min_retry_backoff: <duration> | default = 100ms]

# (experimental) Maximum backoff duration for retries.
# CLI flag: -ruler.query-frontend.max-retry-backoff
[max_retry_backoff: <duration> | default = 2s]

# Configures the gRPC client used to communicate between the rulers and
# query-frontends.
# The CLI flags prefix for this block configuration is:
Expand Down
8 changes: 7 additions & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/dns"
httpgrpc_server "github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/kv"
Expand Down Expand Up @@ -852,7 +853,12 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
retryConfig := backoff.Config{
MinBackoff: t.Cfg.Ruler.QueryFrontend.MinRetryBackoff,
MaxBackoff: t.Cfg.Ruler.QueryFrontend.MaxRetryBackoff,
MaxRetries: t.Cfg.Ruler.QueryFrontend.MaxRetries,
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, retryConfig, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
Expand Down
21 changes: 13 additions & 8 deletions pkg/ruler/remotequerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (

statusError = "error"

maxRequestRetries = 3

formatJSON = "json"
formatProtobuf = "protobuf"
)
Expand All @@ -63,6 +61,11 @@ type QueryFrontendConfig struct {
// Address is the address of the query-frontend to connect to.
Address string `yaml:"address"`

// Retry configuration.
MaxRetries int `yaml:"max_retries" category:"experimental"`
MinRetryBackoff time.Duration `yaml:"min_retry_backoff" category:"experimental"`
MaxRetryBackoff time.Duration `yaml:"max_retry_backoff" category:"experimental"`

// GRPCClientConfig contains gRPC specific config options.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."`

Expand All @@ -75,6 +78,9 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
"",
"GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+
"to enable client side load balancing.")
f.IntVar(&c.MaxRetries, "ruler.query-frontend.max-retries", 3, "Maximum number of retries for a single request.")
f.DurationVar(&c.MinRetryBackoff, "ruler.query-frontend.min-retry-backoff", 100*time.Millisecond, "Minimum backoff duration for retries.")
f.DurationVar(&c.MaxRetryBackoff, "ruler.query-frontend.max-retry-backoff", 2*time.Second, "Maximum backoff duration for retries.")

c.GRPCClientConfig.CustomCompressors = []string{s2.Name}
c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f)
Expand Down Expand Up @@ -115,6 +121,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// RemoteQuerier executes read operations against a httpgrpc.HTTPClient.
type RemoteQuerier struct {
client httpgrpc.HTTPClient
retryConfig backoff.Config
timeout time.Duration
middlewares []Middleware
promHTTPPrefix string
Expand All @@ -129,6 +136,7 @@ var protobufDecoderInstance = protobufDecoder{}
// NewRemoteQuerier creates and initializes a new RemoteQuerier instance.
func NewRemoteQuerier(
client httpgrpc.HTTPClient,
retryConfig backoff.Config,
timeout time.Duration,
preferredQueryResultResponseFormat string,
prometheusHTTPPrefix string,
Expand All @@ -137,6 +145,7 @@ func NewRemoteQuerier(
) *RemoteQuerier {
return &RemoteQuerier{
client: client,
retryConfig: retryConfig,
timeout: timeout,
middlewares: middlewares,
promHTTPPrefix: prometheusHTTPPrefix,
Expand Down Expand Up @@ -309,15 +318,11 @@ func (q *RemoteQuerier) createRequest(ctx context.Context, query string, ts time
func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPRequest, logger log.Logger) (*httpgrpc.HTTPResponse, error) {
// Ongoing request may be cancelled during evaluation due to some transient error or server shutdown,
// so we'll keep retrying until we get a successful response or backoff is terminated.
retryConfig := backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 2 * time.Second,
MaxRetries: maxRequestRetries,
}
retry := backoff.New(ctx, retryConfig)
retry := backoff.New(ctx, q.retryConfig)

for {
resp, err := q.client.Handle(ctx, req)

if err == nil {
// Responses with status codes 4xx should always be considered erroneous.
// These errors shouldn't be retried because it is expected that
Expand Down
33 changes: 20 additions & 13 deletions pkg/ruler/remotequerier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"github.com/golang/snappy"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -30,6 +31,12 @@ import (
"github.com/grafana/mimir/pkg/querier/api"
)

var retryConfig = backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 1 * time.Second,
MaxRetries: 3,
}

type mockHTTPGRPCClient func(ctx context.Context, req *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error)

func (c mockHTTPGRPCClient) Handle(ctx context.Context, req *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) {
Expand Down Expand Up @@ -64,7 +71,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should issue a remote read request", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -76,7 +83,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -86,7 +93,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Read(ctx, &prompb.Query{}, false)
Expand All @@ -101,7 +108,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Second, formatJSON, "/prometheus", log.NewNopLogger())

_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.Error(t, err)
Expand Down Expand Up @@ -139,7 +146,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, format, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, format, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -165,7 +172,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -175,7 +182,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Query(ctx, "qs", tm)
Expand Down Expand Up @@ -276,7 +283,7 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
}
return testCase.response, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
require.Equal(t, int64(0), count.Load())
_, err := q.Query(ctx, "qs", time.Now())
if testCase.err == nil {
Expand Down Expand Up @@ -405,7 +412,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
Body: []byte(scenario.body),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -678,7 +685,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) {
Body: b,
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -701,7 +708,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) {
Body: []byte("some body content"),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -713,7 +720,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Second, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -771,7 +778,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) {
return testCase.resp, testCase.err
}
logger := newLoggerWithCounter()
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", logger)
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", logger)

tm := time.Unix(1649092025, 515834)

Expand Down
Loading