Distributor pushes to ingesters through a goroutines pool (#6660)
* Update dskit

Signed-off-by: Oleg Zaytsev <[email protected]>

* Add support for workers when pushing to ingesters

Signed-off-by: Oleg Zaytsev <[email protected]>

* Update CHANGELOG.md and about-versioning.md

Signed-off-by: Oleg Zaytsev <[email protected]>

* Update pkg/distributor/distributor.go

Co-authored-by: Nick Pillitteri <[email protected]>

* Rename to distributor.reusable-ingester-push-workers

Signed-off-by: Oleg Zaytsev <[email protected]>

* s/ingesterDoBatchPushGo/ingesterDoBatchPushWorkers/

Signed-off-by: Oleg Zaytsev <[email protected]>

* Test that workers are called, call them, fix svc

Signed-off-by: Oleg Zaytsev <[email protected]>

---------

Signed-off-by: Oleg Zaytsev <[email protected]>
Co-authored-by: Nick Pillitteri <[email protected]>
colega and 56quarters authored Nov 17, 2023
1 parent df61e7c commit 1280630
Showing 15 changed files with 327 additions and 80 deletions.
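At its core, the change swaps "one goroutine per ingester push" for a pool of pre-allocated, reusable workers from dskit's `concurrency` package. A minimal sketch of that pool's usage, built only on the `NewReusableGoroutinesPool`, `Go`, and `Close` calls that appear in this diff (the pool size and the work function here are illustrative):

```go
package main

import (
	"fmt"
	"sync"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	// Pre-allocate 2 reusable workers, mirroring
	// -distributor.reusable-ingester-push-workers=2.
	pool := concurrency.NewReusableGoroutinesPool(2)
	defer pool.Close() // stops idle workers; work already running is unaffected

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		// Go runs f on a free pooled worker; if none is available it spawns
		// a fresh goroutine, so the pool never blocks or limits throughput.
		pool.Go(func() {
			defer wg.Done()
			fmt.Println("push to ingester", i)
		})
	}
	wg.Wait()
}
```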
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,6 +31,7 @@
* [FEATURE] Add experimental endpoint `/api/v1/cardinality/active_series` to return the set of active series for a given selector. #6536 #6619 #6651 #6667
* [FEATURE] Added `-<prefix>.s3.part-size` flag to configure the S3 minimum file size in bytes used for multipart uploads. #6592
* [FEATURE] Add the experimental `-<prefix>.s3.send-content-md5` flag (defaults to `false`) to configure S3 Put Object requests to send a `Content-MD5` header. Setting this flag is not recommended unless your object storage does not support checksums. #6622
* [FEATURE] Distributor: add an experimental flag `-distributor.reusable-ingester-push-workers` that can be used to pre-allocate a pool of workers for sending push requests to the ingesters. #6660
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
* [ENHANCEMENT] Query-frontend: add `cortex_query_frontend_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. When query-scheduler is in use, the metric has the `scheduler_address` label to differentiate the enqueue duration by query-scheduler backend. #5879 #6087 #6120
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1647,6 +1647,17 @@
"fieldFlag": "distributor.limit-inflight-requests-using-grpc-method-limiter",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "reusable_ingester_push_workers",
"required": false,
"desc": "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers are available, a new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "distributor.reusable-ingester-push-workers",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1163,6 +1163,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it.
-distributor.retry-after-header.max-backoff-exponent int
[experimental] Sets the upper limit on the number of Retry-Attempt considered for calculation. It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds. (default 5)
-distributor.reusable-ingester-push-workers int
[experimental] Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers are available, a new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)
-distributor.ring.consul.acl-token string
ACL Token used to interact with Consul.
-distributor.ring.consul.cas-retry-delay duration
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -83,6 +83,8 @@ The following features are currently experimental:
- `-compactor.ring.heartbeat-period=0`
- `-store-gateway.sharding-ring.heartbeat-period=0`
- `-overrides-exporter.ring.heartbeat-period=0`
- Reusable ingester push workers
- `-distributor.reusable-ingester-push-workers`
- Ingester
- Add variance to chunks end time to spread writing across time (`-blocks-storage.tsdb.head-chunks-end-time-variance`)
- Snapshotting of in-memory TSDB data on disk when shutting down (`-blocks-storage.tsdb.memory-snapshot-on-shutdown`)
8 changes: 8 additions & 0 deletions docs/sources/mimir/references/configuration-parameters/index.md
@@ -920,6 +920,14 @@ instance_limits:
# (experimental) Use experimental method of limiting push requests.
# CLI flag: -distributor.limit-inflight-requests-using-grpc-method-limiter
[limit_inflight_requests_using_grpc_method_limiter: <boolean> | default = false]
# (experimental) Number of pre-allocated workers used to forward push requests
# to the ingesters. If 0, no workers will be used and a new goroutine will be
# spawned for each ingester push request. If not enough workers are available,
# a new goroutine will be spawned. (Note: this is a performance optimization,
# not a limiting feature.)
# CLI flag: -distributor.reusable-ingester-push-workers
[reusable_ingester_push_workers: <int> | default = 0]
```

### ingester
2 changes: 1 addition & 1 deletion go.mod
@@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38
github.com/grafana/dskit v0.0.0-20231114140740-f58e02bc4b61
github.com/grafana/e2e v0.1.1
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
4 changes: 2 additions & 2 deletions go.sum
@@ -538,8 +538,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38 h1:nHd5vwL0g4zvYFcjGDLAij5EelqhAWM+nxypldn5Wyk=
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/dskit v0.0.0-20231114140740-f58e02bc4b61 h1:A/2KuveR2QnGvI54x97OOJaWsBGdzL8V9ZzVi/klMnI=
github.com/grafana/dskit v0.0.0-20231114140740-f58e02bc4b61/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc=
github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
4 changes: 2 additions & 2 deletions pkg/alertmanager/distributor.go
@@ -171,7 +171,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
var responses []*httpgrpc.HTTPResponse
var responsesMtx sync.Mutex
grpcHeaders := httpToHttpgrpcHeaders(r.Header)
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
err = ring.DoBatchWithOptions(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
// Use a background context to make sure all alertmanagers get the request even if we return early.
localCtx := user.InjectOrgID(context.Background(), userID)
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
@@ -196,7 +196,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
responsesMtx.Unlock()

return nil
}, func() {})
}, ring.DoBatchOptions{})

if err != nil {
respondFromError(err, w, logger)
4 changes: 2 additions & 2 deletions pkg/alertmanager/multitenant.go
@@ -920,7 +920,7 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
level.Debug(am.logger).Log("msg", "message received for replication", "user", userID, "key", part.Key)

selfAddress := am.ringLifecycler.GetInstanceAddr()
err := ring.DoBatch(ctx, RingOp, am.ring, []uint32{shardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
err := ring.DoBatchWithOptions(ctx, RingOp, am.ring, []uint32{shardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
if desc.GetAddr() == selfAddress {
return nil
}
@@ -942,7 +942,7 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
level.Debug(am.logger).Log("msg", "user not found while trying to replicate state", "user", userID, "key", part.Key)
}
return nil
}, func() {})
}, ring.DoBatchOptions{})

return err
}
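Both alertmanager call sites above are mechanical migrations to the new dskit API: `ring.DoBatch`'s trailing cleanup callback becomes the `Cleanup` field of a `ring.DoBatchOptions` struct, whose zero value preserves the old behavior. The distributor's `DoBatchWithClientError` call gets the same treatment later in this diff, with the error classifier moving to `IsClientError` and the new `Go` hook selecting the scheduling function. A self-contained sketch of this options-struct shape (names and bodies here are illustrative stand-ins, not dskit's implementation):

```go
package main

import "fmt"

// batchOptions mirrors the shape of ring.DoBatchOptions as used in this diff:
// every field is optional, and the zero value behaves like the old API.
type batchOptions struct {
	Cleanup       func()           // run once the batch completes
	IsClientError func(error) bool // classifies errors (DoBatchWithClientError's old extra argument)
	Go            func(func())     // schedules per-instance work; nil means `go f()`
}

// doBatch is a toy stand-in for ring.DoBatchWithOptions.
func doBatch(work func() error, o batchOptions) error {
	if o.Go == nil {
		o.Go = func(f func()) { go f() } // nil Go falls back to a plain goroutine
	}
	done := make(chan error, 1)
	o.Go(func() { done <- work() })
	err := <-done
	if o.Cleanup != nil {
		o.Cleanup()
	}
	return err
}

func main() {
	// Zero-value options: equivalent to the old trailing `func() {}` cleanup.
	err := doBatch(func() error { fmt.Println("replicate state"); return nil }, batchOptions{})
	fmt.Println("err:", err)
}
```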
30 changes: 26 additions & 4 deletions pkg/distributor/distributor.go
@@ -17,6 +17,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/instrument"
@@ -150,6 +151,11 @@ type Distributor struct {

// Pool of []byte used when marshalling write requests.
writeRequestBytePool sync.Pool

// ingesterDoBatchPushWorkers is the Go function passed to ring.DoBatchWithOptions.
// It can be nil, in which case a simple `go f()` will be used.
// See Config.ReusableIngesterPushWorkers on how to configure this.
ingesterDoBatchPushWorkers func(func())
}

// Config contains the configuration required to
@@ -191,6 +197,7 @@ type Config struct {

WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"experimental"`
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"experimental"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -207,6 +214,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", false, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 0, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers are available, a new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")

cfg.DefaultLimits.RegisterFlags(f)
}
@@ -433,11 +441,22 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
d.PushWithMiddlewares = d.wrapPushWithMiddlewares(d.push)

subservices = append(subservices, d.ingesterPool, d.activeUsers)

if cfg.ReusableIngesterPushWorkers > 0 {
wp := concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers)
d.ingesterDoBatchPushWorkers = wp.Go
// Closing the pool doesn't stop the workload it's running; we do this just to avoid leaking goroutines in tests.
subservices = append(subservices, services.NewBasicService(
nil,
func(ctx context.Context) error { <-ctx.Done(); return nil },
func(_ error) error { wp.Close(); return nil },
))
}

d.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, err
}

d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)

@@ -1310,7 +1329,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
localCtx = ingester_client.WithSlabPool(localCtx, slabPool)
}

err = ring.DoBatchWithClientError(ctx, ring.WriteNoExtend, subRing, keys,
err = ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, subRing, keys,
func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
@@ -1338,8 +1357,11 @@
}
return err
},
func() { pushReq.CleanUp(); cancel() },
isClientError,
ring.DoBatchOptions{
Cleanup: func() { pushReq.CleanUp(); cancel() },
IsClientError: isClientError,
Go: d.ingesterDoBatchPushWorkers,
},
)

return err
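The `Go` option is deliberately optional: as the `ingesterDoBatchPushWorkers` comment above notes, when it is nil dskit falls back to a plain `go f()`. A runnable sketch of that dispatch decision in isolation (illustrative code, not dskit's actual implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch mimics the fallback described above: use the configured workers
// function when present, otherwise spawn a fresh goroutine per push.
func dispatch(workers func(func()), f func()) {
	if workers != nil {
		workers(f) // e.g. a ReusableGoroutinesPool's Go method
		return
	}
	go f() // default when -distributor.reusable-ingester-push-workers is 0
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	// No pool configured: a goroutine is spawned.
	dispatch(nil, func() { defer wg.Done(); fmt.Println("spawned goroutine") })

	// A trivial workers function standing in for the pool.
	workers := func(f func()) { go f() }
	dispatch(workers, func() { defer wg.Done(); fmt.Println("pooled worker") })

	wg.Wait()
}
```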
55 changes: 55 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -43,6 +43,7 @@ import (
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
"google.golang.org/grpc"
@@ -134,6 +135,7 @@ func TestDistributor_Push(t *testing.T) {
expectedErrorDetails *mimirpb.WriteErrorDetails
expectedMetrics string
timeOut bool
configure func(*Config)
}{
"A push of no samples shouldn't block or return error, even if ingesters are sad": {
numIngesters: 3,
@@ -292,6 +294,22 @@
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.009
`,
},
"A push to 3 happy ingesters using batch worker goroutines should succeed": {
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 5, startTimestampMs: 123456789000},
metadata: 5,
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="user"} 123456789.004
`,
configure: func(cfg *Config) {
// 2 workers, so 1 push would need to spawn a new goroutine.
cfg.ReusableIngesterPushWorkers = 2
},
},
} {
t.Run(name, func(t *testing.T) {
limits := &validation.Limits{}
@@ -305,6 +323,7 @@
numDistributors: 1,
limits: limits,
timeOut: tc.timeOut,
configure: tc.configure,
})

request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, false, true)
Expand Down Expand Up @@ -336,6 +355,42 @@ func TestDistributor_Push(t *testing.T) {
}
}

func TestDistributor_PushWithDoBatchWorkers(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRate = 20
limits.IngestionBurstSize = 20

ds, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
configure: func(cfg *Config) {
// 2 workers, so 1 push would need to spawn a new goroutine.
cfg.ReusableIngesterPushWorkers = 2
},
})
require.Len(t, ds, 1)
distributor := ds[0]

require.NotNil(t, distributor.ingesterDoBatchPushWorkers)
counter := atomic.NewInt64(0)
originalIngesterDoBatchPushWorkers := distributor.ingesterDoBatchPushWorkers
distributor.ingesterDoBatchPushWorkers = func(f func()) {
counter.Inc()
originalIngesterDoBatchPushWorkers(f)
}

request := makeWriteRequest(123456789000, 3, 5, false, false)
ctx := user.InjectOrgID(context.Background(), "user")
response, err := distributor.Push(ctx, request)

require.NoError(t, err)
require.Equal(t, emptyResponse, response)
require.GreaterOrEqual(t, counter.Load(), int64(3))
}

func TestDistributor_ContextCanceledRequest(t *testing.T) {
now := time.Now()
mtime.NowForce(now)
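The new test verifies that the configured workers function is actually exercised by decorating it: it swaps `ingesterDoBatchPushWorkers` for a wrapper that counts invocations before delegating to the original. The same decorator pattern in isolation, using the `go.uber.org/atomic` package the test imports (the stand-in workers function is illustrative):

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	// Stand-in for the distributor's configured workers function.
	workers := func(f func()) { f() }

	// Wrap it to count invocations, as the test above does.
	counter := atomic.NewInt64(0)
	original := workers
	workers = func(f func()) {
		counter.Inc()
		original(f)
	}

	for i := 0; i < 3; i++ {
		workers(func() { fmt.Println("push to ingester") })
	}
	fmt.Println("workers invoked:", counter.Load()) // 3
}
```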
38 changes: 38 additions & 0 deletions vendor/github.com/grafana/dskit/concurrency/worker.go

