From 79c32abaf3b79b52440ce3d1915ad1626e5930a2 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Fri, 20 Dec 2024 12:09:29 -0800 Subject: [PATCH] query-frontend: Allow for multiple topics in readConsistencyRoundTripper (#10220) * query-frontend: Allow for multiple topics in readConsistencyRoundTripper * Add topic label to StrongReadConsistencyInstrumentation metrics * Update CHANGELOG.md * Remove unused PartitionReader.Topic() method * Avoid overwriting QueryFrontendTopicOffsetsReaders map if already populated --- CHANGELOG.md | 2 + .../querymiddleware/read_consistency.go | 65 +++++--- .../querymiddleware/read_consistency_test.go | 12 +- pkg/frontend/querymiddleware/roundtrip.go | 30 ++-- .../querymiddleware/roundtrip_test.go | 2 +- pkg/mimir/mimir.go | 68 ++++---- pkg/mimir/modules.go | 157 +++++++++--------- pkg/storage/ingest/fetcher_test.go | 4 +- pkg/storage/ingest/partition_offset_reader.go | 4 + pkg/storage/ingest/reader.go | 40 +++-- pkg/storage/ingest/reader_test.go | 40 ++--- 11 files changed, 235 insertions(+), 189 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 042c546291a..c31ba2b06fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## main / unreleased +* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220 + ### Grafana Mimir * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 diff --git a/pkg/frontend/querymiddleware/read_consistency.go b/pkg/frontend/querymiddleware/read_consistency.go index 7455a924af0..9c15ae0e025 100644 --- a/pkg/frontend/querymiddleware/read_consistency.go +++ b/pkg/frontend/querymiddleware/read_consistency.go @@ -4,11 +4,13 @@ package querymiddleware import ( "net/http" + "sync" "github.com/go-kit/log" "github.com/grafana/dskit/tenant" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/errgroup" apierror "github.com/grafana/mimir/pkg/api/error" querierapi "github.com/grafana/mimir/pkg/querier/api" @@ -19,19 +21,21 @@ import ( type readConsistencyRoundTripper struct { next http.RoundTripper - offsetsReader *ingest.TopicOffsetsReader - limits Limits - logger log.Logger - metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] + // offsetsReaders is a map of offsets readers keyed by the request header the offsets get attached to. + offsetsReaders map[string]*ingest.TopicOffsetsReader + + limits Limits + logger log.Logger + metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] } -func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReader *ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper { +func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReaders map[string]*ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper { return &readConsistencyRoundTripper{ - next: next, - limits: limits, - logger: logger, - offsetsReader: offsetsReader, - metrics: metrics, + next: next, + offsetsReaders: offsetsReaders, + limits: limits, + logger: logger, + metrics: metrics, } } @@ -57,15 +61,32 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp return r.next.RoundTrip(req) } - // Fetch last produced offsets. - offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) { - return r.offsetsReader.WaitNextFetchLastProducedOffset(ctx) - }) - if err != nil { - return nil, errors.Wrap(err, "wait for last produced offsets") + errGroup, ctx := errgroup.WithContext(ctx) + reqHeaderLock := &sync.Mutex{} + + for headerKey, offsetsReader := range r.offsetsReaders { + headerKey := headerKey + offsetsReader := offsetsReader + + errGroup.Go(func() error { + offsets, err := r.metrics.Observe(offsetsReader.Topic(), false, func() (map[int32]int64, error) { + return offsetsReader.WaitNextFetchLastProducedOffset(ctx) + }) + if err != nil { + return errors.Wrapf(err, "wait for last produced offsets of topic '%s'", offsetsReader.Topic()) + } + + reqHeaderLock.Lock() + req.Header.Add(headerKey, string(querierapi.EncodeOffsets(offsets))) + reqHeaderLock.Unlock() + + return nil + }) } - req.Header.Add(querierapi.ReadConsistencyOffsetsHeader, string(querierapi.EncodeOffsets(offsets))) + if err = errGroup.Wait(); err != nil { + return nil, err + } return r.next.RoundTrip(req) } @@ -82,7 +103,13 @@ func getDefaultReadConsistency(tenantIDs []string, limits Limits) string { return querierapi.ReadConsistencyEventual } -func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] { +func newReadConsistencyMetrics(reg prometheus.Registerer, offsetsReaders map[string]*ingest.TopicOffsetsReader) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] { const component = "query-frontend" - return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg) + + topics := make([]string, 0, len(offsetsReaders)) + for _, r := range offsetsReaders { + topics = append(topics, r.Topic()) + } + + return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg, topics) } diff --git a/pkg/frontend/querymiddleware/read_consistency_test.go b/pkg/frontend/querymiddleware/read_consistency_test.go index 805207b8a20..0c68099ed17 100644 --- a/pkg/frontend/querymiddleware/read_consistency_test.go +++ b/pkg/frontend/querymiddleware/read_consistency_test.go @@ -102,8 +102,10 @@ func TestReadConsistencyRoundTripper(t *testing.T) { req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), testData.reqConsistency)) } + offsetsReaders := map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader} + reg := prometheus.NewPedanticRegistry() - rt := newReadConsistencyRoundTripper(downstream, reader, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg)) + rt := newReadConsistencyRoundTripper(downstream, offsetsReaders, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg, offsetsReaders)) _, err = rt.RoundTrip(req) require.NoError(t, err) @@ -130,13 +132,13 @@ func TestReadConsistencyRoundTripper(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="false"} %d - cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="true"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="false"} %d + cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="true"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend"} 0 - `, expectedRequests)), + cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend", topic="%s"} 0 + `, topic, expectedRequests, topic, topic)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index b0c190c2100..0aa5d49fa5f 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -207,10 +207,10 @@ func NewTripperware( cacheExtractor Extractor, engineOpts promql.EngineOpts, engineExperimentalFunctionsEnabled bool, - ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader, + ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader, registerer prometheus.Registerer, ) (Tripperware, error) { - queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReader, registerer) + queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReaders, registerer) if err != nil { return nil, err } @@ -228,7 +228,7 @@ func newQueryTripperware( cacheExtractor Extractor, engineOpts promql.EngineOpts, engineExperimentalFunctionsEnabled bool, - ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader, + ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader, registerer prometheus.Registerer, ) (Tripperware, error) { // Disable concurrency limits for sharded queries. @@ -281,18 +281,18 @@ func newQueryTripperware( } // Enforce read consistency after caching. - if ingestStorageTopicOffsetsReader != nil { - metrics := newReadConsistencyMetrics(registerer) - - queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReader, limits, log, metrics) - instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReader, limits, log, metrics) - cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReader, limits, log, metrics) - activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReader, limits, log, metrics) - activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReader, limits, log, metrics) - labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReader, limits, log, metrics) - series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReader, limits, log, metrics) - remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReader, limits, log, metrics) - next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReader, limits, log, metrics) + if len(ingestStorageTopicOffsetsReaders) > 0 { + metrics := newReadConsistencyMetrics(registerer, ingestStorageTopicOffsetsReaders) + + queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReaders, limits, log, metrics) + instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReaders, limits, log, metrics) + cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReaders, limits, log, metrics) + activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReaders, limits, log, metrics) + activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReaders, limits, log, metrics) + labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReaders, limits, log, metrics) + series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReaders, limits, log, metrics) + remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReaders, limits, log, metrics) + next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReaders, limits, log, metrics) } // Look up cache as first thing after validation. diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index e9672054291..0e829478ae7 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -874,7 +874,7 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T) Timeout: time.Minute, }, true, - offsetsReader, + map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: offsetsReader}, nil, ) require.NoError(t, err) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 7bcd3eac250..ca8ebfe53d5 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -705,40 +705,40 @@ type Mimir struct { ServiceMap map[string]services.Service ModuleManager *modules.Manager - API *api.API - Server *server.Server - IngesterRing *ring.Ring - IngesterPartitionRingWatcher *ring.PartitionRingWatcher - IngesterPartitionInstanceRing *ring.PartitionInstanceRing - TenantLimits validation.TenantLimits - Overrides *validation.Overrides - ActiveGroupsCleanup *util.ActiveGroupsCleanupService - Distributor *distributor.Distributor - Ingester *ingester.Ingester - Flusher *flusher.Flusher - FrontendV1 *frontendv1.Frontend - RuntimeConfig *runtimeconfig.Manager - QuerierQueryable prom_storage.SampleAndChunkQueryable - ExemplarQueryable prom_storage.ExemplarQueryable - AdditionalStorageQueryables []querier.TimeRangeQueryable - MetadataSupplier querier.MetadataSupplier - QuerierEngine promql.QueryEngine - QueryFrontendTripperware querymiddleware.Tripperware - QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader - QueryFrontendCodec querymiddleware.Codec - Ruler *ruler.Ruler - RulerStorage rulestore.RuleStore - Alertmanager *alertmanager.MultitenantAlertmanager - Compactor *compactor.MultitenantCompactor - StoreGateway *storegateway.StoreGateway - MemberlistKV *memberlist.KVInitService - ActivityTracker *activitytracker.ActivityTracker - Vault *vault.Vault - UsageStatsReporter *usagestats.Reporter - BlockBuilder *blockbuilder.BlockBuilder - BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler - ContinuousTestManager *continuoustest.Manager - BuildInfoHandler http.Handler + API *api.API + Server *server.Server + IngesterRing *ring.Ring + IngesterPartitionRingWatcher *ring.PartitionRingWatcher + IngesterPartitionInstanceRing *ring.PartitionInstanceRing + TenantLimits validation.TenantLimits + Overrides *validation.Overrides + ActiveGroupsCleanup *util.ActiveGroupsCleanupService + Distributor *distributor.Distributor + Ingester *ingester.Ingester + Flusher *flusher.Flusher + FrontendV1 *frontendv1.Frontend + RuntimeConfig *runtimeconfig.Manager + QuerierQueryable prom_storage.SampleAndChunkQueryable + ExemplarQueryable prom_storage.ExemplarQueryable + AdditionalStorageQueryables []querier.TimeRangeQueryable + MetadataSupplier querier.MetadataSupplier + QuerierEngine promql.QueryEngine + QueryFrontendTripperware querymiddleware.Tripperware + QueryFrontendTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader + QueryFrontendCodec querymiddleware.Codec + Ruler *ruler.Ruler + RulerStorage rulestore.RuleStore + Alertmanager *alertmanager.MultitenantAlertmanager + Compactor *compactor.MultitenantCompactor + StoreGateway *storegateway.StoreGateway + MemberlistKV *memberlist.KVInitService + ActivityTracker *activitytracker.ActivityTracker + Vault *vault.Vault + UsageStatsReporter *usagestats.Reporter + BlockBuilder *blockbuilder.BlockBuilder + BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler + ContinuousTestManager *continuoustest.Manager + BuildInfoHandler http.Handler } // New makes a new Mimir. diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 39868511b5c..a2839280bb9 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -50,6 +50,7 @@ import ( "github.com/grafana/mimir/pkg/frontend/transport" "github.com/grafana/mimir/pkg/ingester" "github.com/grafana/mimir/pkg/querier" + querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/engine" "github.com/grafana/mimir/pkg/querier/tenantfederation" querier_worker "github.com/grafana/mimir/pkg/querier/worker" @@ -70,42 +71,42 @@ import ( // The various modules that make up Mimir. const ( - ActivityTracker string = "activity-tracker" - API string = "api" - SanityCheck string = "sanity-check" - IngesterRing string = "ingester-ring" - IngesterPartitionRing string = "ingester-partitions-ring" - RuntimeConfig string = "runtime-config" - Overrides string = "overrides" - OverridesExporter string = "overrides-exporter" - Server string = "server" - ActiveGroupsCleanupService string = "active-groups-cleanup-service" - Distributor string = "distributor" - DistributorService string = "distributor-service" - Ingester string = "ingester" - IngesterService string = "ingester-service" - Flusher string = "flusher" - Querier string = "querier" - Queryable string = "queryable" - StoreQueryable string = "store-queryable" - QueryFrontend string = "query-frontend" - QueryFrontendCodec string = "query-frontend-codec" - QueryFrontendTripperware string = "query-frontend-tripperware" - QueryFrontendTopicOffsetsReader string = "query-frontend-topic-offsets-reader" - RulerStorage string = "ruler-storage" - Ruler string = "ruler" - AlertManager string = "alertmanager" - Compactor string = "compactor" - StoreGateway string = "store-gateway" - MemberlistKV string = "memberlist-kv" - QueryScheduler string = "query-scheduler" - Vault string = "vault" - TenantFederation string = "tenant-federation" - UsageStats string = "usage-stats" - BlockBuilder string = "block-builder" - BlockBuilderScheduler string = "block-builder-scheduler" - ContinuousTest string = "continuous-test" - All string = "all" + ActivityTracker string = "activity-tracker" + API string = "api" + SanityCheck string = "sanity-check" + IngesterRing string = "ingester-ring" + IngesterPartitionRing string = "ingester-partitions-ring" + RuntimeConfig string = "runtime-config" + Overrides string = "overrides" + OverridesExporter string = "overrides-exporter" + Server string = "server" + ActiveGroupsCleanupService string = "active-groups-cleanup-service" + Distributor string = "distributor" + DistributorService string = "distributor-service" + Ingester string = "ingester" + IngesterService string = "ingester-service" + Flusher string = "flusher" + Querier string = "querier" + Queryable string = "queryable" + StoreQueryable string = "store-queryable" + QueryFrontend string = "query-frontend" + QueryFrontendCodec string = "query-frontend-codec" + QueryFrontendTripperware string = "query-frontend-tripperware" + QueryFrontendTopicOffsetsReaders string = "query-frontend-topic-offsets-reader" + RulerStorage string = "ruler-storage" + Ruler string = "ruler" + AlertManager string = "alertmanager" + Compactor string = "compactor" + StoreGateway string = "store-gateway" + MemberlistKV string = "memberlist-kv" + QueryScheduler string = "query-scheduler" + Vault string = "vault" + TenantFederation string = "tenant-federation" + UsageStats string = "usage-stats" + BlockBuilder string = "block-builder" + BlockBuilderScheduler string = "block-builder-scheduler" + ContinuousTest string = "continuous-test" + All string = "all" // Write Read and Backend are the targets used when using the read-write deployment mode. Write string = "write" @@ -702,9 +703,9 @@ func (t *Mimir) initQueryFrontendCodec() (services.Service, error) { return nil, nil } -// initQueryFrontendTopicOffsetsReader instantiates the topic offsets reader used by the query-frontend +// initQueryFrontendTopicOffsetsReaders instantiates the topic offsets reader used by the query-frontend // when the ingest storage is enabled. -func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error) { +func (t *Mimir) initQueryFrontendTopicOffsetsReaders() (services.Service, error) { if !t.Cfg.IngestStorage.Enabled { return nil, nil } @@ -726,8 +727,14 @@ func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error) return t.IngesterPartitionRingWatcher.PartitionRing().PartitionIDs(), nil } - t.QueryFrontendTopicOffsetsReader = ingest.NewTopicOffsetsReader(kafkaClient, t.Cfg.IngestStorage.KafkaConfig.Topic, getPartitionIDs, t.Cfg.IngestStorage.KafkaConfig.LastProducedOffsetPollInterval, t.Registerer, util_log.Logger) - return t.QueryFrontendTopicOffsetsReader, nil + ingestTopicOffsetsReader := ingest.NewTopicOffsetsReader(kafkaClient, t.Cfg.IngestStorage.KafkaConfig.Topic, getPartitionIDs, t.Cfg.IngestStorage.KafkaConfig.LastProducedOffsetPollInterval, t.Registerer, util_log.Logger) + + if t.QueryFrontendTopicOffsetsReaders == nil { + t.QueryFrontendTopicOffsetsReaders = make(map[string]*ingest.TopicOffsetsReader) + } + t.QueryFrontendTopicOffsetsReaders[querierapi.ReadConsistencyOffsetsHeader] = ingestTopicOffsetsReader + + return ingestTopicOffsetsReader, nil } // initQueryFrontendTripperware instantiates the tripperware used by the query frontend @@ -745,7 +752,7 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error querymiddleware.PrometheusResponseExtractor{}, engineOpts, engineExperimentalFunctionsEnabled, - t.QueryFrontendTopicOffsetsReader, + t.QueryFrontendTopicOffsetsReaders, t.Registerer, ) if err != nil { @@ -1149,7 +1156,7 @@ func (t *Mimir) setupModuleManager() error { mm.RegisterModule(StoreQueryable, t.initStoreQueryable, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontendCodec, t.initQueryFrontendCodec, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendTripperware, modules.UserInvisibleModule) - mm.RegisterModule(QueryFrontendTopicOffsetsReader, t.initQueryFrontendTopicOffsetsReader, modules.UserInvisibleModule) + mm.RegisterModule(QueryFrontendTopicOffsetsReaders, t.initQueryFrontendTopicOffsetsReaders, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontend, t.initQueryFrontend) mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule) mm.RegisterModule(Ruler, t.initRuler) @@ -1170,39 +1177,39 @@ func (t *Mimir) setupModuleManager() error { // Add dependencies deps := map[string][]string{ - Server: {ActivityTracker, SanityCheck, UsageStats}, - API: {Server}, - MemberlistKV: {API, Vault}, - RuntimeConfig: {API}, - IngesterRing: {API, RuntimeConfig, MemberlistKV, Vault}, - IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, - Overrides: {RuntimeConfig}, - OverridesExporter: {Overrides, MemberlistKV, Vault}, - Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, - DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, - Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, - IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, - Flusher: {Overrides, API}, - Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, - Querier: {TenantFederation, Vault}, - StoreQueryable: {Overrides, MemberlistKV}, - QueryFrontendTripperware: {API, Overrides, QueryFrontendCodec, QueryFrontendTopicOffsetsReader}, - QueryFrontend: {QueryFrontendTripperware, MemberlistKV, Vault}, - QueryFrontendTopicOffsetsReader: {IngesterPartitionRing}, - QueryScheduler: {API, Overrides, MemberlistKV, Vault}, - Ruler: {DistributorService, StoreQueryable, RulerStorage, Vault}, - RulerStorage: {Overrides}, - AlertManager: {API, MemberlistKV, Overrides, Vault}, - Compactor: {API, MemberlistKV, Overrides, Vault}, - StoreGateway: {API, Overrides, MemberlistKV, Vault}, - TenantFederation: {Queryable}, - BlockBuilder: {API, Overrides}, - BlockBuilderScheduler: {API}, - ContinuousTest: {API}, - Write: {Distributor, Ingester}, - Read: {QueryFrontend, Querier}, - Backend: {QueryScheduler, Ruler, StoreGateway, Compactor, AlertManager, OverridesExporter}, - All: {QueryFrontend, Querier, Ingester, Distributor, StoreGateway, Ruler, Compactor}, + Server: {ActivityTracker, SanityCheck, UsageStats}, + API: {Server}, + MemberlistKV: {API, Vault}, + RuntimeConfig: {API}, + IngesterRing: {API, RuntimeConfig, MemberlistKV, Vault}, + IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, + Overrides: {RuntimeConfig}, + OverridesExporter: {Overrides, MemberlistKV, Vault}, + Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, + DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, + Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, + IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, + Flusher: {Overrides, API}, + Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, + Querier: {TenantFederation, Vault}, + StoreQueryable: {Overrides, MemberlistKV}, + QueryFrontendTripperware: {API, Overrides, QueryFrontendCodec, QueryFrontendTopicOffsetsReaders}, + QueryFrontend: {QueryFrontendTripperware, MemberlistKV, Vault}, + QueryFrontendTopicOffsetsReaders: {IngesterPartitionRing}, + QueryScheduler: {API, Overrides, MemberlistKV, Vault}, + Ruler: {DistributorService, StoreQueryable, RulerStorage, Vault}, + RulerStorage: {Overrides}, + AlertManager: {API, MemberlistKV, Overrides, Vault}, + Compactor: {API, MemberlistKV, Overrides, Vault}, + StoreGateway: {API, Overrides, MemberlistKV, Vault}, + TenantFederation: {Queryable}, + BlockBuilder: {API, Overrides}, + BlockBuilderScheduler: {API}, + ContinuousTest: {API}, + Write: {Distributor, Ingester}, + Read: {QueryFrontend, Querier}, + Backend: {QueryScheduler, Ruler, StoreGateway, Compactor, AlertManager, OverridesExporter}, + All: {QueryFrontend, Querier, Ingester, Distributor, StoreGateway, Ruler, Compactor}, } for mod, targets := range deps { if err := mm.AddDependency(mod, targets...); err != nil { diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 34fe74dd89c..233f809b0ef 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -725,7 +725,7 @@ func TestConcurrentFetchers(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}) + metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}, topicName) client := newKafkaProduceClient(t, clusterAddr) @@ -1151,7 +1151,7 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli logger := testingLogger.WithT(t) reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}) + metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}, topic) // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. diff --git a/pkg/storage/ingest/partition_offset_reader.go b/pkg/storage/ingest/partition_offset_reader.go index 9a3f43112a8..2acd6892ab4 100644 --- a/pkg/storage/ingest/partition_offset_reader.go +++ b/pkg/storage/ingest/partition_offset_reader.go @@ -199,3 +199,7 @@ func (p *TopicOffsetsReader) FetchLastProducedOffset(ctx context.Context) (map[i return p.client.FetchPartitionsLastProducedOffsets(ctx, partitionIDs) } + +func (p *TopicOffsetsReader) Topic() string { + return p.topic +} diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 4abe1932138..982d8962311 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -124,7 +124,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri reg: reg, } - r.metrics = newReaderMetrics(partitionID, reg, r) + r.metrics = newReaderMetrics(partitionID, reg, r, kafkaCfg.Topic) r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil @@ -761,7 +761,7 @@ func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, of } func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bool, getOffset func(context.Context) (int64, error)) error { - _, err := r.metrics.strongConsistencyInstrumentation.Observe(withOffset, func() (struct{}, error) { + _, err := r.metrics.strongConsistencyInstrumentation.Observe(r.kafkaCfg.Topic, withOffset, func() (struct{}, error) { spanLog := spanlogger.FromContext(ctx, r.logger) spanLog.DebugLog("msg", "waiting for read consistency") @@ -968,7 +968,7 @@ type readerMetricsSource interface { EstimatedBytesPerRecord() int64 } -func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource) readerMetrics { +func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource, topic string) readerMetrics { const component = "partition-reader" receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -1033,7 +1033,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.", NativeHistogramBucketFactor: 1.1, }), - strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg), + strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg, []string{topic}), lastConsumedOffset: lastConsumedOffset, kprom: NewKafkaReaderClientMetrics(ReaderMetricsPrefix, component, reg), missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -1051,23 +1051,23 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc type StrongReadConsistencyInstrumentation[T any] struct { requests *prometheus.CounterVec - failures prometheus.Counter - latency prometheus.Histogram + failures *prometheus.CounterVec + latency *prometheus.HistogramVec } -func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer) *StrongReadConsistencyInstrumentation[T] { +func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer, topics []string) *StrongReadConsistencyInstrumentation[T] { i := &StrongReadConsistencyInstrumentation[T]{ requests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_strong_consistency_requests_total", Help: "Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.", ConstLabels: map[string]string{"component": component}, - }, []string{"with_offset"}), - failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"with_offset", "topic"}), + failures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_strong_consistency_failures_total", Help: "Total number of failures while waiting for strong consistency to be enforced.", ConstLabels: map[string]string{"component": component}, - }), - latency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{"topic"}), + latency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_ingest_storage_strong_consistency_wait_duration_seconds", Help: "How long a request spent waiting for strong consistency to be guaranteed.", NativeHistogramBucketFactor: 1.1, @@ -1075,20 +1075,24 @@ func NewStrongReadConsistencyInstrumentation[T any](component string, reg promet NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.DefBuckets, ConstLabels: map[string]string{"component": component}, - }), + }, []string{"topic"}), } // Init metrics. - for _, value := range []bool{true, false} { - i.requests.WithLabelValues(strconv.FormatBool(value)) + for _, topic := range topics { + for _, value := range []bool{true, false} { + i.requests.WithLabelValues(strconv.FormatBool(value), topic) + } + i.failures.WithLabelValues(topic) + i.latency.WithLabelValues(topic) } return i } -func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f func() (T, error)) (_ T, returnErr error) { +func (i *StrongReadConsistencyInstrumentation[T]) Observe(topic string, withOffset bool, f func() (T, error)) (_ T, returnErr error) { startTime := time.Now() - i.requests.WithLabelValues(strconv.FormatBool(withOffset)).Inc() + i.requests.WithLabelValues(strconv.FormatBool(withOffset), topic).Inc() defer func() { // Do not track failure or latency if the request was canceled (because the tracking would be incorrect). @@ -1098,10 +1102,10 @@ func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f fun // Track latency for failures too, so that we have a better measurement of latency if // backend latency is high and requests fail because of timeouts. - i.latency.Observe(time.Since(startTime).Seconds()) + i.latency.WithLabelValues(topic).Observe(time.Since(startTime).Seconds()) if returnErr != nil { - i.failures.Inc() + i.failures.WithLabelValues(topic).Inc() } }() diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 22cef17a544..c6fc2b896cf 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -312,13 +312,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) @@ -362,13 +362,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) @@ -415,13 +415,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) @@ -452,13 +452,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) @@ -489,13 +489,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) })