diff --git a/CHANGELOG.md b/CHANGELOG.md index 42a051a3e7c..a323b244ba5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) * [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott) +* [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242]https://github.com/grafana/tempo/pull/4242 (@joe-elliott) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) * [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno) diff --git a/integration/e2e/api_test.go b/integration/e2e/api_test.go index 7633f0e74bd..aa8497491a8 100644 --- a/integration/e2e/api_test.go +++ b/integration/e2e/api_test.go @@ -627,7 +627,7 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName, require.Equal(t, expected.TagValues, actualGrpcResp.TagValues) // assert metrics, and make sure it's non-zero when response is non-empty if len(grpcResp.TagValues) > 0 { - require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(100)) + require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(0)) } } diff --git a/modules/frontend/combiner/common.go b/modules/frontend/combiner/common.go index 8e687406675..7b73a93ac36 100644 --- a/modules/frontend/combiner/common.go +++ b/modules/frontend/combiner/common.go @@ -7,6 +7,8 @@ import ( "strings" "sync" + tempo_io "github.com/grafana/tempo/pkg/io" + "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" "github.com/gogo/status" @@ -90,7 +92,7 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error { switch res.Header.Get(api.HeaderContentType) { case api.HeaderAcceptProtobuf: - b, err := io.ReadAll(res.Body) + b, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength) if err != nil { return fmt.Errorf("error reading response body") } diff --git a/modules/frontend/combiner/trace_by_id.go b/modules/frontend/combiner/trace_by_id.go index dfdd886f431..27ba0948c38 100644 --- a/modules/frontend/combiner/trace_by_id.go +++ b/modules/frontend/combiner/trace_by_id.go @@ -11,6 +11,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/grafana/tempo/pkg/api" + tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" ) @@ -67,7 +68,7 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error { } // Read the body - buff, err := io.ReadAll(res.Body) + buff, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength) if err != nil { c.statusMessage = internalErrorMsg return fmt.Errorf("error reading response body: %w", err) diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index d4f73276f38..f7d856030c8 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -269,7 +269,7 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR resp, _, err := resps.Next(req.Context()) // metrics path will only ever have one response level.Info(logger).Log( - "msg", "search tag response", + "msg", "metrics summary response", "tenant", tenant, "path", req.URL.Path, "err", err) @@ -278,6 +278,34 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR }) } +// cloneRequestforQueriers returns a cloned pipeline.Request from the passed pipeline.Request ready for queriers. The caller is given an opportunity +// to modify the internal http.Request before it is returned using the modHTTP param. If modHTTP is nil, the internal http.Request is returned. +func cloneRequestforQueriers(parent pipeline.Request, tenant string, modHTTP func(*http.Request) (*http.Request, error)) (pipeline.Request, error) { + // first clone the http request with headers nil'ed out. this prevents the headers from being copied saving allocs + // here and especially downstream in the httpgrpc bridge. prepareRequestForQueriers will add the only headers that + // the queriers actually need. + req := parent.HTTPRequest() + saveHeaders := req.Header + req.Header = nil + clonedHTTPReq := req.Clone(req.Context()) + + req.Header = saveHeaders + clonedHTTPReq.Header = make(http.Header, 2) // cheating here. alloc 2 b/c we know that's how many headers prepareRequestForQueriers will add + + // give the caller a chance to modify the internal http request + if modHTTP != nil { + var err error + clonedHTTPReq, err = modHTTP(clonedHTTPReq) + if err != nil { + return nil, err + } + } + + prepareRequestForQueriers(clonedHTTPReq, tenant) + + return parent.CloneFromHTTPRequest(clonedHTTPReq), nil +} + // prepareRequestForQueriers modifies the request so they will be farmed correctly to the queriers // - adds the tenant header // - sets the requesturi (see below for details) diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index 4c0ed89afe1..54ffc29ff67 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -113,7 +113,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds float64, req *tempopb.QueryRangeRequest, resp *tempopb.QueryRangeResponse, err error) { if resp == nil { level.Info(logger).Log( - "msg", "query range results - no resp", + "msg", "query range response - no resp", "tenant", tenantID, "duration_seconds", durationSeconds, "error", err) @@ -123,7 +123,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo if resp.Metrics == nil { level.Info(logger).Log( - "msg", "query range results - no metrics", + "msg", "query range response - no metrics", "tenant", tenantID, "query", req.Query, "range_nanos", req.End-req.Start, @@ -133,7 +133,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo } level.Info(logger).Log( - "msg", "query range results", + "msg", "query range response", "tenant", tenantID, "query", req.Query, "range_nanos", req.End-req.Start, diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go index 76c66500de4..bfffbf21bf7 100644 --- a/modules/frontend/metrics_query_range_sharder.go +++ b/modules/frontend/metrics_query_range_sharder.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "net/http" "time" "github.com/go-kit/log" //nolint:all deprecated @@ -113,7 +114,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline cutoff = time.Now().Add(-s.cfg.QueryBackendAfter) ) - generatorReq := s.generatorRequest(ctx, tenantID, pipelineRequest, *req, cutoff) + generatorReq := s.generatorRequest(tenantID, pipelineRequest, *req, cutoff) reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics if generatorReq != nil { @@ -243,15 +244,13 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s exemplars = max(uint32(float64(exemplars)*float64(m.TotalRecords)/float64(pages)), 1) } - for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - subR := parent.HTTPRequest().Clone(ctx) - - dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) - if err != nil { - // errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) - continue - } + dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) + if err != nil { + _ = level.Error(s.logger).Log("msg", "failed to convert dedicated columns in query range sharder. skipping", "block", m.BlockID, "err", err) + continue + } + for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { // Trim and align the request for this block. I.e. if the request is "Last Hour" we don't want to // cache the response for that, we want only the few minutes time range for this block. This has // size savings but the main thing is that the response is reuseable for any overlapping query. @@ -261,31 +260,34 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s continue } - queryRangeReq := &tempopb.QueryRangeRequest{ - Query: searchReq.Query, - Start: start, - End: end, - Step: step, - QueryMode: searchReq.QueryMode, - // New RF1 fields - BlockID: m.BlockID.String(), - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Version: m.Version, - Encoding: m.Encoding.String(), - Size_: m.Size_, - FooterSize: m.FooterSize, - // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json - Exemplars: exemplars, + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + queryRangeReq := &tempopb.QueryRangeRequest{ + Query: searchReq.Query, + Start: start, + End: end, + Step: step, + QueryMode: searchReq.QueryMode, + // New RF1 fields + BlockID: m.BlockID.String(), + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Version: m.Version, + Encoding: m.Encoding.String(), + Size_: m.Size_, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + Exemplars: exemplars, + } + + return api.BuildQueryRangeRequest(r, queryRangeReq, dedColsJSON), nil + }) + if err != nil { + _ = level.Error(s.logger).Log("msg", "failed to cloneRequestForQuerirs in the query range sharder. skipping", "block", m.BlockID, "err", err) + continue } - subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON) - - prepareRequestForQueriers(subR, tenantID) - pipelineR := parent.CloneFromHTTPRequest(subR) - // TODO: Handle sampling rate - key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch)) + key := queryRangeCacheKey(tenantID, queryHash, int64(start), int64(end), m, int(step), pages) if len(key) > 0 { pipelineR.SetCacheKey(key) } @@ -306,7 +308,7 @@ func max(a, b uint32) uint32 { return b } -func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) *pipeline.HTTPRequest { +func (s *queryRangeSharder) generatorRequest(tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) pipeline.Request { traceql.TrimToAfter(&searchReq, cutoff) // if start == end then we don't need to query it if searchReq.Start == searchReq.End { @@ -315,12 +317,11 @@ func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID strin searchReq.QueryMode = querier.QueryModeRecent - subR := parent.HTTPRequest().Clone(ctx) - subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators - - prepareRequestForQueriers(subR, tenantID) + subR, _ := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return api.BuildQueryRangeRequest(r, &searchReq, ""), nil + }) - return parent.CloneFromHTTPRequest(subR) + return subR } // maxDuration returns the max search duration allowed for this tenant. diff --git a/modules/frontend/pipeline/pipeline.go b/modules/frontend/pipeline/pipeline.go index 920cd4eb203..0dc9bae839e 100644 --- a/modules/frontend/pipeline/pipeline.go +++ b/modules/frontend/pipeline/pipeline.go @@ -13,7 +13,9 @@ var tracer = otel.Tracer("modules/frontend/pipeline") type Request interface { HTTPRequest() *http.Request Context() context.Context + WithContext(context.Context) + CloneFromHTTPRequest(request *http.Request) Request Weight() int SetWeight(int) @@ -23,7 +25,6 @@ type Request interface { SetResponseData(any) // add data that will be sent back with this requests response ResponseData() any - CloneFromHTTPRequest(request *http.Request) *HTTPRequest } type HTTPRequest struct { @@ -78,8 +79,13 @@ func (r *HTTPRequest) SetWeight(w int) { r.weight = w } -func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) *HTTPRequest { - return &HTTPRequest{req: request, weight: r.weight} +func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) Request { + return &HTTPRequest{ + req: request, + weight: r.weight, + cacheKey: r.cacheKey, + responseData: r.responseData, + } } // diff --git a/modules/frontend/search_handlers.go b/modules/frontend/search_handlers.go index 35875832016..1675271a27f 100644 --- a/modules/frontend/search_handlers.go +++ b/modules/frontend/search_handlers.go @@ -145,7 +145,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req if resp == nil { level.Info(logger).Log( - "msg", "search results - no resp", + "msg", "search response - no resp", "tenant", tenantID, "duration_seconds", durationSeconds, "status_code", statusCode, @@ -156,7 +156,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req if resp.Metrics == nil { level.Info(logger).Log( - "msg", "search results - no metrics", + "msg", "search response - no metrics", "tenant", tenantID, "query", req.Query, "range_seconds", req.End-req.Start, @@ -167,7 +167,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req } level.Info(logger).Log( - "msg", "search results", + "msg", "search response", "tenant", tenantID, "query", req.Query, "range_seconds", req.End-req.Start, diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index a8e1df35ec8..ddad508d30f 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -3,6 +3,7 @@ package frontend import ( "context" "fmt" + "net/http" "time" "github.com/go-kit/log" //nolint:all deprecated @@ -95,7 +96,7 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin // build request to search ingesters based on query_ingesters_until config and time range // pass subCtx in requests so we can cancel and exit early - err = s.ingesterRequests(ctx, tenantID, pipelineRequest, *searchReq, reqCh) + err = s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh) if err != nil { return nil, err } @@ -199,10 +200,10 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from // unexpectedly changing the passed searchReq. -func (s *asyncSearchSharder) ingesterRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error { +func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error { // request without start or end, search only in ingester if searchReq.Start == 0 || searchReq.End == 0 { - return buildIngesterRequest(ctx, tenantID, parent, &searchReq, reqCh) + return buildIngesterRequest(tenantID, parent, &searchReq, reqCh) } ingesterUntil := uint32(time.Now().Add(-s.cfg.QueryIngestersUntil).Unix()) @@ -257,7 +258,7 @@ func (s *asyncSearchSharder) ingesterRequests(ctx context.Context, tenantID stri subReq.Start = shardStart subReq.End = shardEnd - err := buildIngesterRequest(ctx, tenantID, parent, &subReq, reqCh) + err := buildIngesterRequest(tenantID, parent, &subReq, reqCh) if err != nil { return err } @@ -310,36 +311,37 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline. } blockID := m.BlockID.String() - for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - subR := parent.HTTPRequest().Clone(ctx) - dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) - if err != nil { - errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) - continue - } + dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) + if err != nil { + errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) + continue + } - subR, err = api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{ - BlockID: blockID, - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Encoding: m.Encoding.String(), - IndexPageSize: m.IndexPageSize, - TotalRecords: m.TotalRecords, - DataEncoding: m.DataEncoding, - Version: m.Version, - Size_: m.Size_, - FooterSize: m.FooterSize, - // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json - }, dedColsJSON) + for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{ + BlockID: blockID, + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Encoding: m.Encoding.String(), + IndexPageSize: m.IndexPageSize, + TotalRecords: m.TotalRecords, + DataEncoding: m.DataEncoding, + Version: m.Version, + Size_: m.Size_, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + }, dedColsJSON) + + return r, err + }) if err != nil { errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err)) continue } - prepareRequestForQueriers(subR, tenantID) key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages) - pipelineR := parent.CloneFromHTTPRequest(subR) pipelineR.SetCacheKey(key) select { @@ -396,14 +398,14 @@ func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int { return pagesPerQuery } -func buildIngesterRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan pipeline.Request) error { - subR := parent.HTTPRequest().Clone(ctx) - subR, err := api.BuildSearchRequest(subR, searchReq) +func buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan pipeline.Request) error { + subR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return api.BuildSearchRequest(r, searchReq) + }) if err != nil { return err } - prepareRequestForQueriers(subR, tenantID) - reqCh <- parent.CloneFromHTTPRequest(subR) + reqCh <- subR return nil } diff --git a/modules/frontend/search_sharder_test.go b/modules/frontend/search_sharder_test.go index ed8eb3abe6f..3a8730d083c 100644 --- a/modules/frontend/search_sharder_test.go +++ b/modules/frontend/search_sharder_test.go @@ -494,7 +494,7 @@ func TestIngesterRequests(t *testing.T) { pr := pipeline.NewHTTPRequest(req) pr.SetWeight(2) - err = s.ingesterRequests(context.Background(), "test", pr, *searchReq, reqChan) + err = s.ingesterRequests("test", pr, *searchReq, reqChan) if tc.expectedError != nil { assert.Equal(t, tc.expectedError, err) continue diff --git a/modules/frontend/tag_handlers.go b/modules/frontend/tag_handlers.go index b61bc4beb8f..404009a48ca 100644 --- a/modules/frontend/tag_handlers.go +++ b/modules/frontend/tag_handlers.go @@ -45,7 +45,6 @@ func newTagsStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[com if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagsResponse comb := combiner.NewTypedSearchTags(o.MaxBytesPerTagValuesQuery(tenant)) @@ -79,7 +78,6 @@ func newTagsV2StreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagsV2Response comb := combiner.NewTypedSearchTagsV2(o.MaxBytesPerTagValuesQuery(tenant)) @@ -117,7 +115,6 @@ func newTagValuesStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTrippe if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagValuesResponse comb := combiner.NewTypedSearchTagValues(o.MaxBytesPerTagValuesQuery(tenant)) @@ -155,7 +152,6 @@ func newTagValuesV2StreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTrip if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagValuesV2Response comb := combiner.NewTypedSearchTagValuesV2(o.MaxBytesPerTagValuesQuery(tenant)) @@ -394,7 +390,7 @@ func logTagsRequest(logger log.Logger, tenantID, handler, scope string, rangeSec func logTagsResult(logger log.Logger, tenantID, handler, scope string, rangeSeconds uint32, durationSeconds float64, inspectedBytes uint64, err error) { level.Info(logger).Log( - "msg", "search tag results", + "msg", "search tag response", "tenant", tenantID, "handler", handler, "scope", scope, @@ -417,7 +413,7 @@ func logTagValuesRequest(logger log.Logger, tenantID, handler, tagName, query st func logTagValuesResult(logger log.Logger, tenantID, handler, tagName, query string, rangeSeconds uint32, durationSeconds float64, inspectedBytes uint64, err error) { level.Info(logger).Log( - "msg", "search tag values results", + "msg", "search tag values response", "tenant", tenantID, "handler", handler, "tag", tagName, diff --git a/modules/frontend/tag_sharder.go b/modules/frontend/tag_sharder.go index b3862ba889d..55f1f1e7942 100644 --- a/modules/frontend/tag_sharder.go +++ b/modules/frontend/tag_sharder.go @@ -190,9 +190,9 @@ func newAsyncTagSharder(reader tempodb.Reader, o overrides.Interface, cfg Search // until limit results are found func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { r := pipelineRequest.HTTPRequest() - requestCtx := r.Context() + ctx := pipelineRequest.Context() - tenantID, err := user.ExtractOrgID(requestCtx) + tenantID, err := user.ExtractOrgID(ctx) if err != nil { return pipeline.NewBadRequest(err), nil } @@ -201,8 +201,9 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline. if err != nil { return pipeline.NewBadRequest(err), nil } - ctx, span := tracer.Start(requestCtx, "frontend.ShardSearchTags") + ctx, span := tracer.Start(ctx, "frontend.ShardSearchTags") defer span.End() + pipelineRequest.WithContext(ctx) // calculate and enforce max search duration maxDuration := s.maxDuration(tenantID) @@ -213,7 +214,7 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline. // build request to search ingester based on query_ingesters_until config and time range // pass subCtx in requests, so we can cancel and exit early - ingesterReq, err := s.ingesterRequest(ctx, tenantID, pipelineRequest, searchReq) + ingesterReq, err := s.ingesterRequest(tenantID, pipelineRequest, searchReq) if err != nil { return nil, err } @@ -223,7 +224,7 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline. reqCh <- ingesterReq } - s.backendRequests(ctx, tenantID, r, searchReq, reqCh, func(err error) { + s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, reqCh, func(err error) { // todo: actually find a way to return this error to the user s.logger.Log("msg", "failed to build backend requests", "err", err) }) @@ -252,7 +253,7 @@ func (s searchTagSharder) blockMetas(start, end int64, tenantID string) []*backe // backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. // it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs -func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tagSearchReq, reqCh chan<- pipeline.Request, errFn func(error)) { +func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq, reqCh chan<- pipeline.Request, errFn func(error)) { var blocks []*backend.BlockMeta // request without start or end, search only in ingester @@ -282,7 +283,7 @@ func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. -func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error), searchReq tagSearchReq) { +func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error), searchReq tagSearchReq) { defer close(reqCh) hash := searchReq.hash() @@ -296,14 +297,13 @@ func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID str blockID := m.BlockID.String() for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - subR := parent.Clone(ctx) - subR, err := searchReq.buildTagSearchBlockRequest(subR, blockID, startPage, pages, m) + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return searchReq.buildTagSearchBlockRequest(r, blockID, startPage, pages, m) + }) if err != nil { errFn(err) - return + continue } - prepareRequestForQueriers(subR, tenantID) - pipelineR := pipeline.NewHTTPRequest(subR) key := cacheKey(keyPrefix, tenantID, hash, int64(searchReq.start()), int64(searchReq.end()), m, startPage, pages) pipelineR.SetCacheKey(key) @@ -321,10 +321,10 @@ func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID str // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // we should do a copy of the searchReq before use this function, as it is an interface, we cannot guaranteed be passed // by value. -func (s searchTagSharder) ingesterRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq) (*pipeline.HTTPRequest, error) { +func (s searchTagSharder) ingesterRequest(tenantID string, parent pipeline.Request, searchReq tagSearchReq) (pipeline.Request, error) { // request without start or end, search only in ingester if searchReq.start() == 0 || searchReq.end() == 0 { - return s.buildIngesterRequest(ctx, tenantID, parent, searchReq) + return s.buildIngesterRequest(tenantID, parent, searchReq) } now := time.Now() @@ -349,17 +349,17 @@ func (s searchTagSharder) ingesterRequest(ctx context.Context, tenantID string, } newSearchReq := searchReq.newWithRange(ingesterStart, ingesterEnd) - return s.buildIngesterRequest(ctx, tenantID, parent, newSearchReq) + return s.buildIngesterRequest(tenantID, parent, newSearchReq) } -func (s searchTagSharder) buildIngesterRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq) (*pipeline.HTTPRequest, error) { - subR := parent.HTTPRequest().Clone(ctx) - subR, err := searchReq.buildSearchTagRequest(subR) +func (s searchTagSharder) buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq tagSearchReq) (pipeline.Request, error) { + subR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return searchReq.buildSearchTagRequest(r) + }) if err != nil { return nil, err } - prepareRequestForQueriers(subR, tenantID) - return parent.CloneFromHTTPRequest(subR), nil + return subR, nil } // maxDuration returns the max search duration allowed for this tenant. diff --git a/modules/frontend/tag_sharder_test.go b/modules/frontend/tag_sharder_test.go index 150cd5e0dc2..e47fa87f35c 100644 --- a/modules/frontend/tag_sharder_test.go +++ b/modules/frontend/tag_sharder_test.go @@ -161,7 +161,7 @@ func TestTagsBackendRequests(t *testing.T) { req.endValue = uint32(tc.params.end) } - s.backendRequests(context.TODO(), "test", r, &req, reqCh, func(err error) { + s.backendRequests(context.TODO(), "test", pipeline.NewHTTPRequest(r), &req, reqCh, func(err error) { require.Equal(t, tc.expectedError, err) }) @@ -265,7 +265,7 @@ func TestTagsIngesterRequest(t *testing.T) { } copyReq := searchReq - actualReq, err := s.ingesterRequest(context.Background(), "test", pipelineReq, &searchReq) + actualReq, err := s.ingesterRequest("test", pipelineReq, &searchReq) if tc.expectedError != nil { assert.Equal(t, tc.expectedError, err) continue diff --git a/modules/frontend/traceid_sharder.go b/modules/frontend/traceid_sharder.go index 21bdfa47d24..cb17c09c12a 100644 --- a/modules/frontend/traceid_sharder.go +++ b/modules/frontend/traceid_sharder.go @@ -1,13 +1,12 @@ package frontend import ( - "context" "encoding/hex" "net/http" "github.com/go-kit/log" //nolint:all //deprecated - "github.com/grafana/dskit/user" + "github.com/grafana/dskit/user" "github.com/grafana/tempo/modules/frontend/combiner" "github.com/grafana/tempo/modules/frontend/pipeline" "github.com/grafana/tempo/modules/querier" @@ -40,13 +39,11 @@ func newAsyncTraceIDSharder(cfg *TraceByIDConfig, logger log.Logger) pipeline.As // RoundTrip implements http.RoundTripper func (s asyncTraceSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { - r := pipelineRequest.HTTPRequest() - - ctx, span := tracer.Start(r.Context(), "frontend.ShardQuery") + ctx, span := tracer.Start(pipelineRequest.Context(), "frontend.ShardQuery") defer span.End() - r = r.WithContext(ctx) + pipelineRequest.WithContext(ctx) - reqs, err := s.buildShardedRequests(ctx, r) + reqs, err := s.buildShardedRequests(pipelineRequest) if err != nil { return nil, err } @@ -66,35 +63,43 @@ func (s asyncTraceSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline } return pipeline.NewAsyncSharderFunc(ctx, int(concurrentShards), len(reqs), func(i int) pipeline.Request { - pipelineReq := pipelineRequest.CloneFromHTTPRequest(reqs[i]) + pipelineReq := reqs[i] return pipelineReq }, s.next), nil } // buildShardedRequests returns a slice of requests sharded on the precalculated // block boundaries -func (s *asyncTraceSharder) buildShardedRequests(ctx context.Context, parent *http.Request) ([]*http.Request, error) { +func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pipeline.Request, error) { userID, err := user.ExtractOrgID(parent.Context()) if err != nil { return nil, err } - reqs := make([]*http.Request, s.cfg.QueryShards) + reqs := make([]pipeline.Request, s.cfg.QueryShards) params := map[string]string{} + + reqs[0], err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { + params[querier.QueryModeKey] = querier.QueryModeIngesters + return api.BuildQueryRequest(r, params), nil + }) + if err != nil { + return nil, err + } + // build sharded block queries - for i := 0; i < len(s.blockBoundaries); i++ { - reqs[i] = parent.Clone(ctx) - if i == 0 { - // ingester query - params[querier.QueryModeKey] = querier.QueryModeIngesters - } else { + for i := 1; i < len(s.blockBoundaries); i++ { + i := i // save the loop variable locally to make sure the closure grabs the correct var. + pipelineR, _ := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { // block queries params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1]) params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i]) params[querier.QueryModeKey] = querier.QueryModeBlocks - } - reqs[i] = api.BuildQueryRequest(reqs[i], params) - prepareRequestForQueriers(reqs[i], userID) + + return api.BuildQueryRequest(r, params), nil + }) + + reqs[i] = pipelineR } return reqs, nil diff --git a/modules/frontend/traceid_sharder_test.go b/modules/frontend/traceid_sharder_test.go index 74e3339c6c2..4bc9dbca631 100644 --- a/modules/frontend/traceid_sharder_test.go +++ b/modules/frontend/traceid_sharder_test.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/dskit/user" "github.com/stretchr/testify/require" + "github.com/grafana/tempo/modules/frontend/pipeline" "github.com/grafana/tempo/pkg/blockboundary" ) @@ -24,10 +25,10 @@ func TestBuildShardedRequests(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "blerg") req := httptest.NewRequest("GET", "/", nil).WithContext(ctx) - shardedReqs, err := sharder.buildShardedRequests(ctx, req) + shardedReqs, err := sharder.buildShardedRequests(pipeline.NewHTTPRequest(req)) require.NoError(t, err) require.Len(t, shardedReqs, queryShards) - require.Equal(t, "/querier?mode=ingesters", shardedReqs[0].RequestURI) - urisEqual(t, []string{"/querier?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks"}, []string{shardedReqs[1].RequestURI}) + require.Equal(t, "/querier?mode=ingesters", shardedReqs[0].HTTPRequest().RequestURI) + urisEqual(t, []string{"/querier?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks"}, []string{shardedReqs[1].HTTPRequest().RequestURI}) } diff --git a/pkg/api/query_builder.go b/pkg/api/query_builder.go index 7a43191ffec..fdabf1ceb06 100644 --- a/pkg/api/query_builder.go +++ b/pkg/api/query_builder.go @@ -15,6 +15,7 @@ func newQueryBuilder(init string) *queryBuilder { builder: strings.Builder{}, } + qb.builder.Grow(100) // pre-allocate some space. ideally the caller could indicate roughly the expected size, but starting with 100 bytes significantly outperforms 0 qb.builder.WriteString(init) return qb }