From 4e9dfa04316954ba977fd2a0a017b8ee6285f382 Mon Sep 17 00:00:00 2001 From: "Grot (@grafanabot)" <43478413+grafanabot@users.noreply.github.com> Date: Tue, 19 Mar 2024 12:27:00 +0000 Subject: [PATCH] Get rid of -querier.prefer-streaming-chunks-from-ingesters (#7639) (#7661) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Get rid of querier.prefer-streaming-chunks-from-ingesters Signed-off-by: Yuri Nikolic * Fixing a failing test Signed-off-by: Yuri Nikolic * Fixing review findings Signed-off-by: Yuri Nikolic * Fixing failing tests Signed-off-by: Yuri Nikolic * Make lint happy Signed-off-by: Yuri Nikolic * Fixing review findings Signed-off-by: Yuri Nikolic --------- Signed-off-by: Yuri Nikolic (cherry picked from commit 8ed42e19e11cee578e49578abeb8b59048fc10c4) Co-authored-by: Đurica Yuri Nikolić --- cmd/mimir/config-descriptor.json | 11 - cmd/mimir/help-all.txt.tmpl | 2 - .../mimir/configure/about-versioning.md | 1 - .../configuration-parameters/index.md | 6 - docs/sources/mimir/release-notes/v2.12.md | 1 + integration/ingester_test.go | 277 +++++++++-------- integration/querier_test.go | 6 +- pkg/distributor/distributor.go | 1 - pkg/distributor/distributor_test.go | 36 ++- pkg/distributor/query.go | 5 +- pkg/distributor/query_ingest_storage_test.go | 7 +- pkg/distributor/query_test.go | 282 +++++++++--------- pkg/ingester/client/chunkcompat.go | 27 ++ pkg/mimir/modules.go | 1 - pkg/querier/querier.go | 2 - 15 files changed, 326 insertions(+), 339 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 9e772385770..6da7e39905e 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1810,17 +1810,6 @@ "fieldType": "boolean", "fieldCategory": "advanced" }, - { - "kind": "field", - "name": "prefer_streaming_chunks_from_ingesters", - "required": false, - "desc": "Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this.", - "fieldValue": null, - "fieldDefaultValue": true, - "fieldFlag": "querier.prefer-streaming-chunks-from-ingesters", - "fieldType": "boolean", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "prefer_streaming_chunks_from_store_gateways", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 3bab726ebd8..f61510ad840 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1717,8 +1717,6 @@ Usage of ./cmd/mimir/mimir: If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true) -querier.minimize-ingester-requests-hedging-delay duration Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s) - -querier.prefer-streaming-chunks-from-ingesters - [experimental] Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this. 
(default true) -querier.prefer-streaming-chunks-from-store-gateways [experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this. -querier.promql-experimental-functions-enabled diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 04171800fae..dbdfa1eb9c2 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -120,7 +120,6 @@ The following features are currently experimental: - `-ingester.client.circuit-breaker.cooldown-period` - Querier - Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`) - - Streaming chunks from ingester to querier (`-querier.prefer-streaming-chunks-from-ingesters`) - Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`, `-querier.streaming-chunks-per-store-gateway-buffer-size`) - Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`) - Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`) diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index cffd910f52f..e06e2d564f8 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1291,12 +1291,6 @@ store_gateway_client: # CLI flag: -querier.shuffle-sharding-ingesters-enabled [shuffle_sharding_ingesters_enabled: | default = true] -# (experimental) Request ingesters stream chunks. Ingesters will only respond -# with a stream of chunks if the target ingester supports this, and this -# preference will be ignored by ingesters that do not support this. -# CLI flag: -querier.prefer-streaming-chunks-from-ingesters -[prefer_streaming_chunks_from_ingesters: | default = true] - # (experimental) Request store-gateways stream chunks. Store-gateways will only # respond with a stream of chunks if the target store-gateway supports this, and # this preference will be ignored by store-gateways that do not support this. diff --git a/docs/sources/mimir/release-notes/v2.12.md b/docs/sources/mimir/release-notes/v2.12.md index c91154b7f90..2cbd2aa7dfc 100644 --- a/docs/sources/mimir/release-notes/v2.12.md +++ b/docs/sources/mimir/release-notes/v2.12.md @@ -102,6 +102,7 @@ The default value of the following CLI flags have been changed: The following deprecated configuration options are removed in Grafana Mimir 2.12: - The YAML setting `frontend.cache_unaligned_requests`. +- Experimental CLI flag `-querier.prefer-streaming-chunks-from-ingesters`. 
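A minimal sketch of the behaviour after this change, based on the pkg/mimir/modules.go and pkg/distributor/query.go hunks later in this patch (variable names follow those files): the querier now always asks ingesters to stream chunks, and the only remaining knob is the per-ingester series buffer size injected into the distributor config.

	// Wiring in pkg/mimir/modules.go: only the buffer size is still copied
	// from the querier config into the distributor config.
	t.Cfg.Distributor.StreamingChunksPerIngesterSeriesBufferSize = t.Cfg.Querier.StreamingChunksPerIngesterSeriesBufferSize

	// Request setup in (*Distributor).QueryStream: streaming is requested
	// unconditionally, no longer gated on PreferStreamingChunksFromIngesters.
	req.StreamingChunksBatchSize = d.cfg.StreamingChunksPerIngesterSeriesBufferSize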
The following configuration options are deprecated and will be removed in Grafana Mimir 2.14: diff --git a/integration/ingester_test.go b/integration/ingester_test.go index 06ebeef4c19..abc4e7d6bf7 100644 --- a/integration/ingester_test.go +++ b/integration/ingester_test.go @@ -4,7 +4,6 @@ package integration import ( - "fmt" "net/http" "strconv" "testing" @@ -494,114 +493,13 @@ func TestIngesterQuerying(t *testing.T) { for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { - for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) { - s, err := e2e.NewScenario(networkName) - require.NoError(t, err) - defer s.Close() - - baseFlags := map[string]string{ - "-distributor.ingestion-tenant-shard-size": "0", - "-ingester.ring.heartbeat-period": "1s", - "-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled), - } - - flags := mergeFlags( - BlocksStorageFlags(), - BlocksStorageS3Flags(), - baseFlags, - ) - - // Start dependencies. - consul := e2edb.NewConsul() - minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) - require.NoError(t, s.StartAndWaitReady(consul, minio)) - - // Start Mimir components. - distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) - ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags) - querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags) - require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) - - // Wait until distributor has updated the ring. - require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), - labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) - - // Wait until querier has updated the ring. - require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), - labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) - - client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) - require.NoError(t, err) - - res, err := client.Push(tc.inSeries) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) - - result, err := client.QueryRange(query, queryStart, queryEnd, queryStep) - require.NoError(t, err) - require.Equal(t, tc.expectedQueryResult, result) - - // The PromQL engine does some special handling for the timestamp() function which previously - // caused queries to fail when streaming chunks was enabled, so check that this regression - // has not been reintroduced. 
- result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep) - require.NoError(t, err) - require.Equal(t, tc.expectedTimestampQueryResult, result) - - queryRequestCount := func(status string) (float64, error) { - counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"}, - e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"), - labels.MustNewMatcher(labels.MatchRegexp, "status_code", status), - ), - e2e.WithMetricCount, - e2e.SkipMissingMetrics, - ) - - if err != nil { - return 0, err - } - - require.Len(t, counts, 1) - return counts[0], nil - } - - successfulQueryRequests, err := queryRequestCount("OK") - require.NoError(t, err) - - cancelledQueryRequests, err := queryRequestCount("cancel") - require.NoError(t, err) - - totalQueryRequests, err := queryRequestCount(".*") - require.NoError(t, err) - - // We expect two query requests: the first query request and the timestamp query request - require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) - require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) - require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) - }) - } - }) - } -} - -func TestIngesterQueryingWithRequestMinimization(t *testing.T) { - for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) defer s.Close() baseFlags := map[string]string{ - "-distributor.ingestion-tenant-shard-size": "0", - "-ingester.ring.heartbeat-period": "1s", - "-ingester.ring.zone-awareness-enabled": "true", - "-ingester.ring.replication-factor": "3", - "-querier.minimize-ingester-requests": "true", - "-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled), + "-distributor.ingestion-tenant-shard-size": "0", + "-ingester.ring.heartbeat-period": "1s", } flags := mergeFlags( @@ -615,68 +513,159 @@ func TestIngesterQueryingWithRequestMinimization(t *testing.T) { minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) require.NoError(t, s.StartAndWaitReady(consul, minio)) - ingesterFlags := func(zone string) map[string]string { - return mergeFlags(flags, map[string]string{ - "-ingester.ring.instance-availability-zone": zone, - }) - } - // Start Mimir components. distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) - ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-a")) - ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-b")) - ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-c")) + ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags) querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags) - require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier)) + require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) - // Wait until distributor and querier have updated the ring. 
- for _, component := range []*e2emimir.MimirService{distributor, querier} { - require.NoError(t, component.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), - labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) - } + // Wait until distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // Wait until querier has updated the ring. + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) require.NoError(t, err) - // Push some data to the cluster. - seriesName := "test_series" - now := time.Now() - series, expectedVector, _ := generateFloatSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"}) - - res, err := client.Push(series) + res, err := client.Push(tc.inSeries) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + require.Equal(t, http.StatusOK, res.StatusCode) - // Verify we can query the data we just pushed. - queryResult, err := client.Query(seriesName, now) + result, err := client.QueryRange(query, queryStart, queryEnd, queryStep) require.NoError(t, err) - require.Equal(t, model.ValVector, queryResult.Type()) - require.Equal(t, expectedVector, queryResult.(model.Vector)) + require.Equal(t, tc.expectedQueryResult, result) - // Check that we only queried two of the three ingesters. - totalQueryRequests := 0.0 + // The PromQL engine does some special handling for the timestamp() function which previously + // caused queries to fail when streaming chunks was enabled, so check that this regression + // has not been reintroduced. 
+ result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep) + require.NoError(t, err) + require.Equal(t, tc.expectedTimestampQueryResult, result) - for _, ingester := range []*e2emimir.MimirService{ingester1, ingester2, ingester3} { - sums, err := ingester.SumMetrics( - []string{"cortex_request_duration_seconds"}, + queryRequestCount := func(status string) (float64, error) { + counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"}, e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"), - labels.MustNewMatcher(labels.MatchEqual, "status_code", "OK"), + labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"), + labels.MustNewMatcher(labels.MatchRegexp, "status_code", status), ), - e2e.SkipMissingMetrics, e2e.WithMetricCount, + e2e.SkipMissingMetrics, ) - require.NoError(t, err) - queryRequests := sums[0] - require.LessOrEqual(t, queryRequests, 1.0) - totalQueryRequests += queryRequests + if err != nil { + return 0, err + } + + require.Len(t, counts, 1) + return counts[0], nil } - require.Equal(t, 2.0, totalQueryRequests) + successfulQueryRequests, err := queryRequestCount("OK") + require.NoError(t, err) + + cancelledQueryRequests, err := queryRequestCount("cancel") + require.NoError(t, err) + + totalQueryRequests, err := queryRequestCount(".*") + require.NoError(t, err) + + // We expect two query requests: the first query request and the timestamp query request + require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) + require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) + require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) + }) + } +} + +func TestIngesterQueryingWithRequestMinimization(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + baseFlags := map[string]string{ + "-distributor.ingestion-tenant-shard-size": "0", + "-ingester.ring.heartbeat-period": "1s", + "-ingester.ring.zone-awareness-enabled": "true", + "-ingester.ring.replication-factor": "3", + "-querier.minimize-ingester-requests": "true", + } + + flags := mergeFlags( + BlocksStorageFlags(), + BlocksStorageS3Flags(), + baseFlags, + ) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + ingesterFlags := func(zone string) map[string]string { + return mergeFlags(flags, map[string]string{ + "-ingester.ring.instance-availability-zone": zone, }) } + + // Start Mimir components. 
+ distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) + ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-a")) + ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-b")) + ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-c")) + querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier)) + + // Wait until distributor and querier have updated the ring. + for _, component := range []*e2emimir.MimirService{distributor, querier} { + require.NoError(t, component.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + } + + client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + // Push some data to the cluster. + seriesName := "test_series" + now := time.Now() + series, expectedVector, _ := generateFloatSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"}) + + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Verify we can query the data we just pushed. + queryResult, err := client.Query(seriesName, now) + require.NoError(t, err) + require.Equal(t, model.ValVector, queryResult.Type()) + require.Equal(t, expectedVector, queryResult.(model.Vector)) + + // Check that we only queried two of the three ingesters. + totalQueryRequests := 0.0 + + for _, ingester := range []*e2emimir.MimirService{ingester1, ingester2, ingester3} { + sums, err := ingester.SumMetrics( + []string{"cortex_request_duration_seconds"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"), + labels.MustNewMatcher(labels.MatchEqual, "status_code", "OK"), + ), + e2e.SkipMissingMetrics, + e2e.WithMetricCount, + ) + + require.NoError(t, err) + queryRequests := sums[0] + require.LessOrEqual(t, queryRequests, 1.0) + totalQueryRequests += queryRequests + } + + require.Equal(t, 2.0, totalQueryRequests) } func TestIngesterReportGRPCStatusCodes(t *testing.T) { diff --git a/integration/querier_test.go b/integration/querier_test.go index f30d3481b88..46c17cb39c6 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -61,7 +61,7 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream tenantShardSize: 1, indexCacheBackend: tsdb.IndexCacheBackendMemcached, }, - "shard size 1, ingester gRPC streaming enabled, memcached index cache, query sharding enabled": { + "shard size 1, memcached index cache, query sharding enabled": { tenantShardSize: 1, indexCacheBackend: tsdb.IndexCacheBackendMemcached, queryShardingEnabled: true, @@ -158,7 +158,6 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream "-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize), "-query-frontend.query-stats-enabled": "true", "-query-frontend.parallelize-shardable-queries": strconv.FormatBool(testCfg.queryShardingEnabled), - "-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled), "-querier.prefer-streaming-chunks-from-store-gateways": strconv.FormatBool(streamingEnabled), }) @@ -852,7 
+851,7 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { const blockRangePeriod = 5 * time.Second for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) { + t.Run(fmt.Sprintf("store-gateway streaming enabled: %v", streamingEnabled), func(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) defer s.Close() @@ -865,7 +864,6 @@ func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { "-blocks-storage.bucket-store.sync-interval": "1s", "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), "-querier.max-fetched-series-per-query": "3", - "-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled), "-querier.prefer-streaming-chunks-from-store-gateways": strconv.FormatBool(streamingEnabled), }) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index ead23d71e35..175ab6298c9 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -193,7 +193,6 @@ type Config struct { // This config is dynamically injected because it is defined in the querier config. ShuffleShardingLookbackPeriod time.Duration `yaml:"-"` - PreferStreamingChunksFromIngesters bool `yaml:"-"` StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"-"` MinimizeIngesterRequests bool `yaml:"-"` MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"-"` diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 52d96b5db47..1fafa9d9ddd 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1196,7 +1196,7 @@ func TestDistributor_PushQuery(t *testing.T) { var m model.Matrix if len(resp.Chunkseries) == 0 { - m, err = client.TimeSeriesChunksToMatrix(0, 10, nil) + m, err = client.StreamingSeriesToMatrix(0, 10, resp.StreamingSeries) } else { m, err = client.TimeSeriesChunksToMatrix(0, 10, resp.Chunkseries) } @@ -4835,6 +4835,12 @@ type prepConfig struct { ingestStorageEnabled bool ingestStoragePartitions int32 // Number of partitions. Auto-detected from configured ingesters if not explicitly set. ingestStorageKafka *kfake.Cluster + + // We need this setting to simulate a response from ingesters that didn't support responding + // with a stream of chunks, and were responding with chunk series instead. This is needed to + // ensure backwards compatibility, i.e., that queriers can still correctly handle both types + // or responses. + disableStreamingResponse bool } // totalIngesters takes into account ingesterStateByZone and numIngesters. @@ -4973,6 +4979,7 @@ func prepareIngesterZone(t testing.TB, zone string, state ingesterZoneState, cfg labelNamesStreamResponseDelay: labelNamesStreamResponseDelay, timeOut: cfg.timeOut, circuitBreakerOpen: cfg.circuitBreakerOpen, + disableStreamingResponse: cfg.disableStreamingResponse, } // Init the partition reader if the ingest storage is enabled. @@ -5563,6 +5570,7 @@ type mockIngester struct { tokens []uint32 id int circuitBreakerOpen bool + disableStreamingResponse bool // partitionReader is responsible to consume a partition from Kafka when the // ingest storage is enabled. This field is nil if the ingest storage is disabled. 
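A hedged usage sketch of the new prepConfig field introduced above: a distributor test can force the legacy, non-streaming response path by setting disableStreamingResponse, so both response shapes keep being exercised. The prepare call mirrors the existing helpers in this test file; the concrete values are illustrative.

	// Illustrative only: exercise the legacy Chunkseries response path.
	ds, ingesters, reg, _ := prepare(t, prepConfig{
		numIngesters:             3,
		happyIngesters:           3,
		numDistributors:          1,
		disableStreamingResponse: true, // mock ingesters reply with Chunkseries instead of StreamingSeries
	})
	_, _, _ = ds, ingesters, reg // used by the actual assertions in the tests below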
@@ -5801,7 +5809,16 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest } } - if req.StreamingChunksBatchSize > 0 { + if i.disableStreamingResponse || req.StreamingChunksBatchSize == 0 { + nonStreamingResponses = append(nonStreamingResponses, &client.QueryStreamResponse{ + Chunkseries: []client.TimeSeriesChunk{ + { + Labels: ts.Labels, + Chunks: wireChunks, + }, + }, + }) + } else { streamingLabelResponses = append(streamingLabelResponses, &client.QueryStreamResponse{ StreamingSeries: []client.QueryStreamSeries{ { @@ -5819,28 +5836,19 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest }, }, }) - } else { - nonStreamingResponses = append(nonStreamingResponses, &client.QueryStreamResponse{ - Chunkseries: []client.TimeSeriesChunk{ - { - Labels: ts.Labels, - Chunks: wireChunks, - }, - }, - }) } } var results []*client.QueryStreamResponse - if req.StreamingChunksBatchSize > 0 { + if i.disableStreamingResponse { + results = nonStreamingResponses + } else { endOfLabelsMessage := &client.QueryStreamResponse{ IsEndOfSeriesStream: true, } results = append(streamingLabelResponses, endOfLabelsMessage) results = append(results, streamingChunkResponses...) - } else { - results = nonStreamingResponses } return &stream{ diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 5421661e49b..4527ea475e5 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -84,10 +84,7 @@ func (d *Distributor) QueryStream(ctx context.Context, queryMetrics *stats.Query if err != nil { return err } - - if d.cfg.PreferStreamingChunksFromIngesters { - req.StreamingChunksBatchSize = d.cfg.StreamingChunksPerIngesterSeriesBufferSize - } + req.StreamingChunksBatchSize = d.cfg.StreamingChunksPerIngesterSeriesBufferSize replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx) if err != nil { diff --git a/pkg/distributor/query_ingest_storage_test.go b/pkg/distributor/query_ingest_storage_test.go index 57ed717a853..8f0a726c00a 100644 --- a/pkg/distributor/query_ingest_storage_test.go +++ b/pkg/distributor/query_ingest_storage_test.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/dskit/ring" "github.com/grafana/dskit/test" "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" @@ -534,7 +533,7 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) { var responseMatrix model.Matrix if len(resp.Chunkseries) == 0 { - responseMatrix, err = ingester_client.TimeSeriesChunksToMatrix(0, 5, nil) + responseMatrix, err = ingester_client.StreamingSeriesToMatrix(0, 5, resp.StreamingSeries) } else { responseMatrix, err = ingester_client.TimeSeriesChunksToMatrix(0, 5, resp.Chunkseries) } @@ -544,10 +543,6 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) { // Check how many ingesters have been queried. // Because we return immediately on failures, it might take some time for all ingester calls to register. test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") }) - - // We expected the number of non-deduplicated chunks to be equal to the number of queried series - // given we expect 1 chunk per series. 
- assert.Equal(t, float64(testData.expectedResponse.Len()), testutil.ToFloat64(queryMetrics.IngesterChunksTotal)-testutil.ToFloat64(queryMetrics.IngesterChunksDeduplicated)) }) } } diff --git a/pkg/distributor/query_test.go b/pkg/distributor/query_test.go index c0e82958d2f..274c9ea2407 100644 --- a/pkg/distributor/query_test.go +++ b/pkg/distributor/query_test.go @@ -184,157 +184,155 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) { - for _, minimizeIngesterRequests := range []bool{true, false} { - t.Run(fmt.Sprintf("request minimization enabled: %v", minimizeIngesterRequests), func(t *testing.T) { - userCtx := user.InjectOrgID(context.Background(), "user") - limits := prepareDefaultLimits() - limits.MaxChunksPerQuery = limit - - // Prepare distributors. - ds, ingesters, reg, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - limits: limits, - configure: func(config *Config) { - config.PreferStreamingChunksFromIngesters = streamingEnabled - config.MinimizeIngesterRequests = minimizeIngesterRequests - }, - }) - - // Push a number of series below the max chunks limit. Each series has 1 sample, - // so expect 1 chunk per series when querying back. - initialSeries := limit / 3 - writeReq := makeWriteRequest(0, initialSeries, 0, false, false, "foo") - writeRes, err := ds[0].Push(userCtx, writeReq) - require.Equal(t, &mimirpb.WriteResponse{}, writeRes) - require.Nil(t, err) - - allSeriesMatchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), - } - - queryCtx := limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(0, 0, testCase.maxChunksLimit, testCase.maxEstimatedChunksLimit, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) - queryMetrics := stats.NewQueryMetrics(reg[0]) - - // Since the number of series (and thus chunks) is equal to the limit (but doesn't - // exceed it), we expect a query running on all series to succeed. - queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) - - if streamingEnabled { - require.Len(t, queryRes.StreamingSeries, initialSeries) - } else { - require.Len(t, queryRes.Chunkseries, initialSeries) - } - - firstRequestIngesterQueryCount := countCalls(ingesters, "QueryStream") - - if minimizeIngesterRequests { - require.LessOrEqual(t, firstRequestIngesterQueryCount, 2, "should not call third ingester if request minimisation is enabled and first two ingesters return a successful response") - } - - // Push more series to exceed the limit once we'll query back all series. - writeReq = &mimirpb.WriteRequest{} - for i := 0; i < limit; i++ { - writeReq.Timeseries = append(writeReq.Timeseries, - makeTimeseries([]string{model.MetricNameLabel, fmt.Sprintf("another_series_%d", i)}, makeSamples(0, 0), nil), - ) - } - - writeRes, err = ds[0].Push(userCtx, writeReq) - require.Equal(t, &mimirpb.WriteResponse{}, writeRes) - require.Nil(t, err) - - // Reset the query limiter in the context. 
- queryCtx = limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(0, 0, testCase.maxChunksLimit, testCase.maxEstimatedChunksLimit, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) - - // Since the number of series (and thus chunks) is exceeding to the limit, we expect - // a query running on all series to fail. - _, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.Error(t, err) - require.ErrorContains(t, err, testCase.expectedError) - - if minimizeIngesterRequests { - secondRequestIngesterQueryCallCount := countCalls(ingesters, "QueryStream") - firstRequestIngesterQueryCount - require.LessOrEqual(t, secondRequestIngesterQueryCallCount, 2, "should not call third ingester if request minimisation is enabled and either of first two ingesters fail with limits error") - } - }) + for _, minimizeIngesterRequests := range []bool{true, false} { + t.Run(fmt.Sprintf("request minimization enabled: %v", minimizeIngesterRequests), func(t *testing.T) { + userCtx := user.InjectOrgID(context.Background(), "user") + limits := prepareDefaultLimits() + limits.MaxChunksPerQuery = limit + + // Prepare distributors. + ds, ingesters, reg, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + limits: limits, + configure: func(config *Config) { + config.MinimizeIngesterRequests = minimizeIngesterRequests + }, + }) + + // Push a number of series below the max chunks limit. Each series has 1 sample, + // so expect 1 chunk per series when querying back. + initialSeries := limit / 3 + writeReq := makeWriteRequest(0, initialSeries, 0, false, false, "foo") + writeRes, err := ds[0].Push(userCtx, writeReq) + require.Equal(t, &mimirpb.WriteResponse{}, writeRes) + require.Nil(t, err) + + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), } - }) - } - }) - } -} -func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) { - const maxSeriesLimit = 10 + queryCtx := limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(0, 0, testCase.maxChunksLimit, testCase.maxEstimatedChunksLimit, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) + queryMetrics := stats.NewQueryMetrics(reg[0]) - for _, minimizeIngesterRequests := range []bool{true, false} { - t.Run(fmt.Sprintf("request minimization enabled: %v", minimizeIngesterRequests), func(t *testing.T) { - userCtx := user.InjectOrgID(context.Background(), "user") - limits := prepareDefaultLimits() + // Since the number of series (and thus chunks) is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) - // Prepare distributors. - ds, ingesters, reg, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - limits: limits, - configure: func(config *Config) { - config.MinimizeIngesterRequests = minimizeIngesterRequests - }, - }) + require.Len(t, queryRes.StreamingSeries, initialSeries) - // Push a number of series below the max series limit. 
- initialSeries := maxSeriesLimit - writeReq := makeWriteRequest(0, initialSeries, 0, false, true, "foo") - writeRes, err := ds[0].Push(userCtx, writeReq) - assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + firstRequestIngesterQueryCount := countCalls(ingesters, "QueryStream") - allSeriesMatchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), - } + if minimizeIngesterRequests { + require.LessOrEqual(t, firstRequestIngesterQueryCount, 2, "should not call third ingester if request minimisation is enabled and first two ingesters return a successful response") + } - queryMetrics := stats.NewQueryMetrics(reg[0]) + // Push more series to exceed the limit once we'll query back all series. + writeReq = &mimirpb.WriteRequest{} + for i := 0; i < limit; i++ { + writeReq.Timeseries = append(writeReq.Timeseries, + makeTimeseries([]string{model.MetricNameLabel, fmt.Sprintf("another_series_%d", i)}, makeSamples(0, 0), nil), + ) + } + + writeRes, err = ds[0].Push(userCtx, writeReq) + require.Equal(t, &mimirpb.WriteResponse{}, writeRes) + require.Nil(t, err) - // Since the number of series is equal to the limit (but doesn't - // exceed it), we expect a query running on all series to succeed. - queryCtx := limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) - queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.NoError(t, err) - assert.Len(t, queryRes.Chunkseries, initialSeries) + // Reset the query limiter in the context. + queryCtx = limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(0, 0, testCase.maxChunksLimit, testCase.maxEstimatedChunksLimit, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) - firstRequestIngesterQueryCount := countCalls(ingesters, "QueryStream") + // Since the number of series (and thus chunks) is exceeding to the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + require.ErrorContains(t, err, testCase.expectedError) - if minimizeIngesterRequests { - require.LessOrEqual(t, firstRequestIngesterQueryCount, 2, "should not call third ingester if request minimisation is enabled and first two ingesters return a successful response") + if minimizeIngesterRequests { + secondRequestIngesterQueryCallCount := countCalls(ingesters, "QueryStream") - firstRequestIngesterQueryCount + require.LessOrEqual(t, secondRequestIngesterQueryCallCount, 2, "should not call third ingester if request minimisation is enabled and either of first two ingesters fail with limits error") + } + }) } + }) + } +} + +func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) { + const maxSeriesLimit = 10 - // Push more series to exceed the limit once we'll query back all series. 
- writeReq = makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series"}, makeSamples(0, 0), nil)) + for _, disableStreamingResponse := range []bool{true, false} { + for _, minimizeIngesterRequests := range []bool{true, false} { + t.Run(fmt.Sprintf("streaming response disabled: %v, request minimization enabled: %v", disableStreamingResponse, minimizeIngesterRequests), func(t *testing.T) { + userCtx := user.InjectOrgID(context.Background(), "user") + limits := prepareDefaultLimits() + + // Prepare distributors. + ds, ingesters, reg, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + limits: limits, + disableStreamingResponse: disableStreamingResponse, + configure: func(config *Config) { + config.MinimizeIngesterRequests = minimizeIngesterRequests + }, + }) - writeRes, err = ds[0].Push(userCtx, writeReq) - assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) - assert.Nil(t, err) + // Push a number of series below the max series limit. + initialSeries := maxSeriesLimit + writeReq := makeWriteRequest(0, initialSeries, 0, false, true, "foo") + writeRes, err := ds[0].Push(userCtx, writeReq) + assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) + assert.Nil(t, err) - // Reset the query limiter in the context. - queryCtx = limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } - // Since the number of series is exceeding the limit, we expect - // a query running on all series to fail. - _, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) - require.Error(t, err) - assert.ErrorContains(t, err, "the query exceeded the maximum number of series") + queryMetrics := stats.NewQueryMetrics(reg[0]) - if minimizeIngesterRequests { - secondRequestIngesterQueryCallCount := countCalls(ingesters, "QueryStream") - firstRequestIngesterQueryCount - require.LessOrEqual(t, secondRequestIngesterQueryCallCount, 2, "should not call third ingester if request minimisation is enabled and either of first two ingesters fail with limits error") - } - }) + // Since the number of series is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryCtx := limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) + queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + if disableStreamingResponse { + assert.Len(t, queryRes.Chunkseries, initialSeries) + } else { + assert.Len(t, queryRes.StreamingSeries, initialSeries) + } + + firstRequestIngesterQueryCount := countCalls(ingesters, "QueryStream") + + if minimizeIngesterRequests { + require.LessOrEqual(t, firstRequestIngesterQueryCount, 2, "should not call third ingester if request minimisation is enabled and first two ingesters return a successful response") + } + + // Push more series to exceed the limit once we'll query back all series. + writeReq = makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series"}, makeSamples(0, 0), nil)) + + writeRes, err = ds[0].Push(userCtx, writeReq) + assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + // Reset the query limiter in the context. 
+ queryCtx = limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry()))) + + // Since the number of series is exceeding the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.ErrorContains(t, err, "the query exceeded the maximum number of series") + + if minimizeIngesterRequests { + secondRequestIngesterQueryCallCount := countCalls(ingesters, "QueryStream") - firstRequestIngesterQueryCount + require.LessOrEqual(t, secondRequestIngesterQueryCallCount, 2, "should not call third ingester if request minimisation is enabled and either of first two ingesters fail with limits error") + } + }) + } } } @@ -348,11 +346,12 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs // Use replication factor of 1 so that we always wait the response from all ingesters. // This guarantees us to always read the same chunks and have a stable test. ds, _, reg, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - limits: limits, - replicationFactor: 1, + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + limits: limits, + replicationFactor: 1, + disableStreamingResponse: true, }) allSeriesMatchers := []*labels.Matcher{ @@ -421,9 +420,6 @@ func TestDistributor_QueryStream_ShouldSuccessfullyRunOnSlowIngesterWithStreamin replicationFactor: 1, // Use replication factor of 1 so that we always wait the response from all ingesters. ingestStorageEnabled: ingestStorageEnabled, ingestStoragePartitions: 3, - configure: func(cfg *Config) { - cfg.PreferStreamingChunksFromIngesters = true - }, }) // Mock 1 ingester to be slow. diff --git a/pkg/ingester/client/chunkcompat.go b/pkg/ingester/client/chunkcompat.go index 7ae8c15b767..f34521a4a4a 100644 --- a/pkg/ingester/client/chunkcompat.go +++ b/pkg/ingester/client/chunkcompat.go @@ -8,6 +8,7 @@ package client import ( "bytes" "errors" + "fmt" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -65,6 +66,32 @@ func StreamsToMatrix(from, through model.Time, responses []*QueryStreamResponse) return result, nil } +// StreamingSeriesToMatrix converts slice of []client.TimeSeriesChunk to a model.Matrix. +func StreamingSeriesToMatrix(from, through model.Time, sSeries []StreamingSeries) (model.Matrix, error) { + if sSeries == nil { + return nil, nil + } + + result := model.Matrix{} + var chunks []Chunk + for _, series := range sSeries { + chunks = chunks[:0] + for sourceIdx, source := range series.Sources { + sourceChunks, err := source.StreamReader.GetChunks(source.SeriesIndex) + if err != nil { + return nil, fmt.Errorf("GetChunks() from stream reader for series %d from source %d: %w", source.SeriesIndex, sourceIdx, err) + } + chunks = append(chunks, sourceChunks...) + } + stream, err := seriesChunksToMatrix(from, through, series.Labels, chunks) + if err != nil { + return nil, err + } + result = append(result, stream) + } + return result, nil +} + // TimeSeriesChunksToMatrix converts slice of []client.TimeSeriesChunk to a model.Matrix. 
func TimeSeriesChunksToMatrix(from, through model.Time, serieses []TimeSeriesChunk) (model.Matrix, error) { if serieses == nil { diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 2fb7d04bb25..2d38d808b11 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -467,7 +467,6 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { // ruler's dependency) canJoinDistributorsRing := t.Cfg.isAnyModuleEnabled(Distributor, Write, All) - t.Cfg.Distributor.PreferStreamingChunksFromIngesters = t.Cfg.Querier.PreferStreamingChunksFromIngesters t.Cfg.Distributor.StreamingChunksPerIngesterSeriesBufferSize = t.Cfg.Querier.StreamingChunksPerIngesterSeriesBufferSize t.Cfg.Distributor.MinimizeIngesterRequests = t.Cfg.Querier.MinimizeIngesterRequests t.Cfg.Distributor.MinimiseIngesterRequestsHedgingDelay = t.Cfg.Querier.MinimiseIngesterRequestsHedgingDelay diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ba0ca37da3f..4ac5a89fc92 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -48,7 +48,6 @@ type Config struct { ShuffleShardingIngestersEnabled bool `yaml:"shuffle_sharding_ingesters_enabled" category:"advanced"` - PreferStreamingChunksFromIngesters bool `yaml:"prefer_streaming_chunks_from_ingesters" category:"experimental"` // Enabled by default as of Mimir 2.11, remove altogether in 2.12. PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"` PreferAvailabilityZone string `yaml:"prefer_availability_zone" category:"experimental" doc:"hidden"` StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"streaming_chunks_per_ingester_series_buffer_size" category:"advanced"` @@ -71,7 +70,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.QueryStoreAfter, queryStoreAfterFlag, 12*time.Hour, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") f.BoolVar(&cfg.ShuffleShardingIngestersEnabled, "querier.shuffle-sharding-ingesters-enabled", true, fmt.Sprintf("Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -%s. If this setting is false or -%s is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).", validation.QueryIngestersWithinFlag, validation.QueryIngestersWithinFlag)) - f.BoolVar(&cfg.PreferStreamingChunksFromIngesters, "querier.prefer-streaming-chunks-from-ingesters", true, "Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this.") f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, "querier.prefer-streaming-chunks-from-store-gateways", false, "Request store-gateways stream chunks. 
Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.") f.StringVar(&cfg.PreferAvailabilityZone, "querier.prefer-availability-zone", "", "Preferred availability zone to query ingesters from when using the ingest storage.")
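A short usage sketch tying the pieces together: with streaming now the default, test code converts the distributor's QueryStream response via the new client.StreamingSeriesToMatrix helper, and falls back to TimeSeriesChunksToMatrix only when an ingester answered with chunk series (for example when disableStreamingResponse is set in the tests above). Variable names (resp, from, through) are illustrative; the branching mirrors the distributor_test.go hunk in this patch.

	var m model.Matrix
	var err error
	if len(resp.Chunkseries) == 0 {
		// Streaming path: series labels arrive up front, chunks are fetched from
		// each source's StreamReader inside StreamingSeriesToMatrix.
		m, err = client.StreamingSeriesToMatrix(from, through, resp.StreamingSeries)
	} else {
		// Legacy path, kept for ingesters that responded with chunk series.
		m, err = client.TimeSeriesChunksToMatrix(from, through, resp.Chunkseries)
	}
	require.NoError(t, err)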