Skip to content

Commit

Permalink
Set shuffle sharding ingester lookback automatically (#2110)
Browse files Browse the repository at this point in the history
This change deprecates the
`querier.shuffle-sharding-ingesters-lookback-period` option, instead
setting the lookback period from the value of the
`querier.query-ingesters-within` option. Outside of our own integration
tests, there's no situation where it's useful to set the lookback period
to a different value than the `query-ingesters-within` option.

This also adds a new option `querier.shuffle-sharding-ingesters-enabled`
that can be used to enable or disable shuffle sharding on ingesters on
the read path instead of using the lookback period for this.

Fixes #1810

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters authored Jun 20, 2022
1 parent e6b8ef6 commit b13d2df
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 72 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
* [CHANGE] Default values have changed for the following settings. This improves query performance for recent data (within 12h) by only reading from ingesters: #1909 #1921
- `-blocks-storage.bucket-store.ignore-blocks-within` now defaults to `10h` (previously `0`)
- `-querier.query-store-after` now defaults to `12h` (previously `0`)
- `-querier.shuffle-sharding-ingesters-lookback-period` now defaults to `13h` (previously `0`)

* [CHANGE] The following settings are now classified as advanced because the defaults should work for most users and tuning them requires in-depth knowledge of how the read path works: #1929
- `-querier.query-ingesters-within`
- `-querier.query-store-after`
* [CHANGE] Config flag category overrides can be set dynamically at runtime. #1934
* [CHANGE] Ingester: deprecated `-ingester.ring.join-after`. Mimir now behaves as this setting is always set to 0s. This configuration option will be removed in Mimir 2.4.0. #1965
* [CHANGE] Blocks uploaded by ingester no longer contain `__org_id__` label. Compactor now ignores this label and will compact blocks with and without this label together. `mimirconvert` tool will remove the label from blocks as "unknown" label. #1972
* [CHANGE] Querier: deprecated `-querier.shuffle-sharding-ingesters-lookback-period`, instead adding `-querier.shuffle-sharding-ingesters-enabled` to enable or disable shuffle sharding on the read path. The value of `-querier.query-ingesters-within` is now used internally for shuffle sharding lookback. #2110
* [ENHANCEMENT] Distributor: Added limit to prevent tenants from sending excessive number of requests: #1843
* The following CLI flags (and their respective YAML config options) have been added:
* `-distributor.request-rate-limit`
Expand Down
10 changes: 5 additions & 5 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1491,13 +1491,13 @@
},
{
"kind": "field",
"name": "shuffle_sharding_ingesters_lookback_period",
"name": "shuffle_sharding_ingesters_enabled",
"required": false,
"desc": "When this setting is \u003e 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured -querier.query-store-after and -querier.query-ingesters-within. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).",
"desc": "Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -querier.query-ingesters-within. If this setting is false or -querier.query-ingesters-within is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).",
"fieldValue": null,
"fieldDefaultValue": 46800000000000,
"fieldFlag": "querier.shuffle-sharding-ingesters-lookback-period",
"fieldType": "duration",
"fieldDefaultValue": true,
"fieldFlag": "querier.shuffle-sharding-ingesters-enabled",
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,8 @@ Usage of ./cmd/mimir/mimir:
The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'. (default 12h0m0s)
-querier.scheduler-address string
Address of the query-scheduler component, in host:port format. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.
-querier.shuffle-sharding-ingesters-lookback-period duration
When this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured -querier.query-store-after and -querier.query-ingesters-within. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled). (default 13h0m0s)
-querier.shuffle-sharding-ingesters-enabled
Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -querier.query-ingesters-within. If this setting is false or -querier.query-ingesters-within is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled). (default true)
-querier.store-gateway-client.tls-ca-path string
Path to the CA certificates file to validate server certificate against. If not set, the host's root CA certificates are used.
-querier.store-gateway-client.tls-cert-path string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,15 @@ To enable shuffle sharding for ingesters on the write path, configure the follow
Assuming that you have enabled shuffle sharding for the write path, to enable shuffle sharding for ingesters on the read path, configure the following flags (or their respective YAML configuration options) on the querier and ruler:

- `-distributor.ingestion-tenant-shard-size=<size>`
- `-querier.shuffle-sharding-ingesters-lookback-period=<period>`<br />
Queriers and rulers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which might have received series since 'now - lookback period'.
The configured lookback `<period>` should be:
- greater than or equal to `-querier.query-store-after` and `-querier.query-ingesters-within` and,

The following flags are set appropriately by default to enable shuffle sharding for ingesters on the read path. If you need to modify their defaults:

- `-querier.shuffle-sharding-ingesters-enabled=true`<br />
Shuffle sharding for ingesters on the read path can be explicitly enabled or disabled.
- `-querier.query-ingesters-within=<period>`<br />
Queriers and rulers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which might have received series since 'now - query ingesters within'. If this period is `0`, shuffle sharding for ingesters on the read path is disabled, which means all ingesters in the Mimir cluster are queried for any tenant.
The configured `<period>` should be:
- greater than `-querier.query-store-after` and,
- greater than the estimated minimum amount of time for the oldest samples stored in a block uploaded by ingester to be discovered and available for querying.
When running Grafana Mimir with the default configuration, the estimated minimum amount of time for the oldest sample in a uploaded block to be available for querying is `3h`.

Expand All @@ -115,23 +120,24 @@ Keeping ingesters shuffle sharding enabled only on the write path does not lead

If you’re running a Grafana Mimir cluster with shuffle sharding disabled, and you want to enable it for the ingesters, use the following rollout strategy to avoid missing querying for any series currently in the ingesters:

1. Explicitly disable ingesters shuffle-sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=false` since this is enabled by default.
1. Enable ingesters shuffle sharding on the write path.
1. Wait for at least the amount of time specified via `-querier.shuffle-sharding-ingesters-lookback-period`.
1. Enable ingesters shuffle-sharding on the read path.
1. Wait for at least the amount of time specified via `-querier.query-ingesters-within`.
1. Enable ingesters shuffle-sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=true`.

#### Limitation: Decreasing the tenant shard size

The current shuffle sharding implementation in Grafana Mimir has a limitation that prevents you from safely decreasing the tenant shard size when you enable ingesters’ shuffle sharding on the read path.

If a tenant’s shard decreases in size, there is currently no way for the queriers and rulers to know how large the tenant shard was previously, and as a result, they potentially miss an ingester with data for that tenant.
The lookback mechanism, which is used to select the ingesters that might have received series since 'now - lookback period', doesn't work correctly if the tenant shard size is decreased.
The query-ingesters-within period, which is used to select the ingesters that might have received series since 'now - query ingesters within', doesn't work correctly for finding tenant shards if the tenant shard size is decreased.

Although decreasing the tenant shard size is not supported, consider the following workaround:

1. Disable shuffle sharding on the read path.
1. Disable shuffle sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=false`.
1. Decrease the configured tenant shard size.
1. Wait for at least the amount of time specified via `-querier.shuffle-sharding-ingesters-lookback-period`.
1. Re-enable shuffle sharding on the read path.
1. Wait for at least the amount of time specified via `-querier.query-ingesters-within`.
1. Re-enable shuffle sharding on the read path via `-querier.shuffle-sharding-ingesters-enabled=true`.

### Query-frontend and query-scheduler shuffle sharding

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,13 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
# (advanced) When this setting is > 0, queriers fetch in-memory series from the
# minimum set of required ingesters, selecting only ingesters which may have
# received series since 'now - lookback period'. The lookback period should be
# greater or equal than the configured -querier.query-store-after and
# -querier.query-ingesters-within. If this setting is 0, queriers always query
# all ingesters (ingesters shuffle sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 13h]
# (advanced) Fetch in-memory series from the minimum set of required ingesters,
# selecting only ingesters which may have received series since
# -querier.query-ingesters-within. If this setting is false or
# -querier.query-ingesters-within is '0', queriers always query all ingesters
# (ingesters shuffle sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-enabled
[shuffle_sharding_ingesters_enabled: <boolean> | default = true]
# The maximum number of concurrent queries. This config option should be set on
# query-frontend too when query sharding is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Complete the following steps to scale down ingesters deployed in a single zone.

```
-querier.query-store-after=0s
-querier.shuffle-sharding-ingesters-lookback-period=87600h
-querier.shuffle-sharding-ingesters-enabled=false
```

b. Configure the compactors to frequently update the bucket index:
Expand Down Expand Up @@ -106,10 +106,10 @@ Complete the following steps to scale down ingesters deployed in a single zone.

c. Send a `SIGINT` or `SIGTERM` signal to the process of the ingester to terminate.

d. Wait 10 minutes before proceeding with the next ingester. The temporarily configuration applied guarantees newly uploaded blocks are available for querying within 10 minutes.
d. Wait 10 minutes before proceeding with the next ingester. The temporarily applied configuration guarantees newly uploaded blocks are available for querying within 10 minutes.

1. Wait until the originally configured `-querier.query-store-after` period of time has elapsed since when all ingesters have been shutdown.
1. Revert the temporarily configuration changes done at the beginning of the scale down procedure.
1. Revert the temporary configuration changes done at the beginning of the scale down procedure.

#### Scaling down ingesters deployed in multiple zones

Expand Down
17 changes: 13 additions & 4 deletions integration/ingester_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

func TestIngesterSharding(t *testing.T) {
const numSeriesToPush = 1000
const queryIngestersWithinSecs = 5

tests := map[string]struct {
tenantShardSize int
Expand All @@ -48,11 +49,13 @@ func TestIngesterSharding(t *testing.T) {

flags := BlocksStorageFlags()
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)

// Enable shuffle sharding on read path but not lookback, otherwise all ingesters would be
// queried being just registered.
// We're verifying that shuffle sharding on the read path works so we need to set `query-ingesters-within`
// to a small enough value that they'll have been part of the ring for long enough by the time we attempt
// to query back the values we wrote to them. If they _haven't_ been part of the ring for long enough, the
// query would be sent to all ingesters and our test wouldn't really be testing anything.
flags["-querier.query-store-after"] = "0"
flags["-querier.shuffle-sharding-ingesters-lookback-period"] = "1ns"
flags["-querier.query-ingesters-within"] = fmt.Sprintf("%ds", queryIngestersWithinSecs)
flags["-ingester.ring.heartbeat-period"] = "1s"

// Start dependencies.
consul := e2edb.NewConsul()
Expand All @@ -77,6 +80,11 @@ func TestIngesterSharding(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Yes, we're sleeping in this test. We need to make sure that the ingesters have been part
// of the ring long enough before writing metrics to them to ensure that only the shuffle
// sharded ingesters will be queried for them when we go to verify the series written.
time.Sleep((queryIngestersWithinSecs + 1) * time.Second)

// Push series.
now := time.Now()
expectedVectors := map[string]model.Vector{}
Expand Down Expand Up @@ -109,6 +117,7 @@ func TestIngesterSharding(t *testing.T) {
}
}

// Verify that the expected number of ingesters had series (write path).
require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries)
require.Equal(t, numSeriesToPush, totalIngestedSeries)

Expand Down
6 changes: 2 additions & 4 deletions operations/mimir/shuffle-sharding.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,8 @@
}
) + (
if !($._config.shuffle_sharding.ingester_write_path_enabled && !$._config.shuffle_sharding.ingester_read_path_enabled) then {} else {
// The shuffle-sharding flags in the ruler applies both to read and write path, so we don’t have a way
// to keep it enabled on the write path and disable it only on the read path. However, we can obtain the
// same effect setting the lookback period to a very high value.
'querier.shuffle-sharding-ingesters-lookback-period': '87600h', // 3650 days.
// If shuffle sharding is enabled for the write path but isn't enabled for the read path, Mimir will query all ingesters
'querier.shuffle-sharding-ingesters-enabled': 'false',
}
) + (
if !$._config.shuffle_sharding.store_gateway_enabled then {} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type Config struct {
// this (and should never use it) but this feature is used by other projects built on top of it
SkipLabelNameValidation bool `yaml:"-"`

// This config is dynamically injected because defined in the querier config.
// This config is dynamically injected because it is defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// Limits for distributor
Expand Down
13 changes: 6 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2007,7 +2007,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Create distributor
ds, ingesters, _ := prepare(t, prepConfig{
ds, _, _ := prepare(t, prepConfig{
numIngesters: numIngesters,
happyIngesters: numIngesters,
numDistributors: 1,
Expand All @@ -2022,16 +2022,15 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)

// Check how many ingesters are queried as part of the shuffle sharding subring.
replicationSet, err := ds[0].GetIngestersForMetadata(ctx)
require.NoError(t, err)
assert.Equal(t, testData.expectedIngesters, len(replicationSet.Instances))

// Assert on metric metadata
metadata, err := ds[0].MetricsMetadata(ctx)
require.NoError(t, err)
assert.Equal(t, 10, len(metadata))

// Check how many ingesters have been queried.
// Due to the quorum the distributor could cancel the last request towards ingesters
// if all other ones are successful, so we're good either has been queried X or X-1
// ingesters.
assert.Contains(t, []int{testData.expectedIngesters, testData.expectedIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsMetadata"))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.API.RegisterFlags(f)
c.registerServerFlagsWithChangedDefaultValues(f)
c.Distributor.RegisterFlags(f, logger)
c.Querier.RegisterFlags(f)
c.Querier.RegisterFlags(f, logger)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f, logger)
c.Flusher.RegisterFlags(f)
Expand Down
9 changes: 8 additions & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,14 @@ func (t *Mimir) initOverridesExporter() (services.Service, error) {

func (t *Mimir) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod

// Only enable shuffle sharding on the read path when `query-ingesters-within`
// is non-zero since otherwise we can't determine if an ingester should be part
// of a tenant's shuffle sharding subring (we compare its registration time with
// the lookback period).
if t.Cfg.Querier.ShuffleShardingIngestersEnabled && t.Cfg.Querier.QueryIngestersWithin > 0 {
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.QueryIngestersWithin
}

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
Expand Down
Loading

0 comments on commit b13d2df

Please sign in to comment.