Skip to content

Commit

Permalink
Enable reporting gRPC codes as labels in cortex and ingester_client r…
Browse files Browse the repository at this point in the history
…equest duration metrics (#6562)

* Enable reporting gRPC codes as labels in cortex and ingester_client request duration metrics

Signed-off-by: Yuri Nikolic <[email protected]>

* Fix review findings

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Nov 6, 2023
1 parent dc6d3f0 commit c3b7e81
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 113 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
* [ENHANCEMENT] Ingester: Add `-ingester.instance-limits.max-inflight-push-requests-bytes`. This limit protects the ingester against requests that together may cause an OOM. #6492
* [ENHANCEMENT] Ingester: add new per-tenant `cortex_ingester_local_limits` metric to expose the calculated local per-tenant limits seen at each ingester. Exports the local per-tenant series limit with label `{limit="max_global_series_per_user"}` #6403
* [ENHANCEMENT] Query-frontend: added "queue_time_seconds" field to "query stats" log. This is total time that query and subqueries spent in the queue, before queriers picked it up. #6537
* [ENHANCEMENT] Server: Add `-server.report-grpc-codes-in-instrumentation-label-enabled` CLI flag to specify whether gRPC status codes should be used in `status_code` label of `cortex_request_duration_seconds` metric. It defaults to false, meaning that successful and erroneous gRPC status codes are represented with `success` and `error` respectively. #6562
* [ENHANCEMENT] Server: Add `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled` CLI flag to specify whether gRPC status codes should be used in `status_code` label of `cortex_ingester_client_request_duration_seconds` metric. It defaults to false, meaning that successful and erroneous gRPC status codes are represented with `2xx` and `error` respectively. #6562
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
* [BUGFIX] Ingester: prevent query logic from continuing to execute after queries are canceled. #6085
Expand Down
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,16 @@
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "report_grpc_codes_in_instrumentation_label_enabled",
"required": false,
"desc": "If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as \"error\".",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "server.report-grpc-codes-in-instrumentation-label-enabled",
"fieldType": "boolean"
},
{
"kind": "field",
"name": "graceful_shutdown_timeout",
Expand Down Expand Up @@ -2202,6 +2212,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "report_grpc_codes_in_instrumentation_label_enabled",
"required": false,
"desc": "If set to true, gRPC status codes will be reported in \"status_code\" label of \"cortex_ingester_client_request_duration_seconds\" metric. Otherwise, they will be reported as \"error\"",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.client.report-grpc-codes-in-instrumentation-label-enabled",
"fieldType": "boolean",
"fieldCategory": "advanced"
}
],
"fieldValue": null,
Expand Down
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Initial connection window size. Values less than the default are not supported and are ignored. Setting this to a value other than the default disables the BDP estimator. (default 63KiB1023B)
-ingester.client.initial-stream-window-size value
[experimental] Initial stream window size. Values less than the default are not supported and are ignored. Setting this to a value other than the default disables the BDP estimator. (default 63KiB1023B)
-ingester.client.report-grpc-codes-in-instrumentation-label-enabled
If set to true, gRPC status codes will be reported in "status_code" label of "cortex_ingester_client_request_duration_seconds" metric. Otherwise, they will be reported as "error"
-ingester.client.tls-ca-path string
Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.
-ingester.client.tls-cert-path string
Expand Down Expand Up @@ -2597,6 +2599,8 @@ Usage of ./cmd/mimir/mimir:
Base path to serve all API routes from (e.g. /v1/)
-server.register-instrumentation
Register the intrumentation handlers (/metrics etc). (default true)
-server.report-grpc-codes-in-instrumentation-label-enabled
If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as "error".
-server.tls-cipher-suites string
Comma-separated list of cipher suites to use. If blank, the default Go cipher suites is used.
-server.tls-min-version string
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ Usage of ./cmd/mimir/mimir:
Optionally log request headers.
-server.log-request-headers-exclude-list string
Comma separated list of headers to exclude from loggin. Only used if server.log-request-headers is true.
-server.report-grpc-codes-in-instrumentation-label-enabled
If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as "error".
-server.tls-cipher-suites string
Comma-separated list of cipher suites to use. If blank, the default Go cipher suites is used.
-server.tls-min-version string
Expand Down
11 changes: 11 additions & 0 deletions docs/sources/mimir/references/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,11 @@ grpc_tls_config:
# CLI flag: -server.register-instrumentation
[register_instrumentation: <boolean> | default = true]
# If set to true, gRPC statuses will be reported in instrumentation labels with
# their string representations. Otherwise, they will be reported as "error".
# CLI flag: -server.report-grpc-codes-in-instrumentation-label-enabled
[report_grpc_codes_in_instrumentation_label_enabled: <boolean> | default = false]
# (advanced) Timeout for graceful shutdowns
# CLI flag: -server.graceful-shutdown-timeout
[graceful_shutdown_timeout: <duration> | default = 30s]
Expand Down Expand Up @@ -2265,6 +2270,12 @@ circuit_breaker:
# before allowing some requests
# CLI flag: -ingester.client.circuit-breaker.cooldown-period
[cooldown_period: <duration> | default = 1m]
# (advanced) If set to true, gRPC status codes will be reported in "status_code"
# label of "cortex_ingester_client_request_duration_seconds" metric. Otherwise,
# they will be reported as "error"
# CLI flag: -ingester.client.report-grpc-codes-in-instrumentation-label-enabled
[report_grpc_codes_in_instrumentation_label_enabled: <boolean> | default = false]
```

### grpc_client
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59
github.com/grafana/dskit v0.0.0-20231106102023-e59db7bb4403
github.com/grafana/e2e v0.1.1
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 h1:tWbF2UD8HgvpqyxV60zmUDDzJI2gybMJxw4/RY/UNTo=
github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/dskit v0.0.0-20231106102023-e59db7bb4403 h1:CGq4R5z3RJuNVXJqsZuJXd1cKC1quLGKQic505VWtmY=
github.com/grafana/dskit v0.0.0-20231106102023-e59db7bb4403/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc=
github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
Expand Down
156 changes: 156 additions & 0 deletions integration/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,3 +678,159 @@ func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
})
}
}

func TestIngesterReportGRPCStatusCodes(t *testing.T) {
query := "foobar"
queryEnd := time.Now().Round(time.Second)
queryStart := queryEnd.Add(-1 * time.Hour)
queryStep := 10 * time.Minute

testCases := map[string]struct {
serverReportGRPCStatusCodes bool
ingesterClientReportGRPCStatusCodes bool
expectedPushStatusCode string
expectedQueryStatusCode string
}{
"when server and ingester client do not report grpc codes, successful push and query give success and 2xx": {
serverReportGRPCStatusCodes: false,
ingesterClientReportGRPCStatusCodes: false,
expectedPushStatusCode: "success",
expectedQueryStatusCode: "2xx",
},
"when server does not report and ingester client reports grpc codes, successful push and query give success and OK": {
serverReportGRPCStatusCodes: false,
ingesterClientReportGRPCStatusCodes: true,
expectedPushStatusCode: "success",
expectedQueryStatusCode: "OK",
},
"when server reports and ingester client does not report grpc codes, successful push and query give OK and 2xx": {
serverReportGRPCStatusCodes: true,
ingesterClientReportGRPCStatusCodes: false,
expectedPushStatusCode: "OK",
expectedQueryStatusCode: "2xx",
},
"when server and ingester client report grpc codes, successful push and query give OK and OK": {
serverReportGRPCStatusCodes: true,
ingesterClientReportGRPCStatusCodes: true,
expectedPushStatusCode: "OK",
expectedQueryStatusCode: "OK",
},
}

series := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "not_foobar",
},
},
Samples: []prompb.Sample{
{
Timestamp: queryStart.Add(-2 * time.Second).UnixMilli(),
Value: 100,
},
},
},
}
expectedQueryResult := model.Matrix{}

for testName, testData := range testCases {
t.Run(testName, func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-ingester.client.report-grpc-codes-in-instrumentation-label-enabled": strconv.FormatBool(testData.ingesterClientReportGRPCStatusCodes),
"-server.report-grpc-codes-in-instrumentation-label-enabled": strconv.FormatBool(testData.serverReportGRPCStatusCodes),
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until querier has updated the ring.
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

sums, err := ingester.SumMetrics(
[]string{"cortex_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/Push"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", testData.expectedPushStatusCode),
),
e2e.SkipMissingMetrics,
e2e.WithMetricCount,
)

require.NoError(t, err)
pushRequests := sums[0]
require.Equal(t, pushRequests, 1.0)

result, err := client.QueryRange(query, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, expectedQueryResult, result)

queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
),
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

successfulQueryRequests, err := queryRequestCount(testData.expectedQueryStatusCode)
require.NoError(t, err)

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)

totalQueryRequests, err := queryRequestCount(".*")
require.NoError(t, err)

// We expect two query requests: the first query request and the timestamp query request
require.Equalf(t, 1.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 1.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
}
13 changes: 10 additions & 3 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
Expand All @@ -34,7 +35,11 @@ type closableHealthAndIngesterClient struct {
// MakeIngesterClient makes a new IngesterClient
func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics, logger log.Logger) (HealthAndIngesterClient, error) {
logger = log.With(logger, "component", "ingester-client")
unary, stream := grpcclient.Instrument(metrics.requestDuration)
var reportGRPCStatusesOptions []middleware.InstrumentationOption
if cfg.ReportGRPCStatusCodes {
reportGRPCStatusesOptions = []middleware.InstrumentationOption{middleware.ReportGRPCStatusOption}
}
unary, stream := grpcclient.Instrument(metrics.requestDuration, reportGRPCStatusesOptions...)
if cfg.CircuitBreaker.Enabled {
unary = append([]grpc.UnaryClientInterceptor{NewCircuitBreaker(inst, cfg.CircuitBreaker, metrics, logger)}, unary...)
}
Expand Down Expand Up @@ -64,14 +69,16 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client
type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with ingesters from distributors, queriers and rulers."`
CircuitBreaker CircuitBreakerConfig `yaml:"circuit_breaker"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with ingesters from distributors, queriers and rulers."`
CircuitBreaker CircuitBreakerConfig `yaml:"circuit_breaker"`
ReportGRPCStatusCodes bool `yaml:"report_grpc_codes_in_instrumentation_label_enabled" category:"advanced"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
cfg.CircuitBreaker.RegisterFlagsWithPrefix("ingester.client", f)
f.BoolVar(&cfg.ReportGRPCStatusCodes, "ingester.client.report-grpc-codes-in-instrumentation-label-enabled", false, "If set to true, gRPC status codes will be reported in \"status_code\" label of \"cortex_ingester_client_request_duration_seconds\" metric. Otherwise, they will be reported as \"error\"")
}

func (cfg *Config) Validate() error {
Expand Down
6 changes: 3 additions & 3 deletions vendor/github.com/grafana/dskit/grpcclient/instrumentation.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 0 additions & 25 deletions vendor/github.com/grafana/dskit/grpcutil/cancel.go

This file was deleted.

Loading

0 comments on commit c3b7e81

Please sign in to comment.