Skip to content

Commit

Permalink
feat(ruler): add flags for the query frontend retries
Browse files Browse the repository at this point in the history
Currently, Mimir is hardcoded at 3 retries and 100ms as a minimum retry time. Could be useful to configure this
  • Loading branch information
julienduchesne committed Dec 16, 2024
1 parent 190618a commit fb55d8e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
* [ENHANCEMENT] Ruler: Add flags to configure the retry behavior when forwarding queries to the query frontend: `-ruler.query-frontend.max-retries`, `-ruler.query-frontend.min-retry-backoff` and `-ruler.query-frontend.max-retry-backoff`. #10259
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154

Expand Down
8 changes: 7 additions & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/dns"
httpgrpc_server "github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/kv"
Expand Down Expand Up @@ -852,7 +853,12 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
retryConfig := backoff.Config{
MinBackoff: t.Cfg.Ruler.QueryFrontend.MinRetryBackoff,
MaxBackoff: t.Cfg.Ruler.QueryFrontend.MaxRetryBackoff,
MaxRetries: t.Cfg.Ruler.QueryFrontend.MaxRetries,
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, retryConfig, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
Expand Down
20 changes: 12 additions & 8 deletions pkg/ruler/remotequerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (

statusError = "error"

maxRequestRetries = 3

formatJSON = "json"
formatProtobuf = "protobuf"
)
Expand All @@ -63,6 +61,11 @@ type QueryFrontendConfig struct {
// Address is the address of the query-frontend to connect to.
Address string `yaml:"address"`

// Retry configuration.
MaxRetries int `yaml:"max_retries"`
MinRetryBackoff time.Duration `yaml:"min_retry_backoff"`
MaxRetryBackoff time.Duration `yaml:"max_retry_backoff"`

// GRPCClientConfig contains gRPC specific config options.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."`

Expand All @@ -75,6 +78,9 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
"",
"GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+
"to enable client side load balancing.")
f.IntVar(&c.MaxRetries, "ruler.query-frontend.max-retries", 3, "Maximum number of retries for a single request.")
f.DurationVar(&c.MinRetryBackoff, "ruler.query-frontend.min-retry-backoff", 100*time.Millisecond, "Minimum backoff duration for retries.")
f.DurationVar(&c.MaxRetryBackoff, "ruler.query-frontend.max-retry-backoff", 2*time.Second, "Maximum backoff duration for retries.")

c.GRPCClientConfig.CustomCompressors = []string{s2.Name}
c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f)
Expand Down Expand Up @@ -115,6 +121,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// RemoteQuerier executes read operations against a httpgrpc.HTTPClient.
type RemoteQuerier struct {
client httpgrpc.HTTPClient
retryConfig backoff.Config
timeout time.Duration
middlewares []Middleware
promHTTPPrefix string
Expand All @@ -129,6 +136,7 @@ var protobufDecoderInstance = protobufDecoder{}
// NewRemoteQuerier creates and initializes a new RemoteQuerier instance.
func NewRemoteQuerier(
client httpgrpc.HTTPClient,
retryConfig backoff.Config,
timeout time.Duration,
preferredQueryResultResponseFormat string,
prometheusHTTPPrefix string,
Expand All @@ -137,6 +145,7 @@ func NewRemoteQuerier(
) *RemoteQuerier {
return &RemoteQuerier{
client: client,
retryConfig: retryConfig,
timeout: timeout,
middlewares: middlewares,
promHTTPPrefix: prometheusHTTPPrefix,
Expand Down Expand Up @@ -309,12 +318,7 @@ func (q *RemoteQuerier) createRequest(ctx context.Context, query string, ts time
func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPRequest, logger log.Logger) (*httpgrpc.HTTPResponse, error) {
// Ongoing request may be cancelled during evaluation due to some transient error or server shutdown,
// so we'll keep retrying until we get a successful response or backoff is terminated.
retryConfig := backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 2 * time.Second,
MaxRetries: maxRequestRetries,
}
retry := backoff.New(ctx, retryConfig)
retry := backoff.New(ctx, q.retryConfig)

for {
resp, err := q.client.Handle(ctx, req)
Expand Down
33 changes: 20 additions & 13 deletions pkg/ruler/remotequerier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"github.com/golang/snappy"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -30,6 +31,12 @@ import (
"github.com/grafana/mimir/pkg/querier/api"
)

var retryConfig = backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 1 * time.Second,
MaxRetries: 3,
}

type mockHTTPGRPCClient func(ctx context.Context, req *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error)

func (c mockHTTPGRPCClient) Handle(ctx context.Context, req *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) {
Expand Down Expand Up @@ -64,7 +71,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should issue a remote read request", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -76,7 +83,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -86,7 +93,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Read(ctx, &prompb.Query{}, false)
Expand All @@ -101,7 +108,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Second, formatJSON, "/prometheus", log.NewNopLogger())

_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.Error(t, err)
Expand Down Expand Up @@ -139,7 +146,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, format, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, format, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -165,7 +172,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -175,7 +182,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Query(ctx, "qs", tm)
Expand Down Expand Up @@ -276,7 +283,7 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
}
return testCase.response, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
require.Equal(t, int64(0), count.Load())
_, err := q.Query(ctx, "qs", time.Now())
if testCase.err == nil {
Expand Down Expand Up @@ -405,7 +412,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
Body: []byte(scenario.body),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -678,7 +685,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) {
Body: b,
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -701,7 +708,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) {
Body: []byte("some body content"),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -713,7 +720,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Second, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -771,7 +778,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) {
return testCase.resp, testCase.err
}
logger := newLoggerWithCounter()
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", logger)
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", logger)

tm := time.Unix(1649092025, 515834)

Expand Down

0 comments on commit fb55d8e

Please sign in to comment.