diff --git a/modules/frontend/combiner/search.go b/modules/frontend/combiner/search.go
index e1245f78d0f..6f164714590 100644
--- a/modules/frontend/combiner/search.go
+++ b/modules/frontend/combiner/search.go
@@ -2,8 +2,6 @@ package combiner
 
 import (
 	"net/http"
-	"sort"
-	"time"
 
 	"github.com/grafana/tempo/pkg/api"
 	"github.com/grafana/tempo/pkg/search"
@@ -40,8 +38,8 @@ func (s *SearchJobResponse) IsMetadata() bool {
 var _ GRPCCombiner[*tempopb.SearchResponse] = (*genericCombiner[*tempopb.SearchResponse])(nil)
 
 // NewSearch returns a search combiner
-func NewSearch(keepMostRecent int) Combiner {
-	metadataCombiner := traceql.NewMetadataCombiner(keepMostRecent)
+func NewSearch(limit int, keepMostRecent bool) Combiner {
+	metadataCombiner := traceql.NewMetadataCombiner(limit, keepMostRecent)
 	diffTraces := map[string]struct{}{}
 	completedThroughTracker := &ShardCompletionTracker{}
 
@@ -95,18 +93,25 @@ func NewSearch(keepMostRecent int) Combiner {
 				Metrics: current.Metrics,
 			}
 
-			completedThroughSeconds := completedThroughTracker.completedThroughSeconds
-			// if all jobs are completed then let's just return everything the combiner has
-			if current.Metrics.CompletedJobs == current.Metrics.TotalJobs && current.Metrics.TotalJobs > 0 {
-				completedThroughSeconds = 1
-			}
-
-			// if we've not completed any shards, then return nothing
-			if completedThroughSeconds == 0 {
-				return diff, nil
+			metadataFn := metadataCombiner.Metadata
+			if keepMostRecent {
+				metadataFn = func() []*tempopb.TraceSearchMetadata {
+					completedThroughSeconds := completedThroughTracker.completedThroughSeconds
+					// if all jobs are completed then let's just return everything the combiner has
+					if current.Metrics.CompletedJobs == current.Metrics.TotalJobs && current.Metrics.TotalJobs > 0 {
+						completedThroughSeconds = 1
+					}
+
+					// if we've not completed any shards, then return nothing
+					if completedThroughSeconds == 0 {
+						return nil
+					}
+
+					return metadataCombiner.MetadataAfter(completedThroughSeconds)
+				}
 			}
 
-			for _, tr := range metadataCombiner.MetadataAfter(completedThroughSeconds) {
+			for _, tr := range metadataFn() {
 				// if not in the map, skip. we haven't seen an update
 				if _, ok := diffTraces[tr.TraceID]; !ok {
 					continue
@@ -116,10 +121,6 @@ func NewSearch(keepMostRecent int) Combiner {
 				diff.Traces = append(diff.Traces, tr)
 			}
 
-			sort.Slice(diff.Traces, func(i, j int) bool {
-				return diff.Traces[i].StartTimeUnixNano > diff.Traces[j].StartTimeUnixNano
-			})
-
 			addRootSpanNotReceivedText(diff.Traces)
 
 			return diff, nil
@@ -127,24 +128,13 @@ func NewSearch(keepMostRecent int) Combiner {
 		// search combiner doesn't use current in the way i would have expected. it only tracks metrics through current and uses the results map for the actual traces.
 		// should we change this?
 		quit: func(_ *tempopb.SearchResponse) bool {
-			// are we tracking a limit at all?
-			if keepMostRecent <= 0 {
-				return false
-			}
-
 			completedThroughSeconds := completedThroughTracker.completedThroughSeconds
 			// have we completed any shards?
 			if completedThroughSeconds == 0 {
-				return false
-			}
-
-			// do we have enought?
-			if metadataCombiner.Count() < keepMostRecent {
-				return false
+				completedThroughSeconds = traceql.TimestampNever
 			}
 
-			// is our oldest trace newer than the completedThrough?
-			return metadataCombiner.OldestTimestampNanos() > uint64(completedThroughSeconds)*uint64(time.Second)
+			return metadataCombiner.IsCompleteFor(completedThroughSeconds)
		},
 	}
 
 	initHTTPCombiner(c, api.HeaderAcceptJSON)
@@ -159,8 +149,8 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
 	}
 }
 
-func NewTypedSearch(limit int) GRPCCombiner[*tempopb.SearchResponse] {
-	return NewSearch(limit).(GRPCCombiner[*tempopb.SearchResponse])
+func NewTypedSearch(limit int, keepMostRecent bool) GRPCCombiner[*tempopb.SearchResponse] {
+	return NewSearch(limit, keepMostRecent).(GRPCCombiner[*tempopb.SearchResponse])
 }
 
 // ShardCompletionTracker
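A note on the new quit path: a minimal sketch (not part of the PR) of how the decision resolves for the two combiner modes, where traceql.TimestampNever is the sentinel the quit func substitutes when no shard has completed yet.

// sketch only: mirrors the quit logic in NewSearch above
func shouldQuit(completedThroughSeconds uint32, c traceql.MetadataCombiner) bool {
	if completedThroughSeconds == 0 {
		// no shard has fully completed yet
		completedThroughSeconds = traceql.TimestampNever
	}
	// the any-order combiner ignores the timestamp and reports complete once it
	// holds `limit` traces; the most-recent combiner never reports complete for
	// TimestampNever, and otherwise only when it is full and its oldest kept
	// trace started after completedThroughSeconds
	return c.IsCompleteFor(completedThroughSeconds)
}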
diff --git a/modules/frontend/combiner/search_test.go b/modules/frontend/combiner/search_test.go
index bf475b3475a..778666bc7c1 100644
--- a/modules/frontend/combiner/search_test.go
+++ b/modules/frontend/combiner/search_test.go
@@ -16,35 +16,92 @@ import (
 	"google.golang.org/grpc/codes"
 )
 
-func TestSearchProgressShouldQuit(t *testing.T) {
+func TestSearchProgressShouldQuitAny(t *testing.T) {
 	// new combiner should not quit
-	c := NewSearch(0)
+	c := NewSearch(0, false)
 	should := c.ShouldQuit()
 	require.False(t, should)
 
 	// 500 response should quit
-	c = NewSearch(0)
+	c = NewSearch(0, false)
 	err := c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 500))
 	require.NoError(t, err)
 	should = c.ShouldQuit()
 	require.True(t, should)
 
 	// 429 response should quit
-	c = NewSearch(0)
+	c = NewSearch(0, false)
 	err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 429))
 	require.NoError(t, err)
 	should = c.ShouldQuit()
 	require.True(t, should)
 
 	// unparseable body should not quit, but should return an error
-	c = NewSearch(0)
+	c = NewSearch(0, false)
 	err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}})
 	require.Error(t, err)
 	should = c.ShouldQuit()
 	require.False(t, should)
 
 	// under limit should not quit
-	c = NewSearch(2)
+	c = NewSearch(2, false)
+	err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{
+		Traces: []*tempopb.TraceSearchMetadata{
+			{
+				TraceID: "1",
+			},
+		},
+	}, 200))
+	require.NoError(t, err)
+	should = c.ShouldQuit()
+	require.False(t, should)
+
+	// over limit should quit
+	c = NewSearch(1, false)
+	err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{
+		Traces: []*tempopb.TraceSearchMetadata{
+			{
+				TraceID: "1",
+			},
+			{
+				TraceID: "2",
+			},
+		},
+	}, 200))
+	require.NoError(t, err)
+	should = c.ShouldQuit()
+	require.True(t, should)
+}
+
+func TestSearchProgressShouldQuitMostRecent(t *testing.T) {
+	// new combiner should not quit
+	c := NewSearch(0, true)
+	should := c.ShouldQuit()
+	require.False(t, should)
+
+	// 500 response should quit
+	c = NewSearch(0, true)
+	err := c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 500))
+	require.NoError(t, err)
+	should = c.ShouldQuit()
+	require.True(t, should)
+
+	// 429 response should quit
+	c = NewSearch(0, true)
+	err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 429))
+	require.NoError(t, err)
+	should = c.ShouldQuit()
+	require.True(t, should)
+
+	// unparseable body should not quit, but should return an error
+	c = NewSearch(0, true)
+	err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}})
+	require.Error(t, err)
+	should = c.ShouldQuit()
+	require.False(t, should)
+
+	// under limit should not quit
+	c = NewSearch(2, true)
 	err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{
 		Traces: []*tempopb.TraceSearchMetadata{
 			{
@@ -57,7 +114,7 @@ func TestSearchProgressShouldQuit(t *testing.T) {
 	require.False(t, should)
 
 	// over limit but no search job response, should not quit
-	c = NewSearch(1)
+	c = NewSearch(1, true)
 	err = c.AddResponse(toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{
 		Traces: []*tempopb.TraceSearchMetadata{
 			{
@@ -111,238 +168,242 @@ func TestSearchProgressShouldQuit(t *testing.T) {
 }
 
 func TestSearchCombinesResults(t *testing.T) {
-	start := time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC)
-	traceID := "traceID"
+	for _, keepMostRecent := range []bool{true, false} {
+		start := time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC)
+		traceID := "traceID"
 
-	c := NewSearch(10)
-	sr := toHTTPResponse(t, &tempopb.SearchResponse{
-		Traces: []*tempopb.TraceSearchMetadata{
-			{
-				TraceID:           traceID,
-				StartTimeUnixNano: uint64(start.Add(time.Second).UnixNano()),
-				DurationMs:        uint32(time.Second.Milliseconds()),
-			}, // 1 second after start and shorter duration
-			{
-				TraceID:           traceID,
-				StartTimeUnixNano: uint64(start.UnixNano()),
-				DurationMs:        uint32(time.Hour.Milliseconds()),
-			}, // earliest start time and longer duration
-			{
-				TraceID:           traceID,
-				StartTimeUnixNano: uint64(start.Add(time.Hour).UnixNano()),
-				DurationMs:        uint32(time.Millisecond.Milliseconds()),
-			}, // 1 hour after start and shorter duration
-		},
-		Metrics: &tempopb.SearchMetrics{},
-	}, 200)
-	err := c.AddResponse(sr)
-	require.NoError(t, err)
+		c := NewSearch(10, keepMostRecent)
+		sr := toHTTPResponse(t, &tempopb.SearchResponse{
+			Traces: []*tempopb.TraceSearchMetadata{
+				{
+					TraceID:           traceID,
+					StartTimeUnixNano: uint64(start.Add(time.Second).UnixNano()),
+					DurationMs:        uint32(time.Second.Milliseconds()),
+				}, // 1 second after start and shorter duration
+				{
+					TraceID:           traceID,
+					StartTimeUnixNano: uint64(start.UnixNano()),
+					DurationMs:        uint32(time.Hour.Milliseconds()),
+				}, // earliest start time and longer duration
+				{
+					TraceID:           traceID,
+					StartTimeUnixNano: uint64(start.Add(time.Hour).UnixNano()),
+					DurationMs:        uint32(time.Millisecond.Milliseconds()),
+				}, // 1 hour after start and shorter duration
+			},
+			Metrics: &tempopb.SearchMetrics{},
+		}, 200)
+		err := c.AddResponse(sr)
+		require.NoError(t, err)
 
-	expected := &tempopb.SearchResponse{
-		Traces: []*tempopb.TraceSearchMetadata{
-			{
-				TraceID:           traceID,
-				StartTimeUnixNano: uint64(start.UnixNano()),
-				DurationMs:        uint32(time.Hour.Milliseconds()),
-				RootServiceName:   search.RootSpanNotYetReceivedText,
+		expected := &tempopb.SearchResponse{
+			Traces: []*tempopb.TraceSearchMetadata{
+				{
+					TraceID:           traceID,
+					StartTimeUnixNano: uint64(start.UnixNano()),
+					DurationMs:        uint32(time.Hour.Milliseconds()),
+					RootServiceName:   search.RootSpanNotYetReceivedText,
+				},
 			},
-		},
-		Metrics: &tempopb.SearchMetrics{
-			CompletedJobs: 1,
-		},
-	}
+			Metrics: &tempopb.SearchMetrics{
+				CompletedJobs: 1,
+			},
+		}
 
-	resp, err := c.HTTPFinal()
-	require.NoError(t, err)
+		resp, err := c.HTTPFinal()
+		require.NoError(t, err)
 
-	actual := &tempopb.SearchResponse{}
-	fromHTTPResponse(t, resp, actual)
+		actual := &tempopb.SearchResponse{}
+		fromHTTPResponse(t, resp, actual)
 
-	require.Equal(t, expected, actual)
+		require.Equal(t, expected, actual)
+	}
 }
 
 func TestSearchResponseCombiner(t *testing.T) {
-	tests := []struct {
-		name      string
-		response1 PipelineResponse
-		response2 PipelineResponse
-
-		expectedStatus    int
-		expectedResponse  *tempopb.SearchResponse
-		expectedHTTPError error
-		expectedGRPCError error
-	}{
-		{
-			name:           "empty returns",
-			response1:      toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			response2:      toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			expectedStatus: 200,
-			expectedResponse: &tempopb.SearchResponse{
-				Traces: []*tempopb.TraceSearchMetadata{},
-				Metrics: &tempopb.SearchMetrics{
-					CompletedJobs: 2,
+	for _, keepMostRecent := range []bool{true, false} {
+		tests := []struct {
+			name      string
+			response1 PipelineResponse
+			response2 PipelineResponse
+
+			expectedStatus    int
+			expectedResponse  *tempopb.SearchResponse
+			expectedHTTPError error
+			expectedGRPCError error
+		}{
+			{
+				name:           "empty returns",
+				response1:      toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				response2:      toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				expectedStatus: 200,
+				expectedResponse: &tempopb.SearchResponse{
+					Traces: []*tempopb.TraceSearchMetadata{},
+					Metrics: &tempopb.SearchMetrics{
+						CompletedJobs: 2,
+					},
 				},
 			},
-		},
-		{
-			name:              "404+200",
-			response1:         toHTTPResponse(t, nil, 404),
-			response2:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			expectedStatus:    404,
-			expectedGRPCError: status.Error(codes.NotFound, ""),
-		},
-		{
-			name:              "200+400",
-			response1:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			response2:         toHTTPResponse(t, nil, 400),
-			expectedStatus:    400,
-			expectedGRPCError: status.Error(codes.InvalidArgument, ""),
-		},
-		{
-			name:              "200+429",
-			response1:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			response2:         toHTTPResponse(t, nil, 429),
-			expectedStatus:    429,
-			expectedGRPCError: status.Error(codes.ResourceExhausted, ""),
-		},
-		{
-			name:              "500+404",
-			response1:         toHTTPResponse(t, nil, 500),
-			response2:         toHTTPResponse(t, nil, 404),
-			expectedStatus:    500,
-			expectedGRPCError: status.Error(codes.Internal, ""),
-		},
-		{
-			name:              "404+500 - first bad response wins",
-			response1:         toHTTPResponse(t, nil, 404),
-			response2:         toHTTPResponse(t, nil, 500),
-			expectedStatus:    404,
-			expectedGRPCError: status.Error(codes.NotFound, ""),
-		},
-		{
-			name:              "500+200",
-			response1:         toHTTPResponse(t, nil, 500),
-			response2:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			expectedStatus:    500,
-			expectedGRPCError: status.Error(codes.Internal, ""),
-		},
-		{
-			name:              "200+500",
-			response1:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
-			response2:         toHTTPResponse(t, nil, 500),
-			expectedStatus:    500,
-			expectedGRPCError: status.Error(codes.Internal, ""),
-		},
-		{
-			name: "respects total blocks message",
-			response1: &SearchJobResponse{
-				TotalBlocks: 5,
-				TotalJobs:   10,
-				TotalBytes:  15,
+			{
+				name:              "404+200",
+				response1:         toHTTPResponse(t, nil, 404),
+				response2:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				expectedStatus:    404,
+				expectedGRPCError: status.Error(codes.NotFound, ""),
 			},
-			response2: toHTTPResponse(t, &tempopb.SearchResponse{
-				Traces: []*tempopb.TraceSearchMetadata{
-					{
-						TraceID:           "5678",
-						StartTimeUnixNano: 0,
+			{
+				name:              "200+400",
+				response1:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				response2:         toHTTPResponse(t, nil, 400),
+				expectedStatus:    400,
+				expectedGRPCError: status.Error(codes.InvalidArgument, ""),
+			},
+			{
+				name:              "200+429",
+				response1:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				response2:         toHTTPResponse(t, nil, 429),
+				expectedStatus:    429,
+				expectedGRPCError: status.Error(codes.ResourceExhausted, ""),
+			},
+			{
+				name:              "500+404",
+				response1:         toHTTPResponse(t, nil, 500),
+				response2:         toHTTPResponse(t, nil, 404),
+				expectedStatus:    500,
+				expectedGRPCError: status.Error(codes.Internal, ""),
+			},
+			{
+				name:              "404+500 - first bad response wins",
+				response1:         toHTTPResponse(t, nil, 404),
+				response2:         toHTTPResponse(t, nil, 500),
+				expectedStatus:    404,
+				expectedGRPCError: status.Error(codes.NotFound, ""),
+			},
+			{
+				name:              "500+200",
+				response1:         toHTTPResponse(t, nil, 500),
+				response2:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				expectedStatus:    500,
+				expectedGRPCError: status.Error(codes.Internal, ""),
+			},
+			{
+				name:              "200+500",
+				response1:         toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
+				response2:         toHTTPResponse(t, nil, 500),
+				expectedStatus:    500,
+				expectedGRPCError: status.Error(codes.Internal, ""),
+			},
+			{
+				name: "respects total blocks message",
+				response1: &SearchJobResponse{
+					TotalBlocks: 5,
+					TotalJobs:   10,
+					TotalBytes:  15,
+				},
+				response2: toHTTPResponse(t, &tempopb.SearchResponse{
+					Traces: []*tempopb.TraceSearchMetadata{
+						{
+							TraceID:           "5678",
+							StartTimeUnixNano: 0,
+						},
 					},
-				},
-				Metrics: &tempopb.SearchMetrics{
-					InspectedTraces: 5,
-					InspectedBytes:  7,
-				},
-			}, 200),
-			expectedStatus: 200,
-			expectedResponse: &tempopb.SearchResponse{
-				Traces: []*tempopb.TraceSearchMetadata{
-					{
-						TraceID:           "5678",
-						StartTimeUnixNano: 0,
-						RootServiceName:   search.RootSpanNotYetReceivedText,
+					Metrics: &tempopb.SearchMetrics{
+						InspectedTraces: 5,
+						InspectedBytes:  7,
+					},
+				}, 200),
+				expectedStatus: 200,
+				expectedResponse: &tempopb.SearchResponse{
+					Traces: []*tempopb.TraceSearchMetadata{
+						{
+							TraceID:           "5678",
+							StartTimeUnixNano: 0,
+							RootServiceName:   search.RootSpanNotYetReceivedText,
+						},
+					},
+					Metrics: &tempopb.SearchMetrics{
+						TotalBlocks:     5,
+						TotalJobs:       10,
+						TotalBlockBytes: 15,
+						InspectedTraces: 5,
+						InspectedBytes:  7,
+						CompletedJobs:   1,
 					},
-				},
-				Metrics: &tempopb.SearchMetrics{
-					TotalBlocks:     5,
-					TotalJobs:       10,
-					TotalBlockBytes: 15,
-					InspectedTraces: 5,
-					InspectedBytes:  7,
-					CompletedJobs:   1,
 				},
 			},
-		},
-		{
-			name: "200+200",
-			response1: toHTTPResponse(t, &tempopb.SearchResponse{
-				Traces: []*tempopb.TraceSearchMetadata{
-					{
-						TraceID:           "1234",
-						StartTimeUnixNano: 1,
+			{
+				name: "200+200",
+				response1: toHTTPResponse(t, &tempopb.SearchResponse{
+					Traces: []*tempopb.TraceSearchMetadata{
+						{
+							TraceID:           "1234",
+							StartTimeUnixNano: 1,
+						},
 					},
-				},
-				Metrics: &tempopb.SearchMetrics{
-					InspectedTraces: 1,
-					TotalBlocks:     2,
-					InspectedBytes:  3,
-				},
-			}, 200),
-			response2: toHTTPResponse(t, &tempopb.SearchResponse{
-				Traces: []*tempopb.TraceSearchMetadata{
-					{
-						TraceID:           "5678",
-						StartTimeUnixNano: 0,
+					Metrics: &tempopb.SearchMetrics{
+						InspectedTraces: 1,
+						TotalBlocks:     2,
+						InspectedBytes:  3,
 					},
-				},
-				Metrics: &tempopb.SearchMetrics{
-					InspectedTraces: 5,
-					TotalBlocks:     6,
-					InspectedBytes:  7,
-				},
-			}, 200),
-			expectedStatus: 200,
-			expectedResponse: &tempopb.SearchResponse{
-				Traces: []*tempopb.TraceSearchMetadata{
-					{
-						TraceID:           "1234",
-						StartTimeUnixNano: 1,
-						RootServiceName:   search.RootSpanNotYetReceivedText,
+				}, 200),
+				response2: toHTTPResponse(t, &tempopb.SearchResponse{
+					Traces: []*tempopb.TraceSearchMetadata{
+						{
+							TraceID:           "5678",
+							StartTimeUnixNano: 0,
+						},
 					},
-					{
-						TraceID:           "5678",
-						StartTimeUnixNano: 0,
-						RootServiceName:   search.RootSpanNotYetReceivedText,
+					Metrics: &tempopb.SearchMetrics{
+						InspectedTraces: 5,
+						TotalBlocks:     6,
+						InspectedBytes:  7,
+					},
+				}, 200),
+				expectedStatus: 200,
+				expectedResponse: &tempopb.SearchResponse{
+					Traces: []*tempopb.TraceSearchMetadata{
+						{
+							TraceID:           "1234",
+							StartTimeUnixNano: 1,
+							RootServiceName:   search.RootSpanNotYetReceivedText,
+						},
+						{
+							TraceID:           "5678",
+							StartTimeUnixNano: 0,
+							RootServiceName:   search.RootSpanNotYetReceivedText,
+						},
+					},
+					Metrics: &tempopb.SearchMetrics{
+						InspectedTraces: 6,
+						InspectedBytes:  10,
+						CompletedJobs:   2,
 					},
-				},
-				Metrics: &tempopb.SearchMetrics{
-					InspectedTraces: 6,
-					InspectedBytes:  10,
-					CompletedJobs:   2,
 				},
 			},
-		},
-	}
+		}
 
-	for _, tc := range tests {
-		t.Run(tc.name, func(t *testing.T) {
-			combiner := NewTypedSearch(20)
+		for _, tc := range tests {
+			t.Run(tc.name, func(t *testing.T) {
+				combiner := NewTypedSearch(20, keepMostRecent)
 
-			err := combiner.AddResponse(tc.response1)
-			require.NoError(t, err)
-			err = combiner.AddResponse(tc.response2)
-			require.NoError(t, err)
+				err := combiner.AddResponse(tc.response1)
+				require.NoError(t, err)
+				err = combiner.AddResponse(tc.response2)
+				require.NoError(t, err)
 
-			httpResp, err := combiner.HTTPFinal()
-			require.Equal(t, tc.expectedStatus, httpResp.StatusCode)
-			require.Equal(t, tc.expectedHTTPError, err)
+				httpResp, err := combiner.HTTPFinal()
+				require.Equal(t, tc.expectedStatus, httpResp.StatusCode)
+				require.Equal(t, tc.expectedHTTPError, err)
 
-			grpcresp, err := combiner.GRPCFinal()
-			require.Equal(t, tc.expectedGRPCError, err)
-			require.Equal(t, tc.expectedResponse, grpcresp)
-		})
+				grpcresp, err := combiner.GRPCFinal()
+				require.Equal(t, tc.expectedGRPCError, err)
+				require.Equal(t, tc.expectedResponse, grpcresp)
+			})
+		}
 	}
 }
 
-func TestCombinerDiffs(t *testing.T) {
+func TestCombinerShards(t *testing.T) {
 	tests := []struct {
 		name             string
 		pipelineResponse PipelineResponse
@@ -583,7 +644,8 @@ func TestCombinerDiffs(t *testing.T) {
 	}
 
 	// apply tests one at a time to the combiner and check expected results
-	combiner := NewTypedSearch(5)
+
+	combiner := NewTypedSearch(5, true)
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
 			if tc.pipelineResponse != nil {
diff --git a/modules/frontend/pipeline/responses_test.go b/modules/frontend/pipeline/responses_test.go
index 966f5c1f56f..04f1ffbcaeb 100644
--- a/modules/frontend/pipeline/responses_test.go
+++ b/modules/frontend/pipeline/responses_test.go
@@ -304,7 +304,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
 		bridge := &pipelineBridge{
 			next: tc.finalRT(cancel),
 		}
-		httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0))
+		httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0, false))
 
 		_, _ = httpCollector.RoundTrip(req)
 
@@ -326,7 +326,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
 		bridge := &pipelineBridge{
 			next: tc.finalRT(cancel),
 		}
-		grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })
+		grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil })
 
 		_ = grpcCollector.RoundTrip(req)
 
@@ -350,7 +350,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
 		}
 		s := sharder{next: sharder{next: bridge}, funcSharder: true}
 
-		grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })
+		grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil })
 
 		_ = grpcCollector.RoundTrip(req)
 
@@ -373,7 +373,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
 		}
 		s := sharder{next: sharder{next: bridge, funcSharder: true}}
 
-		grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })
+		grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil })
 
 		_ = grpcCollector.RoundTrip(req)
 
diff --git a/modules/frontend/search_handlers.go b/modules/frontend/search_handlers.go
index 35875832016..0f819d514f3 100644
--- a/modules/frontend/search_handlers.go
+++ b/modules/frontend/search_handlers.go
@@ -20,6 +20,7 @@ import (
 
 	"github.com/grafana/tempo/pkg/api"
 	"github.com/grafana/tempo/pkg/tempopb"
+	"github.com/grafana/tempo/pkg/traceql"
 )
 
 // newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
@@ -43,14 +44,14 @@ func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c
 		tenant, _ := user.ExtractOrgID(ctx)
 		start := time.Now()
 
-		limit, err := adjustLimit(req.Limit, cfg.Search.Sharder.DefaultLimit, cfg.Search.Sharder.MaxLimit)
+		comb, err := newCombiner(req, cfg.Search.Sharder)
 		if err != nil {
-			level.Error(logger).Log("msg", "search streaming: adjust limit failed", "err", err)
-			return status.Errorf(codes.InvalidArgument, "adjust limit: %s", err.Error())
+			level.Error(logger).Log("msg", "search streaming: could not create combiner", "err", err)
+			return status.Error(codes.InvalidArgument, err.Error())
+
 		}
 
 		var finalResponse *tempopb.SearchResponse
-		comb := combiner.NewTypedSearch(int(limit))
 		collector := pipeline.NewGRPCCollector[*tempopb.SearchResponse](next, cfg.ResponseConsumers, comb, func(sr *tempopb.SearchResponse) error {
 			finalResponse = sr // sadly we can't srv.Send directly into the collector. we need bytesProcessed for the SLO calculations
 			return srv.Send(sr)
@@ -89,10 +90,9 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
 			}, nil
 		}
 
-		// build combiner with limit
-		limit, err := adjustLimit(searchReq.Limit, cfg.Search.Sharder.DefaultLimit, cfg.Search.Sharder.MaxLimit)
+		comb, err := newCombiner(searchReq, cfg.Search.Sharder)
 		if err != nil {
-			level.Error(logger).Log("msg", "search: adjust limit failed", "err", err)
+			level.Error(logger).Log("msg", "search: could not create combiner", "err", err)
 			return &http.Response{
 				StatusCode: http.StatusBadRequest,
 				Status:     http.StatusText(http.StatusBadRequest),
@@ -103,7 +103,6 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
 		logRequest(logger, tenant, searchReq)
 
 		// build and use roundtripper
-		comb := combiner.NewTypedSearch(int(limit))
 		rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb)
 
 		resp, err := rt.RoundTrip(req)
@@ -122,6 +121,25 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
 	})
 }
 
+func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig) (combiner.GRPCCombiner[*tempopb.SearchResponse], error) {
+	limit, err := adjustLimit(req.Limit, cfg.DefaultLimit, cfg.MaxLimit)
+	if err != nil {
+		return nil, err
+	}
+
+	query, err := traceql.Parse(req.Query)
+	if err != nil {
+		return nil, fmt.Errorf("invalid TraceQL query: %s", err)
+	}
+
+	var mostRecent, ok bool
+	if mostRecent, ok = query.Hints.GetBool(traceql.HintMostRecent, false); !ok {
+		mostRecent = false
+	}
+
+	return combiner.NewTypedSearch(int(limit), mostRecent), nil
+}
+
 // adjusts the limit based on provided config
 func adjustLimit(limit, defaultLimit, maxLimit uint32) (uint32, error) {
 	if limit == 0 {
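For reference, a hypothetical call into newCombiner as defined above. The query string and config values are illustrative, and the `with (most_recent=true)` suffix follows TraceQL's hint syntax:

req := &tempopb.SearchRequest{
	Query: `{ span.foo = "bar" } with (most_recent=true)`, // opt in to time-ordered results
	Limit: 0,                                              // 0 falls back to cfg.DefaultLimit via adjustLimit
}
comb, err := newCombiner(req, SearchSharderConfig{DefaultLimit: 20, MaxLimit: 100})
// err is non-nil for an unparseable query or a limit above MaxLimit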
diff --git a/modules/frontend/search_handlers_test.go b/modules/frontend/search_handlers_test.go
index 43df9cfe2af..81e0e0c82ce 100644
--- a/modules/frontend/search_handlers_test.go
+++ b/modules/frontend/search_handlers_test.go
@@ -32,7 +32,6 @@ import (
 	"github.com/grafana/tempo/pkg/cache"
 	"github.com/grafana/tempo/pkg/search"
 	"github.com/grafana/tempo/pkg/tempopb"
-	"github.com/grafana/tempo/pkg/traceql"
 	"github.com/grafana/tempo/pkg/util"
 	"github.com/grafana/tempo/pkg/util/test"
 	"github.com/grafana/tempo/tempodb"
@@ -140,14 +139,14 @@ func TestFrontendSearch(t *testing.T) {
 
 func runnerBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
 	// http
-	httpReq := httptest.NewRequest("GET", "/api/search", nil)
+	httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil)
 	httpResp := httptest.NewRecorder()
 	f.SearchHandler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "no org id", httpResp.Body.String())
 	require.Equal(t, http.StatusBadRequest, httpResp.Code)
 
 	// grpc
-	grpcReq := &tempopb.SearchRequest{}
+	grpcReq := &tempopb.SearchRequest{Query: "{}"}
 	err := f.streamingSearch(grpcReq, newMockStreamingServer[*tempopb.SearchResponse]("", nil))
 	require.Equal(t, status.Error(codes.InvalidArgument, "no org id"), err)
 }
@@ -273,7 +272,7 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
 
 func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
 	// http
-	httpReq := httptest.NewRequest("GET", "/api/search", nil)
+	httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil)
 	httpResp := httptest.NewRecorder()
 
 	ctx, cancel := context.WithCancel(httpReq.Context())
@@ -296,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
 		time.Sleep(50 * time.Millisecond)
 		cancel()
 	}()
-	grpcReq := &tempopb.SearchRequest{}
+	grpcReq := &tempopb.SearchRequest{Query: "{}"}
 	err := f.streamingSearch(grpcReq, srv)
 	require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
 }
@@ -385,7 +384,7 @@ func TestSearchLimitHonored(t *testing.T) {
 			tenant := "1|2|3|4|5|6" // due to the blocks we will have 4 trace ids normally
 
-			httpReq := httptest.NewRequest("GET", "/api/search", nil)
+			httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil)
 			httpReq, err := api.BuildSearchRequest(httpReq, tc.request)
 			require.NoError(t, err)
 
@@ -406,18 +405,18 @@ func TestSearchLimitHonored(t *testing.T) {
 			}
 
 			// grpc
-			combiner := traceql.NewMetadataCombiner(100)
+			distinctTraces := map[string]struct{}{}
 			err = f.streamingSearch(tc.request, newMockStreamingServer(tenant, func(i int, sr *tempopb.SearchResponse) {
 				// combine
 				for _, t := range sr.Traces {
-					combiner.AddMetadata(t)
+					distinctTraces[t.TraceID] = struct{}{}
 				}
 			}))
 			if tc.badRequest {
-				require.Equal(t, status.Error(codes.InvalidArgument, "adjust limit: limit 20 exceeds max limit 15"), err)
+				require.Equal(t, status.Error(codes.InvalidArgument, "limit 20 exceeds max limit 15"), err)
 			} else {
 				require.NoError(t, err)
-				require.Equal(t, tc.expectedTraces, combiner.Count())
+				require.Equal(t, tc.expectedTraces, len(distinctTraces))
 			}
 		})
 	}
@@ -503,7 +502,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
 				},
 			}, nil)
 
-			httpReq := httptest.NewRequest("GET", "/api/search?start=1&end=10000", nil)
+			httpReq := httptest.NewRequest("GET", "/api/search?start=1&end=10000&q={}", nil)
 			httpResp := httptest.NewRecorder()
 
 			ctx := user.InjectOrgID(httpReq.Context(), "foo")
@@ -550,7 +549,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
 
 			// grpc
 			srv := newMockStreamingServer[*tempopb.SearchResponse]("bar", nil)
-			grpcReq := &tempopb.SearchRequest{}
+			grpcReq := &tempopb.SearchRequest{Query: "{}"}
 			err := f.streamingSearch(grpcReq, srv)
 			require.Equal(t, tc.expectedErr, err)
 		}
diff --git a/modules/frontend/tag_handlers_test.go b/modules/frontend/tag_handlers_test.go
index 23f46147f83..84b67b3e54c 100644
--- a/modules/frontend/tag_handlers_test.go
+++ b/modules/frontend/tag_handlers_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/gogo/protobuf/jsonpb"
 	"github.com/gogo/protobuf/proto"
 	"github.com/gogo/status"
+	"github.com/gorilla/mux"
 	"github.com/grafana/dskit/user"
 	"github.com/grafana/tempo/pkg/cache"
 	"github.com/grafana/tempo/pkg/tempopb"
@@ -49,7 +50,7 @@ func runnerTagsBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
 	// http
 	httpReq := httptest.NewRequest("GET", "/api/search/tags", nil)
 	httpResp := httptest.NewRecorder()
-	f.SearchHandler.ServeHTTP(httpResp, httpReq)
+	f.SearchTagsHandler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "no org id", httpResp.Body.String())
 	require.Equal(t, http.StatusBadRequest, httpResp.Code)
 
@@ -63,7 +64,7 @@ func runnerTagsV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
 	// http
 	httpReq := httptest.NewRequest("GET", "/api/v2/search/tags", nil)
 	httpResp := httptest.NewRecorder()
-	f.SearchHandler.ServeHTTP(httpResp, httpReq)
+	f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "no org id", httpResp.Body.String())
 	require.Equal(t, http.StatusBadRequest, httpResp.Code)
 
@@ -76,8 +77,9 @@ func runnerTagValuesBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
 	// http
 	httpReq := httptest.NewRequest("GET", "/api/search/tag/foo/values", nil)
+	httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
 	httpResp := httptest.NewRecorder()
-	f.SearchHandler.ServeHTTP(httpResp, httpReq)
+	f.SearchTagsValuesHandler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "no org id", httpResp.Body.String())
 	require.Equal(t, http.StatusBadRequest, httpResp.Code)
 
@@ -90,8 +92,9 @@ func runnerTagValuesV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
 	// http
 	httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
+	httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
 	httpResp := httptest.NewRecorder()
-	f.SearchHandler.ServeHTTP(httpResp, httpReq)
+	f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "no org id", httpResp.Body.String())
 	require.Equal(t, http.StatusBadRequest, httpResp.Code)
 
@@ -115,7 +118,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
 		cancel()
 	}()
 
-	f.SearchHandler.ServeHTTP(httpResp, httpReq)
+	f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "context canceled", httpResp.Body.String())
 	require.Equal(t, 499, httpResp.Code) // todo: is this 499 valid?
 
@@ -134,6 +137,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
 	// http
 	httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
+	httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
 	httpResp := httptest.NewRecorder()
 
 	ctx, cancel := context.WithCancel(httpReq.Context())
@@ -145,7 +149,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
 		cancel()
 	}()
 
-	f.SearchHandler.ServeHTTP(httpResp, httpReq)
+	f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
 	require.Equal(t, "context canceled", httpResp.Body.String())
 	require.Equal(t, 499, httpResp.Code) // todo: is this 499 valid?
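These tests now invoke the specific tag handlers directly, so gorilla/mux never routes the URL and the {tagName} path variable must be injected by hand via mux.SetURLVars. A standalone sketch of that pattern (the handler here is hypothetical, not Tempo source):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/gorilla/mux"
)

// hypothetical handler that reads the {tagName} path variable
func tagValues(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, mux.Vars(r)["tagName"])
}

func main() {
	req := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
	// no router in the test, so set the vars explicitly
	req = mux.SetURLVars(req, map[string]string{"tagName": "foo"})

	rec := httptest.NewRecorder()
	tagValues(rec, req)
	fmt.Println(rec.Body.String()) // foo
}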
@@ -248,7 +252,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
 			ctx := user.InjectOrgID(httpReq.Context(), "foo")
 			httpReq = httpReq.WithContext(ctx)
 
-			f.SearchHandler.ServeHTTP(httpResp, httpReq)
+			f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
 
 			require.Equal(t, tc.expectedMessage, httpResp.Body.String())
 			require.Equal(t, tc.expectedCode, httpResp.Code)
@@ -373,13 +377,14 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
 				},
 			}, nil)
 
-			httpReq := httptest.NewRequest("GET", "/api/v2/search/tags?start=1&end=10000", nil)
+			httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values?start=1&end=10000", nil)
+			httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
 			httpResp := httptest.NewRecorder()
 
 			ctx := user.InjectOrgID(httpReq.Context(), "foo")
 			httpReq = httpReq.WithContext(ctx)
 
-			f.SearchHandler.ServeHTTP(httpResp, httpReq)
+			f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
 
 			require.Equal(t, tc.expectedMessage, httpResp.Body.String())
 			require.Equal(t, tc.expectedCode, httpResp.Code)
diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go
index f9d40b5aa52..5ab3418a50e 100644
--- a/modules/ingester/ingester_test.go
+++ b/modules/ingester/ingester_test.go
@@ -235,12 +235,10 @@ func TestSearchWAL(t *testing.T) {
 
 	// search WAL
 	ctx := user.InjectOrgID(context.Background(), "test")
-	searchReq := &tempopb.SearchRequest{Tags: map[string]string{
-		"foo": "bar",
-	}}
+	searchReq := &tempopb.SearchRequest{Query: "{ }"}
 	results, err := inst.Search(ctx, searchReq)
 	require.NoError(t, err)
-	require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
+	require.Equal(t, 1, len(results.Traces))
 
 	// Shutdown
 	require.NoError(t, i.stopping(nil))
@@ -256,7 +254,7 @@ func TestSearchWAL(t *testing.T) {
 
 	results, err = inst.Search(ctx, searchReq)
 	require.NoError(t, err)
-	require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
+	require.Equal(t, 1, len(results.Traces))
 }
 
 // TODO - This test is flaky and commented out until it's fixed
diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go
index 040d19afcb3..6d88bdf128e 100644
--- a/modules/ingester/instance_search.go
+++ b/modules/ingester/instance_search.go
@@ -40,9 +40,19 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 
 	span.AddEvent("SearchRequest", trace.WithAttributes(attribute.String("request", req.String())))
 
+	rootExpr, err := traceql.Parse(req.Query)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing query: %w", err)
+	}
+
+	var mostRecent, ok bool
+	if mostRecent, ok = rootExpr.Hints.GetBool(traceql.HintMostRecent, false); !ok {
+		mostRecent = false
+	}
+
 	var (
 		resultsMtx = sync.Mutex{}
-		combiner   = traceql.NewMetadataCombiner(maxResults)
+		combiner   = traceql.NewMetadataCombiner(maxResults, mostRecent)
 		metrics    = &tempopb.SearchMetrics{}
 		opts       = common.DefaultSearchOptions()
 		anyErr     atomic.Error
@@ -58,12 +68,8 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 			var resp *tempopb.SearchResponse
 			var err error
 
-			// if blocks end time < the oldest entry in the combiner we can ignore b/c its impossible for a trace in this block
-			// to be more recent than our current results
-			// if endtime = 0 in means the block is not yet completed and we should always search it
-			if combiner.Count() >= maxResults &&
-				!blockMeta.EndTime.IsZero() &&
-				blockMeta.EndTime.Unix() < int64(combiner.OldestTimestampNanos()) {
+			// if the combiner is complete for the block's end time, we can skip searching it
+			if combiner.IsCompleteFor(uint32(blockMeta.EndTime.Unix())) {
 				return
 			}
 
@@ -105,6 +111,10 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 
 		for _, tr := range resp.Traces {
 			combiner.AddMetadata(tr)
+			if combiner.IsCompleteFor(traceql.TimestampNever) {
+				cancel()
+				return
+			}
 		}
 	}
 
@@ -124,12 +134,6 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 	if err := anyErr.Load(); err != nil {
 		return nil, err
 	}
-	if combiner.Count() >= maxResults {
-		return &tempopb.SearchResponse{
-			Traces:  combiner.Metadata(),
-			Metrics: metrics,
-		}, nil
-	}
 
 	// Search all other blocks (concurrently)
 	// Lock blocks mutex until all search tasks are finished and this function exits. This avoids
diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go
index cc91b3b4b27..74d17e236e1 100644
--- a/modules/ingester/instance_search_test.go
+++ b/modules/ingester/instance_search_test.go
@@ -43,9 +43,8 @@ func TestInstanceSearch(t *testing.T) {
 	ids, _, _, _ := writeTracesForSearch(t, i, "", tagKey, tagValue, false, false)
 
 	req := &tempopb.SearchRequest{
-		Tags: map[string]string{},
+		Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue),
 	}
-	req.Tags[tagKey] = tagValue
 	req.Limit = uint32(len(ids)) + 1
 
 	// Test after appending to WAL. writeTracesforSearch() makes sure all traces are in the wal
@@ -188,27 +187,23 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) {
 	searchAndAssert := func(req *tempopb.SearchRequest, inspectedTraces uint32) {
 		sr := search(req, 0, 0)
 		assert.Len(t, sr.Traces, len(ids))
-		assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces)
 		checkEqual(t, ids, sr)
 
 		// writeTracesForSearch will build spans that end 1 second from now
 		// query 2 min range to have extra slack and always be within range
 		sr = search(req, uint32(time.Now().Add(-5*time.Minute).Unix()), uint32(time.Now().Add(5*time.Minute).Unix()))
 		assert.Len(t, sr.Traces, len(ids))
-		assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces)
 		checkEqual(t, ids, sr)
 
 		// search with start=5m from now, end=10m from now
 		sr = search(req, uint32(time.Now().Add(5*time.Minute).Unix()), uint32(time.Now().Add(10*time.Minute).Unix()))
 		// no results and should inspect 100 traces in wal
 		assert.Len(t, sr.Traces, 0)
-		assert.Equal(t, uint32(0), sr.Metrics.InspectedTraces)
 	}
 
 	req := &tempopb.SearchRequest{
-		Tags: map[string]string{},
+		Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue),
 	}
-	req.Tags[tagKey] = tagValue
 	req.Limit = uint32(len(ids)) + 1
 
 	// Test after appending to WAL.
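The ingester-side contract introduced in instance_search.go, restated as a compact sketch. blockMeta, searchBlock and the surrounding wiring are stand-ins for illustration, not Tempo source:

package ingester

import (
	"time"

	"github.com/grafana/tempo/pkg/tempopb"
	"github.com/grafana/tempo/pkg/traceql"
)

type blockMeta struct{ EndTime time.Time }

func searchBlock(_ blockMeta) []*tempopb.TraceSearchMetadata { return nil } // stand-in

func searchAllBlocks(blocks []blockMeta, limit int, mostRecent bool) []*tempopb.TraceSearchMetadata {
	combiner := traceql.NewMetadataCombiner(limit, mostRecent)
	for _, b := range blocks {
		// skip blocks that cannot improve the result set: the most-recent
		// combiner compares its oldest kept trace against the block's end time,
		// while the any-order combiner just checks whether it is already full
		if combiner.IsCompleteFor(uint32(b.EndTime.Unix())) {
			continue
		}
		for _, tr := range searchBlock(b) {
			combiner.AddMetadata(tr)
			// only the any-order combiner can be complete for TimestampNever,
			// i.e. regardless of time; this mirrors the early cancel in
			// instance.Search above
			if combiner.IsCompleteFor(traceql.TimestampNever) {
				return combiner.Metadata()
			}
		}
	}
	return combiner.Metadata()
}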
@@ -612,7 +607,7 @@ func TestInstanceSearchNoData(t *testing.T) {
 	i, _ := defaultInstance(t)
 
 	req := &tempopb.SearchRequest{
-		Tags: map[string]string{},
+		Query: "{}",
 	}
 
 	sr, err := i.Search(context.Background(), req)
@@ -634,7 +629,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
 	tagValue := "bar"
 
 	req := &tempopb.SearchRequest{
-		Tags: map[string]string{tagKey: tagValue},
+		Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue),
 	}
 
 	end := make(chan struct{})
@@ -764,11 +759,7 @@ func TestWALBlockDeletedDuringSearch(t *testing.T) {
 
 	go concurrent(func() {
 		_, err := i.Search(context.Background(), &tempopb.SearchRequest{
-			Tags: map[string]string{
-				// Not present in the data, so it will be an exhaustive
-				// search
-				"wuv": "xyz",
-			},
+			Query: `{ span.wuv = "xyz" }`,
 		})
 		require.NoError(t, err)
 	})
@@ -811,7 +802,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
 
 	search := func() *tempopb.SearchMetrics {
 		sr, err := i.Search(context.Background(), &tempopb.SearchRequest{
-			Tags: map[string]string{"foo": "bar"},
+			Query: fmt.Sprintf(`{ span.%s = "%s" }`, "foo", "bar"),
 		})
 		require.NoError(t, err)
 		return sr.Metrics
@@ -826,14 +817,12 @@ func TestInstanceSearchMetrics(t *testing.T) {
 	err := i.CutCompleteTraces(0, true)
 	require.NoError(t, err)
 	m = search()
-	require.Equal(t, numTraces, m.InspectedTraces)
 	require.Less(t, numBytes, m.InspectedBytes)
 
 	// Test after cutting new headblock
 	blockID, err := i.CutBlockIfReady(0, 0, true)
 	require.NoError(t, err)
 	m = search()
-	require.Equal(t, numTraces, m.InspectedTraces)
 	require.Less(t, numBytes, m.InspectedBytes)
 
 	// Test after completing a block
@@ -842,7 +831,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
 	err = i.ClearCompletingBlock(blockID)
 	require.NoError(t, err)
 	m = search()
-	require.Equal(t, numTraces, m.InspectedTraces)
+	require.Less(t, numBytes, m.InspectedBytes)
 }
 
 func BenchmarkInstanceSearchUnderLoad(b *testing.B) {
diff --git a/pkg/traceql/combine.go b/pkg/traceql/combine.go
index 73611280c99..1c7ab688a06 100644
--- a/pkg/traceql/combine.go
+++ b/pkg/traceql/combine.go
@@ -1,7 +1,9 @@
 package traceql
 
 import (
+	"math"
 	"slices"
+	"sort"
 	"strings"
 	"time"
 
@@ -9,23 +11,120 @@ import (
 	"github.com/grafana/tempo/pkg/util"
 )
 
-type MetadataCombiner struct {
+type MetadataCombiner interface {
+	AddMetadata(new *tempopb.TraceSearchMetadata) bool
+	IsCompleteFor(ts uint32) bool
+
+	Metadata() []*tempopb.TraceSearchMetadata
+	MetadataAfter(ts uint32) []*tempopb.TraceSearchMetadata
+
+	addSpanset(new *Spanset)
+}
+
+const TimestampNever = uint32(math.MaxUint32)
+const TimestampAllTime = uint32(1)
+
+func NewMetadataCombiner(limit int, keepMostRecent bool) MetadataCombiner {
+	if keepMostRecent {
+		return newMostRecentCombiner(limit)
+	}
+
+	return newAnyCombiner(limit)
+}
+
+type anyCombiner struct {
+	trs   map[string]*tempopb.TraceSearchMetadata
+	limit int
+}
+
+func newAnyCombiner(limit int) *anyCombiner {
+	return &anyCombiner{
+		trs:   make(map[string]*tempopb.TraceSearchMetadata, limit),
+		limit: limit,
+	}
+}
+
+// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata
+// conversion if the spanset will be added
+func (c *anyCombiner) addSpanset(new *Spanset) {
+	// if the trace is already tracked the conversion is always worth it,
+	// because the new spans need to be combined into the existing entry
+	if _, ok := c.trs[util.TraceIDToHexString(new.TraceID)]; ok {
+		c.AddMetadata(asTraceSearchMetadata(new))
+		return
+	}
+
+	// if we're already at the limit, drop it
+	if c.IsCompleteFor(0) {
+		return
+	}
+
+	c.AddMetadata(asTraceSearchMetadata(new))
+}
+
+// AddMetadata adds the new metadata to the map. If it already exists
+// it uses combineSearchResults to combine the two
+func (c *anyCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool {
+	if existing, ok := c.trs[new.TraceID]; ok {
+		combineSearchResults(existing, new)
+		return true
+	}
+
+	// if we're already at the limit, drop it
+	if c.IsCompleteFor(0) {
+		return false
+	}
+
+	c.trs[new.TraceID] = new
+	return true
+}
+
+func (c *anyCombiner) Count() int {
+	return len(c.trs)
+}
+
+func (c *anyCombiner) Exists(id string) bool {
+	_, ok := c.trs[id]
+	return ok
+}
+
+func (c *anyCombiner) IsCompleteFor(_ uint32) bool {
+	return c.Count() >= c.limit && c.limit > 0
+}
+
+func (c *anyCombiner) Metadata() []*tempopb.TraceSearchMetadata {
+	m := make([]*tempopb.TraceSearchMetadata, 0, len(c.trs))
+	for _, tr := range c.trs {
+		m = append(m, tr)
+	}
+	sort.Slice(m, func(i, j int) bool {
+		return m[i].StartTimeUnixNano > m[j].StartTimeUnixNano
+	})
+	return m
+}
+
+// MetadataAfter returns all traces that started after the given time. anyCombiner has no concept of time so it just returns all traces
+func (c *anyCombiner) MetadataAfter(_ uint32) []*tempopb.TraceSearchMetadata {
+	return c.Metadata()
+}
+
+type mostRecentCombiner struct {
 	trs            map[string]*tempopb.TraceSearchMetadata
 	trsSorted      []*tempopb.TraceSearchMetadata
 	keepMostRecent int
 }
 
-func NewMetadataCombiner(keepMostRecent int) *MetadataCombiner {
-	return &MetadataCombiner{
-		trs:            make(map[string]*tempopb.TraceSearchMetadata, keepMostRecent),
-		trsSorted:      make([]*tempopb.TraceSearchMetadata, 0, keepMostRecent),
-		keepMostRecent: keepMostRecent,
+func newMostRecentCombiner(limit int) *mostRecentCombiner {
+	return &mostRecentCombiner{
+		trs:            make(map[string]*tempopb.TraceSearchMetadata, limit),
+		trsSorted:      make([]*tempopb.TraceSearchMetadata, 0, limit),
+		keepMostRecent: limit,
 	}
 }
 
-// AddSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata
+// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata
 // conversion if the spanset will be added
-func (c *MetadataCombiner) AddSpanset(new *Spanset) {
+func (c *mostRecentCombiner) addSpanset(new *Spanset) {
 	// if we're not configured to keep most recent then just add it
 	if c.keepMostRecent == 0 || c.Count() < c.keepMostRecent {
 		c.AddMetadata(asTraceSearchMetadata(new))
@@ -50,7 +149,7 @@ func (c *MetadataCombiner) AddSpanset(new *Spanset) {
 
 // AddMetadata adds the new metadata to the map. if it already exists
 // use CombineSearchResults to combine the two
-func (c *MetadataCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool {
+func (c *mostRecentCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool {
 	if existing, ok := c.trs[new.TraceID]; ok {
 		combineSearchResults(existing, new)
 		return true
@@ -80,21 +179,34 @@ func (c *mostRecentCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool
 	return true
 }
 
-func (c *MetadataCombiner) Count() int {
+func (c *mostRecentCombiner) Count() int {
 	return len(c.trs)
 }
 
-func (c *MetadataCombiner) Exists(id string) bool {
+func (c *mostRecentCombiner) Exists(id string) bool {
 	_, ok := c.trs[id]
 	return ok
 }
 
+// IsCompleteFor returns true if the combiner has reached the limit and all traces are after the given time
+func (c *mostRecentCombiner) IsCompleteFor(ts uint32) bool {
+	if ts == TimestampNever {
+		return false
+	}
+
+	if c.Count() < c.keepMostRecent {
+		return false
+	}
+
+	return c.OldestTimestampNanos() > uint64(ts)*uint64(time.Second)
+}
+
-func (c *MetadataCombiner) Metadata() []*tempopb.TraceSearchMetadata {
+func (c *mostRecentCombiner) Metadata() []*tempopb.TraceSearchMetadata {
 	return c.trsSorted
 }
 
 // MetadataAfter returns all traces that started after the given time
-func (c *MetadataCombiner) MetadataAfter(afterSeconds uint32) []*tempopb.TraceSearchMetadata {
+func (c *mostRecentCombiner) MetadataAfter(afterSeconds uint32) []*tempopb.TraceSearchMetadata {
 	afterNanos := uint64(afterSeconds) * uint64(time.Second)
 	afterTraces := make([]*tempopb.TraceSearchMetadata, 0, len(c.trsSorted))
 
@@ -107,7 +219,7 @@ func (c *mostRecentCombiner) MetadataAfter(afterSeconds uint32) []*tempopb.Trace
 	return afterTraces
 }
 
-func (c *MetadataCombiner) OldestTimestampNanos() uint64 {
+func (c *mostRecentCombiner) OldestTimestampNanos() uint64 {
 	if len(c.trsSorted) == 0 {
 		return 0
 	}
diff --git a/pkg/traceql/combine_test.go b/pkg/traceql/combine_test.go
index 43d950ba437..809795b738d 100644
--- a/pkg/traceql/combine_test.go
+++ b/pkg/traceql/combine_test.go
@@ -273,7 +273,7 @@ func TestCombineResults(t *testing.T) {
 func TestCombinerKeepsMostRecent(t *testing.T) {
 	totalTraces := 10
 	keepMostRecent := 5
-	combiner := NewMetadataCombiner(keepMostRecent)
+	combiner := NewMetadataCombiner(keepMostRecent, true).(*mostRecentCombiner)
 
 	// make traces
 	traces := make([]*Spanset, totalTraces)
@@ -300,7 +300,7 @@ func TestCombinerKeepsMostRecent(t *testing.T) {
 
 	// add to combiner
 	for i := 0; i < totalTraces; i++ {
-		combiner.AddSpanset(traces[i])
+		combiner.addSpanset(traces[i])
 	}
 
 	// test that the most recent are kept
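To summarize the split above, a short in-package sketch of how the two NewMetadataCombiner modes differ (illustrative only; assumes fmt is imported):

func ExampleNewMetadataCombiner() {
	any := NewMetadataCombiner(2, false)   // anyCombiner: first 2 distinct traces win
	recent := NewMetadataCombiner(2, true) // mostRecentCombiner: the 2 newest traces win

	// the any-order combiner ignores the timestamp: once it holds 2 traces it
	// reports complete for every ts, and MetadataAfter returns all it holds
	fmt.Println(any.IsCompleteFor(TimestampNever)) // false while empty

	// the most-recent combiner is never complete for TimestampNever, since a
	// newer trace could still arrive; for a real ts it is complete only when
	// full and its oldest kept trace started after ts
	fmt.Println(recent.IsCompleteFor(TimestampNever)) // always false

	// Output:
	// false
	// false
}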
diff --git a/pkg/traceql/engine.go b/pkg/traceql/engine.go
index f2657bc25de..a6570d92400 100644
--- a/pkg/traceql/engine.go
+++ b/pkg/traceql/engine.go
@@ -55,6 +55,11 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
 		return nil, err
 	}
 
+	var mostRecent, ok bool
+	if mostRecent, ok = rootExpr.Hints.GetBool(HintMostRecent, false); !ok {
+		mostRecent = false
+	}
+
 	fetchSpansRequest.StartTimeUnixNanos = unixSecToNano(searchReq.Start)
 	fetchSpansRequest.EndTimeUnixNanos = unixSecToNano(searchReq.End)
 
@@ -111,7 +116,7 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
 		Traces:  nil,
 		Metrics: &tempopb.SearchMetrics{},
 	}
-	combiner := NewMetadataCombiner(int(searchReq.Limit))
+	combiner := NewMetadataCombiner(int(searchReq.Limit), mostRecent)
 	for {
 		spanset, err := iterator.Next(ctx)
 		if err != nil && !errors.Is(err, io.EOF) {
@@ -122,7 +127,10 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
 			break
 		}
 
-		combiner.AddSpanset(spanset)
+		combiner.addSpanset(spanset)
+		if combiner.IsCompleteFor(TimestampNever) {
+			break
+		}
 	}
 
 	res.Traces = combiner.Metadata()
diff --git a/pkg/traceql/enum_hints.go b/pkg/traceql/enum_hints.go
index e95de1fda93..5e09ffd3b4b 100644
--- a/pkg/traceql/enum_hints.go
+++ b/pkg/traceql/enum_hints.go
@@ -12,11 +12,12 @@ const (
 	HintTimeOverlapCutoff = "time_overlap_cutoff"
 	HintConcurrentBlocks  = "concurrent_blocks"
 	HintExemplars         = "exemplars"
+	HintMostRecent        = "most_recent" // traceql search hint to return most recent results ordered by time
 )
 
 func isUnsafe(h string) bool {
 	switch h {
-	case HintSample, HintExemplars:
+	case HintSample, HintExemplars, HintMostRecent:
 		return false
 	default:
 		return true
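End to end, the three call sites in this PR (query-frontend, ingester, engine) read the hint with the same guarded pattern. A sketch, assuming the standard TraceQL hint syntax; since HintMostRecent is registered as safe in enum_hints.go, passing false for the hints flag works at every call site:

rootExpr, err := traceql.Parse(`{ span.foo = "bar" } with (most_recent=true)`)
if err != nil {
	return err
}

// same guarded read as newCombiner, instance.Search and ExecuteSearch above:
// default to false when the hint is absent
var mostRecent, ok bool
if mostRecent, ok = rootExpr.Hints.GetBool(traceql.HintMostRecent, false); !ok {
	mostRecent = false
}
// mostRecent now selects between the two MetadataCombiner implementations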