Skip to content

Commit

Permalink
ingester: implement request size instance limit (#6492)
Browse files Browse the repository at this point in the history
* grpc push check: use size from metadata for ingester push requests.

Signed-off-by: Peter Štibraný <[email protected]>

* Implement inflight bytes check.

Signed-off-by: Peter Štibraný <[email protected]>

* Add test for inflight bytes check.

Signed-off-by: Peter Štibraný <[email protected]>

* changelog entry

Signed-off-by: Peter Štibraný <[email protected]>

* docs, help

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <[email protected]>

* Make ctx param first.

Signed-off-by: Peter Štibraný <[email protected]>

* Add documentation for err-mimir-ingester-max-inflight-push-requests-bytes error.

Signed-off-by: Peter Štibraný <[email protected]>

* Make linter happy.

Signed-off-by: Peter Štibraný <[email protected]>

* Make sure to check for inflight bytes even if new request size is not known.

Signed-off-by: Peter Štibraný <[email protected]>

* Check that reporting of cortex_ingester_inflight_push_requests_bytes works.

Signed-off-by: Peter Štibraný <[email protected]>

---------

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Nov 1, 2023
1 parent 1cdfe2c commit 3ca84fe
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 108 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* [ENHANCEMENT] Query-frontend: return warnings generated during query evaluation. #6391
* [ENHANCEMENT] Server: Add the option `-server.http-read-header-timeout` to enable specifying a timeout for reading HTTP request headers. It defaults to 0, in which case reading of headers can take up to `-server.http-read-timeout`, leaving no time for reading body, if there's any. #6517
* [ENHANCEMENT] Add connection-string option, `-<prefix>.azure.connection-string`, for Azure Blob Storage. #6487
* [ENHANCEMENT] Ingester: Add `-ingester.instance-limits.max-inflight-push-requests-bytes`. This limit protects the ingester against requests that together may cause an OOM. #6492
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
* [BUGFIX] Ingester: prevent query logic from continuing to execute after queries are canceled. #6085
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2876,6 +2876,17 @@
"fieldFlag": "ingester.instance-limits.max-inflight-push-requests",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_inflight_push_requests_bytes",
"required": false,
"desc": "The sum of the request sizes in bytes of inflight push requests that this ingester can handle. This limit is per-ingester, not per-tenant. Additional requests will be rejected. 0 = unlimited.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingester.instance-limits.max-inflight-push-requests-bytes",
"fieldType": "int",
"fieldCategory": "advanced"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,8 @@ Usage of ./cmd/mimir/mimir:
Comma-separated list of metric names, for which the -ingester.max-global-series-per-metric limit will be ignored. Does not affect the -ingester.max-global-series-per-user limit.
-ingester.instance-limits.max-inflight-push-requests int
Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited. (default 30000)
-ingester.instance-limits.max-inflight-push-requests-bytes int
The sum of the request sizes in bytes of inflight push requests that this ingester can handle. This limit is per-ingester, not per-tenant. Additional requests will be rejected. 0 = unlimited.
-ingester.instance-limits.max-ingestion-rate float
Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.
-ingester.instance-limits.max-series int
Expand Down
16 changes: 16 additions & 0 deletions docs/sources/mimir/manage/mimir-runbooks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,22 @@ How to **fix** it:
- Check the write requests latency through the `Mimir / Writes` dashboard and come back to investigate the root cause of high latency (the higher the latency, the higher the number of in-flight write requests).
- Consider scaling out the ingesters.
### err-mimir-ingester-max-inflight-push-requests-bytes
This error occurs when an ingester rejects a write request because of the maximum size of all in-flight push requests has been reached.
How it **works**:
- The ingester has a per-instance limit on the total size of the in-flight write (push) requests.
- The limit applies to all in-flight write requests, across all tenants, and it protects the ingester from using too much memory for incoming requests in case of high traffic.
- To configure the limit, set the `-ingester.instance-limits.max-inflight-push-requests-bytes` option (or `max_inflight_push_requests_bytes` in the runtime config).
How to **fix** it:
- Increase the limit by setting the `-ingester.instance-limits.max-inflight-push-requests-bytes` option (or `max_inflight_push_requests_bytes` in the runtime config), if possible.
- Check the write requests latency through the `Mimir / Writes` dashboard and come back to investigate the root cause of high latency (the higher the latency, the higher the number of in-flight write requests).
- Consider scaling out the ingesters.
### err-mimir-max-series-per-user
This error occurs when the number of in-memory series for a given tenant exceeds the configured limit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,12 @@ instance_limits:
# CLI flag: -ingester.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 30000]
# (advanced) The sum of the request sizes in bytes of inflight push requests
# that this ingester can handle. This limit is per-ingester, not per-tenant.
# Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-inflight-push-requests-bytes
[max_inflight_push_requests_bytes: <int> | default = 0]
# (advanced) Comma-separated list of metric names, for which the
# -ingester.max-global-series-per-metric limit will be ignored. Does not affect
# the -ingester.max-global-series-per-user limit.
Expand Down
68 changes: 45 additions & 23 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ const (
)

var (
reasonIngesterMaxIngestionRate = globalerror.IngesterMaxIngestionRate.LabelValue()
reasonIngesterMaxTenants = globalerror.IngesterMaxTenants.LabelValue()
reasonIngesterMaxInMemorySeries = globalerror.IngesterMaxInMemorySeries.LabelValue()
reasonIngesterMaxInflightPushRequests = globalerror.IngesterMaxInflightPushRequests.LabelValue()
reasonIngesterMaxIngestionRate = globalerror.IngesterMaxIngestionRate.LabelValue()
reasonIngesterMaxTenants = globalerror.IngesterMaxTenants.LabelValue()
reasonIngesterMaxInMemorySeries = globalerror.IngesterMaxInMemorySeries.LabelValue()
reasonIngesterMaxInflightPushRequests = globalerror.IngesterMaxInflightPushRequests.LabelValue()
reasonIngesterMaxInflightPushRequestsBytes = globalerror.IngesterMaxInflightPushRequestsBytes.LabelValue()
// This is the closest fitting Prometheus API error code for requests rejected due to limiting.
tooBusyError = newErrorWithHTTPStatus(
errors.New(tooBusyErrorMsg),
Expand Down Expand Up @@ -284,8 +285,9 @@ type Ingester struct {
usersMetadata map[string]*userMetricsMetadata

// Rate of pushed samples. Used to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
inflightPushRequestsBytes atomic.Int64

// Anonymous usage statistics tracked by ingester.
memorySeriesStats *expvar.Int
Expand Down Expand Up @@ -348,7 +350,7 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u
return nil, err
}
i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)
i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes)
i.activeGroups = activeGroupsCleanupService

if registerer != nil {
Expand Down Expand Up @@ -408,7 +410,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
if err != nil {
return nil, err
}
i.metrics = newIngesterMetrics(registerer, false, i.getInstanceLimits, nil, &i.inflightPushRequests)
i.metrics = newIngesterMetrics(registerer, false, i.getInstanceLimits, nil, &i.inflightPushRequests, &i.inflightPushRequestsBytes)

i.shipperIngesterID = "flusher"

Expand Down Expand Up @@ -770,40 +772,59 @@ type pushStats struct {
// In the first case, returned errors can be inspected/logged by middleware. Ingester.PushWithCleanup will wrap the error in util_log.DoNotLogError wrapper.
//
// In the second case, returned errors will not be logged, because request will not reach any middleware.
func (i *Ingester) StartPushRequest() error {
func (i *Ingester) StartPushRequest(requestSize int64) error {
if err := i.checkAvailable(); err != nil {
return err
}

inflight := i.inflightPushRequests.Inc()
decreaseInflightInDefer := true
inflightBytes := int64(0)
rejectEqualInflightBytes := false
if requestSize > 0 {
inflightBytes = i.inflightPushRequestsBytes.Add(requestSize)
} else {
inflightBytes = i.inflightPushRequestsBytes.Load()
rejectEqualInflightBytes = true // if inflightBytes == limit, reject new request
}

finishRequestInDefer := true
defer func() {
if decreaseInflightInDefer {
i.inflightPushRequests.Dec()
if finishRequestInDefer {
i.FinishPushRequest(requestSize)
}
}()

il := i.getInstanceLimits()
if il != nil && il.MaxInflightPushRequests > 0 {
if inflight > il.MaxInflightPushRequests {
if il != nil {
if il.MaxInflightPushRequests > 0 && inflight > il.MaxInflightPushRequests {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequests).Inc()
return errMaxInflightRequestsReached
}
}

if il != nil && il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxIngestionRate).Inc()
return errMaxIngestionRateReached
if il.MaxInflightPushRequestsBytes > 0 {
if (rejectEqualInflightBytes && inflightBytes >= il.MaxInflightPushRequestsBytes) || inflightBytes > il.MaxInflightPushRequestsBytes {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequestsBytes).Inc()
return errMaxInflightRequestsBytesReached
}
}

if il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxIngestionRate).Inc()
return errMaxIngestionRateReached
}
}
}

decreaseInflightInDefer = false
finishRequestInDefer = false
return nil
}

func (i *Ingester) FinishPushRequest() {
func (i *Ingester) FinishPushRequest(requestSize int64) {
i.inflightPushRequests.Dec()
if requestSize > 0 {
i.inflightPushRequestsBytes.Sub(requestSize)
}
}

// PushWithCleanup is the Push() implementation for blocks storage and takes a WriteRequest and adds it to the TSDB head.
Expand All @@ -814,10 +835,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques

// If we're using grpc handlers, we don't need to start/finish request here.
if !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter {
if err := i.StartPushRequest(); err != nil {
reqSize := int64(req.Size())
if err := i.StartPushRequest(reqSize); err != nil {
return middleware.DoNotLogError{Err: err}
}
defer i.FinishPushRequest()
defer i.FinishPushRequest(reqSize)
}

userID, err := tenant.TenantID(ctx)
Expand Down
Loading

0 comments on commit 3ca84fe

Please sign in to comment.