Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query-Frontend: Perf improvements #4242

Merged
merged 12 commits into from
Nov 1, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott)
* [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242](https://github.com/grafana/tempo/pull/4242) (@joe-elliott)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
Expand Down
2 changes: 1 addition & 1 deletion integration/e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName,
require.Equal(t, expected.TagValues, actualGrpcResp.TagValues)
// assert metrics, and make sure it's non-zero when response is non-empty
if len(grpcResp.TagValues) > 0 {
require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(100))
require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(0))
}
}

Expand Down
4 changes: 3 additions & 1 deletion modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"sync"

tempo_io "github.com/grafana/tempo/pkg/io"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
Expand Down Expand Up @@ -90,7 +92,7 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {

switch res.Header.Get(api.HeaderContentType) {
case api.HeaderAcceptProtobuf:
b, err := io.ReadAll(res.Body)
b, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength)
if err != nil {
return fmt.Errorf("error reading response body")
}
Expand Down
3 changes: 2 additions & 1 deletion modules/frontend/combiner/trace_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/api"
tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {
}

// Read the body
buff, err := io.ReadAll(res.Body)
buff, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength)
if err != nil {
c.statusMessage = internalErrorMsg
return fmt.Errorf("error reading response body: %w", err)
Expand Down
30 changes: 29 additions & 1 deletion modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR
resp, _, err := resps.Next(req.Context()) // metrics path will only ever have one response

level.Info(logger).Log(
"msg", "search tag response",
"msg", "metrics summary response",
"tenant", tenant,
"path", req.URL.Path,
"err", err)
Expand All @@ -278,6 +278,34 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR
})
}

// cloneRequestforQueriers builds a querier-ready pipeline.Request cloned from parent.
// The modHTTP callback, when non-nil, is invoked on the cloned internal http.Request
// before it is prepared for the queriers; passing nil leaves the clone unmodified.
func cloneRequestforQueriers(parent pipeline.Request, tenant string, modHTTP func(*http.Request) (*http.Request, error)) (pipeline.Request, error) {
	httpReq := parent.HTTPRequest()

	// Detach the header map before cloning so req.Clone skips the deep header copy.
	// This saves allocations here and, more importantly, downstream in the httpgrpc
	// bridge; prepareRequestForQueriers adds back the only headers queriers need.
	origHeaders := httpReq.Header
	httpReq.Header = nil
	cloned := httpReq.Clone(httpReq.Context())
	httpReq.Header = origHeaders

	// Exactly two headers are added by prepareRequestForQueriers, so size the map for that.
	cloned.Header = make(http.Header, 2)

	// Let the caller rewrite the internal http request if requested.
	if modHTTP != nil {
		modified, err := modHTTP(cloned)
		if err != nil {
			return nil, err
		}
		cloned = modified
	}

	prepareRequestForQueriers(cloned, tenant)

	return parent.CloneFromHTTPRequest(cloned), nil
}

// prepareRequestForQueriers modifies the request so they will be farmed correctly to the queriers
// - adds the tenant header
// - sets the requesturi (see below for details)
Expand Down
6 changes: 3 additions & 3 deletions modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds float64, req *tempopb.QueryRangeRequest, resp *tempopb.QueryRangeResponse, err error) {
if resp == nil {
level.Info(logger).Log(
"msg", "query range results - no resp",
"msg", "query range response - no resp",
"tenant", tenantID,
"duration_seconds", durationSeconds,
"error", err)
Expand All @@ -123,7 +123,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo

if resp.Metrics == nil {
level.Info(logger).Log(
"msg", "query range results - no metrics",
"msg", "query range response - no metrics",
"tenant", tenantID,
"query", req.Query,
"range_nanos", req.End-req.Start,
Expand All @@ -133,7 +133,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo
}

level.Info(logger).Log(
"msg", "query range results",
"msg", "query range response",
"tenant", tenantID,
"query", req.Query,
"range_nanos", req.End-req.Start,
Expand Down
75 changes: 38 additions & 37 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"net/http"
"time"

"github.com/go-kit/log" //nolint:all deprecated
Expand Down Expand Up @@ -113,7 +114,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
cutoff = time.Now().Add(-s.cfg.QueryBackendAfter)
)

generatorReq := s.generatorRequest(ctx, tenantID, pipelineRequest, *req, cutoff)
generatorReq := s.generatorRequest(tenantID, pipelineRequest, *req, cutoff)
reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
Expand Down Expand Up @@ -243,15 +244,13 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
exemplars = max(uint32(float64(exemplars)*float64(m.TotalRecords)/float64(pages)), 1)
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.HTTPRequest().Clone(ctx)

dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
}
dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
_ = level.Error(s.logger).Log("msg", "failed to convert dedicated columns in query range sharder. skipping", "block", m.BlockID, "err", err)
continue
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
// Trim and align the request for this block. I.e. if the request is "Last Hour" we don't want to
// cache the response for that, we want only the few minutes time range for this block. This has
// size savings but the main thing is that the response is reuseable for any overlapping query.
Expand All @@ -261,31 +260,34 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
continue
}

queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: start,
End: end,
Step: step,
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size_,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
Exemplars: exemplars,
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: start,
End: end,
Step: step,
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size_,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
Exemplars: exemplars,
}

return api.BuildQueryRangeRequest(r, queryRangeReq, dedColsJSON), nil
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "failed to cloneRequestForQuerirs in the query range sharder. skipping", "block", m.BlockID, "err", err)
continue
}

subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON)

prepareRequestForQueriers(subR, tenantID)
pipelineR := parent.CloneFromHTTPRequest(subR)

// TODO: Handle sampling rate
key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch))
key := queryRangeCacheKey(tenantID, queryHash, int64(start), int64(end), m, int(step), pages)
if len(key) > 0 {
pipelineR.SetCacheKey(key)
}
Expand All @@ -306,7 +308,7 @@ func max(a, b uint32) uint32 {
return b
}

func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) *pipeline.HTTPRequest {
func (s *queryRangeSharder) generatorRequest(tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) pipeline.Request {
traceql.TrimToAfter(&searchReq, cutoff)
// if start == end then we don't need to query it
if searchReq.Start == searchReq.End {
Expand All @@ -315,12 +317,11 @@ func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID strin

searchReq.QueryMode = querier.QueryModeRecent

subR := parent.HTTPRequest().Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators

prepareRequestForQueriers(subR, tenantID)
subR, _ := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
return api.BuildQueryRangeRequest(r, &searchReq, ""), nil
})

return parent.CloneFromHTTPRequest(subR)
return subR
}

// maxDuration returns the max search duration allowed for this tenant.
Expand Down
12 changes: 9 additions & 3 deletions modules/frontend/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ var tracer = otel.Tracer("modules/frontend/pipeline")
type Request interface {
HTTPRequest() *http.Request
Context() context.Context

WithContext(context.Context)
CloneFromHTTPRequest(request *http.Request) Request

Weight() int
SetWeight(int)
Expand All @@ -23,7 +25,6 @@ type Request interface {

SetResponseData(any) // add data that will be sent back with this requests response
ResponseData() any
CloneFromHTTPRequest(request *http.Request) *HTTPRequest
}

type HTTPRequest struct {
Expand Down Expand Up @@ -78,8 +79,13 @@ func (r *HTTPRequest) SetWeight(w int) {
r.weight = w
}

func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) *HTTPRequest {
return &HTTPRequest{req: request, weight: r.weight}
// CloneFromHTTPRequest returns a new Request backed by the given http.Request,
// carrying over this request's weight, cache key, and response data.
// Note the http.Request itself is used as passed; it is not deep-copied here.
func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) Request {
	return &HTTPRequest{
		req:          request,
		weight:       r.weight,
		cacheKey:     r.cacheKey,
		responseData: r.responseData,
	}
}

//
Expand Down
6 changes: 3 additions & 3 deletions modules/frontend/search_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req

if resp == nil {
level.Info(logger).Log(
"msg", "search results - no resp",
"msg", "search response - no resp",
"tenant", tenantID,
"duration_seconds", durationSeconds,
"status_code", statusCode,
Expand All @@ -156,7 +156,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req

if resp.Metrics == nil {
level.Info(logger).Log(
"msg", "search results - no metrics",
"msg", "search response - no metrics",
"tenant", tenantID,
"query", req.Query,
"range_seconds", req.End-req.Start,
Expand All @@ -167,7 +167,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req
}

level.Info(logger).Log(
"msg", "search results",
"msg", "search response",
"tenant", tenantID,
"query", req.Query,
"range_seconds", req.End-req.Start,
Expand Down
Loading