Skip to content

Commit

Permalink
Open circuit breakers on timeouts and per-instance limit errors only (#…
Browse files Browse the repository at this point in the history
…7310)

* Open circuit breakers on timeouts and per-instance limit errors only

Signed-off-by: Yuri Nikolic <[email protected]>

* Update CHANGELOG

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Feb 7, 2024
1 parent 4916393 commit 24d97c4
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [CHANGE] Query-frontend: the default value of the CLI flag `-query-frontend.max-cache-freshness` (and its respective YAML configuration parameter) has been changed from `1m` to `10m`. #7161
* [CHANGE] Distributor: default the optimization `-distributor.write-requests-buffer-pooling-enabled` to `true`. #7165
* [CHANGE] Tracing: Move query information to span attributes instead of span logs. #7046
* [CHANGE] Distributor: the default value of circuit breaker's CLI flag `-ingester.client.circuit-breaker.cooldown-period` has been changed from `1m` to `10s`. #7310
* [FEATURE] Introduce `-server.log-source-ips-full` option to log all IPs from `Forwarded`, `X-Real-IP`, `X-Forwarded-For` headers. #7250
* [FEATURE] Introduce `-tenant-federation.max-tenants` option to limit the max number of tenants allowed for requests when federation is enabled. #6959
* [FEATURE] Cardinality API: added a new `count_method` parameter which enables counting active label values. #7085
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2267,7 +2267,7 @@
"required": false,
"desc": "How long the circuit breaker will stay in the open state before allowing some requests",
"fieldValue": null,
"fieldDefaultValue": 60000000000,
"fieldDefaultValue": 10000000000,
"fieldFlag": "ingester.client.circuit-breaker.cooldown-period",
"fieldType": "duration",
"fieldCategory": "experimental"
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ Usage of ./cmd/mimir/mimir:
-ingester.client.backoff-retries int
Number of times to backoff and retry before failing. (default 10)
-ingester.client.circuit-breaker.cooldown-period duration
[experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 1m0s)
[experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 10s)
-ingester.client.circuit-breaker.enabled
[experimental] Enable circuit breaking when making requests to ingesters
-ingester.client.circuit-breaker.failure-execution-threshold uint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2356,7 +2356,7 @@ circuit_breaker:
# (experimental) How long the circuit breaker will stay in the open state
# before allowing some requests
# CLI flag: -ingester.client.circuit-breaker.cooldown-period
[cooldown_period: <duration> | default = 1m]
[cooldown_period: <duration> | default = 10s]
# (deprecated) If set to true, gRPC status codes will be reported in
# "status_code" label of "cortex_ingester_client_request_duration_seconds"
Expand Down
24 changes: 19 additions & 5 deletions pkg/ingester/client/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/ring"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
)

const (
Expand Down Expand Up @@ -97,9 +99,21 @@ func isFailure(err error) bool {
return false
}

// We only consider timeouts or the ingester being unavailable (returned when hitting
// per-instance limits) to be errors worthy of tripping the circuit breaker since these
// We only consider timeouts or ingester hitting a per-instance limit
// to be errors worthy of tripping the circuit breaker since these
// are specific to a particular ingester, not a user or request.
code := status.Code(err)
return code == codes.Unavailable || code == codes.DeadlineExceeded
if stat, ok := grpcutil.ErrorToStatus(err); ok {
if stat.Code() == codes.DeadlineExceeded {
return true
}

details := stat.Details()
if len(details) != 1 {
return false
}
if errDetails, ok := details[0].(*mimirpb.ErrorDetails); ok {
return errDetails.GetCause() == mimirpb.INSTANCE_LIMIT
}
}
return false
}
31 changes: 27 additions & 4 deletions pkg/ingester/client/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
"time"

"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/test"
)

Expand All @@ -44,11 +45,33 @@ func TestIsFailure(t *testing.T) {
require.True(t, isFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable", func(t *testing.T) {
err := status.Error(codes.Unavailable, "broken!")
t.Run("gRPC unavailable with INSTANCE_LIMIT details", func(t *testing.T) {
err := perInstanceLimitError(t)
require.True(t, isFailure(err))
require.True(t, isFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable with SERVICE_UNAVAILABLE details is not a failure", func(t *testing.T) {
stat := status.New(codes.Unavailable, "broken!")
stat, err := stat.WithDetails(&mimirpb.ErrorDetails{Cause: mimirpb.SERVICE_UNAVAILABLE})
require.NoError(t, err)
err = stat.Err()
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable without details is not a failure", func(t *testing.T) {
err := status.Error(codes.Unavailable, "broken!")
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
})
}

func perInstanceLimitError(t *testing.T) error {
stat := status.New(codes.Unavailable, "broken!")
stat, err := stat.WithDetails(&mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT})
require.NoError(t, err)
return stat.Err()
}

func TestNewCircuitBreaker(t *testing.T) {
Expand All @@ -59,7 +82,7 @@ func TestNewCircuitBreaker(t *testing.T) {

// gRPC invoker that returns an error that will be treated as an error by the circuit breaker
failure := func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
return status.Error(codes.Unavailable, "failed")
return perInstanceLimitError(t)
}

conn := grpc.ClientConn{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (cfg *CircuitBreakerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.
f.UintVar(&cfg.FailureThreshold, prefix+".circuit-breaker.failure-threshold", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
f.UintVar(&cfg.FailureExecutionThreshold, prefix+".circuit-breaker.failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures")
f.DurationVar(&cfg.ThresholdingPeriod, prefix+".circuit-breaker.thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over")
f.DurationVar(&cfg.CooldownPeriod, prefix+".circuit-breaker.cooldown-period", time.Minute, "How long the circuit breaker will stay in the open state before allowing some requests")
f.DurationVar(&cfg.CooldownPeriod, prefix+".circuit-breaker.cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests")
}

func (cfg *CircuitBreakerConfig) Validate() error {
Expand Down

0 comments on commit 24d97c4

Please sign in to comment.