Skip to content

Commit

Permalink
Added ordered results
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Oct 24, 2024
1 parent 760ce80 commit 3d9fc4d
Show file tree
Hide file tree
Showing 20 changed files with 1,173 additions and 514 deletions.
13 changes: 13 additions & 0 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type TResponse interface {
// PipelineResponse is a single response flowing through the query-frontend
// pipeline. It wraps either a proxied HTTP response or pipeline-internal
// metadata (see IsMetadata).
type PipelineResponse interface {
	// HTTPResponse returns the underlying HTTP response. It may be nil for
	// metadata-only responses.
	HTTPResponse() *http.Response
	// RequestData returns opaque per-request data attached by the sender.
	RequestData() any

	IsMetadata() bool // todo: search and query range pass back metadata responses through a normal http response. update to use this instead.
}

type genericCombiner[T TResponse] struct {
Expand All @@ -30,6 +32,7 @@ type genericCombiner[T TResponse] struct {

new func() T
combine func(partial T, final T, resp PipelineResponse) error
metadata func(resp PipelineResponse, final T) error
finalize func(T) (T, error)
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
quit func(T) bool
Expand All @@ -49,6 +52,16 @@ func initHTTPCombiner[T TResponse](c *genericCombiner[T], marshalingFormat strin

// AddResponse is used to add a http response to the combiner.
func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
if r.IsMetadata() && c.metadata != nil {
c.mu.Lock()
defer c.mu.Unlock()

if err := c.metadata(r, c.current); err != nil {
return fmt.Errorf("error processing metadata: %w", err)
}
return nil
}

res := r.HTTPResponse()
if res == nil {
return nil
Expand Down
9 changes: 7 additions & 2 deletions modules/frontend/combiner/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ func TestGenericCombinerDoesntRace(t *testing.T) {
}

// testPipelineResponse is a minimal PipelineResponse implementation used to
// drive the combiner in tests.
type testPipelineResponse struct {
	r            *http.Response // returned by HTTPResponse
	responseData any            // returned by RequestData
}

func newTestResponse(t *testing.T) *testPipelineResponse {
Expand Down Expand Up @@ -230,7 +231,11 @@ func (p *testPipelineResponse) HTTPResponse() *http.Response {
}

// RequestData returns the opaque per-request data attached to this test
// response.
func (p *testPipelineResponse) RequestData() any {
	return p.responseData
}

// IsMetadata implements PipelineResponse. The test response always represents
// a normal HTTP response, never pipeline metadata.
func (p *testPipelineResponse) IsMetadata() bool {
	return false
}

func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] {
Expand Down
187 changes: 155 additions & 32 deletions modules/frontend/combiner/search.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,82 @@
package combiner

import (
"net/http"
"sort"
"time"

"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)

var _ PipelineResponse = (*SearchJobResponse)(nil)

// SearchShards describes one shard of a sharded search: the number of jobs in
// the shard and the point in time the search is considered complete through
// once all of those jobs have responded.
type SearchShards struct {
	TotalJobs               uint32
	CompletedThroughSeconds uint32
}

// SearchJobResponse is a metadata-only PipelineResponse describing the total
// work planned for a search along with its shard breakdown.
type SearchJobResponse struct {
	TotalBlocks int
	TotalJobs   int
	TotalBytes  uint64
	Shards      []SearchShards
}

// HTTPResponse implements PipelineResponse. SearchJobResponse is metadata
// only, so there is no underlying HTTP response.
func (s *SearchJobResponse) HTTPResponse() *http.Response {
	return nil
}

// RequestData implements PipelineResponse. No per-request data is attached to
// a search job response.
func (s *SearchJobResponse) RequestData() any {
	return nil
}

// IsMetadata implements PipelineResponse. Always true: this response is routed
// to the combiner's metadata callback rather than combine.
func (s *SearchJobResponse) IsMetadata() bool {
	return true
}

var _ GRPCCombiner[*tempopb.SearchResponse] = (*genericCombiner[*tempopb.SearchResponse])(nil)

// NewSearch returns a search combiner
func NewSearch(limit int) Combiner {
metadataCombiner := traceql.NewMetadataCombiner()
func NewSearch(keepMostRecent int) Combiner {
metadataCombiner := traceql.NewMetadataCombiner(keepMostRecent)
diffTraces := map[string]struct{}{}
completedThroughTracker := &ShardCompletionTracker{}

c := &genericCombiner[*tempopb.SearchResponse]{
httpStatusCode: 200,
new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} },
current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}},
combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, _ PipelineResponse) error {
combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, resp PipelineResponse) error {
requestIdx, ok := resp.RequestData().(int)
if ok {
completedThroughTracker.addShardIdx(requestIdx)
}

for _, t := range partial.Traces {
// if we've reached the limit and this is NOT a new trace then skip it
if limit > 0 &&
metadataCombiner.Count() >= limit &&
!metadataCombiner.Exists(t.TraceID) {
continue
if metadataCombiner.AddMetadata(t) {
// record modified traces
diffTraces[t.TraceID] = struct{}{}
}

metadataCombiner.AddMetadata(t)
// record modified traces
diffTraces[t.TraceID] = struct{}{}
}

if partial.Metrics != nil {
// there is a coordination with the search sharder here. normal responses
// will never have total jobs set, but they will have valid Inspected* values
// a special response is sent back from the sharder with no traces but valid Total* values
// if TotalJobs is nonzero then assume its the special response
if partial.Metrics.TotalJobs == 0 {
final.Metrics.CompletedJobs++

final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes
final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces
} else {
final.Metrics.TotalBlocks += partial.Metrics.TotalBlocks
final.Metrics.TotalJobs += partial.Metrics.TotalJobs
final.Metrics.TotalBlockBytes += partial.Metrics.TotalBlockBytes
}
final.Metrics.CompletedJobs++
final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes
final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces
}

return nil
},
metadata: func(resp PipelineResponse, final *tempopb.SearchResponse) error {
if sj, ok := resp.(*SearchJobResponse); ok && sj != nil {
final.Metrics.TotalBlocks += uint32(sj.TotalBlocks)
final.Metrics.TotalJobs += uint32(sj.TotalJobs)
final.Metrics.TotalBlockBytes += sj.TotalBytes

completedThroughTracker.addShards(sj.Shards)
}

return nil
Expand All @@ -67,12 +95,24 @@ func NewSearch(limit int) Combiner {
Metrics: current.Metrics,
}

for _, tr := range metadataCombiner.Metadata() {
completedThroughSeconds := completedThroughTracker.completedThroughSeconds
// if all jobs are completed then let's just return everything the combiner has
if current.Metrics.CompletedJobs == current.Metrics.TotalJobs && current.Metrics.TotalJobs > 0 {
completedThroughSeconds = 1
}

// if we've not completed any shards, then return nothing
if completedThroughSeconds == 0 {
return diff, nil
}

for _, tr := range metadataCombiner.MetadataAfter(completedThroughSeconds) {
// if not in the map, skip. we haven't seen an update
if _, ok := diffTraces[tr.TraceID]; !ok {
continue
}

delete(diffTraces, tr.TraceID)
diff.Traces = append(diff.Traces, tr)
}

Expand All @@ -82,19 +122,29 @@ func NewSearch(limit int) Combiner {

addRootSpanNotReceivedText(diff.Traces)

// wipe out diff traces for the next time
clear(diffTraces)

return diff, nil
},
// search combiner doesn't use current in the way i would have expected. it only tracks metrics through current and uses the results map for the actual traces.
// should we change this?
quit: func(_ *tempopb.SearchResponse) bool {
if limit <= 0 {
// are we tracking a limit at all?
if keepMostRecent <= 0 {
return false
}

completedThroughSeconds := completedThroughTracker.completedThroughSeconds
// have we completed any shards?
if completedThroughSeconds == 0 {
return false
}

// do we have enought?
if metadataCombiner.Count() < keepMostRecent {
return false
}

return metadataCombiner.Count() >= limit
// is our oldest trace newer than the completedThrough?
return metadataCombiner.OldestTimestampNanos() > uint64(completedThroughSeconds)*uint64(time.Second)
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
Expand All @@ -112,3 +162,76 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
// NewTypedSearch returns a search combiner typed for gRPC streaming use.
// keepMostRecent bounds the number of most recent traces retained; a value
// <= 0 disables the limit.
func NewTypedSearch(keepMostRecent int) GRPCCombiner[*tempopb.SearchResponse] {
	return NewSearch(keepMostRecent).(GRPCCombiner[*tempopb.SearchResponse])
}

// ShardCompletionTracker tracks, per shard, how many job responses have been
// received and derives the timestamp the search has completed through. Shards
// complete strictly in order: a shard only completes after every shard before
// it has completed.
type ShardCompletionTracker struct {
	shards         []SearchShards
	foundResponses []int // per-shard count of job responses received so far

	completedThroughSeconds uint32 // CompletedThroughSeconds of the last fully completed shard
	curShard                int    // index of the first shard that is not yet complete
}

// addShards records the authoritative shard list and returns the timestamp the
// search has completed through after accounting for any responses that arrived
// before the shards did.
func (s *ShardCompletionTracker) addShards(shards []SearchShards) uint32 {
	if len(shards) == 0 {
		return s.completedThroughSeconds
	}

	s.shards = shards

	// widen foundResponses to cover every shard while preserving any counts
	// recorded before the shard list arrived
	for len(s.foundResponses) < len(s.shards) {
		s.foundResponses = append(s.foundResponses, 0)
	}

	// advance past every shard that is already fully responded to
	for s.incrementCurShardIfComplete() {
	}

	return s.completedThroughSeconds
}

// addShardIdx records one job response for the given shard index and returns
// the timestamp the search is allowed to be considered complete through after
// this response.
func (s *ShardCompletionTracker) addShardIdx(shardIdx int) uint32 {
	// we haven't received shards yet
	if len(s.shards) == 0 {
		// if shardIdx doesn't fit in foundResponses then alloc a new slice and copy foundResponses forward
		if shardIdx >= len(s.foundResponses) {
			temp := make([]int, shardIdx+1)
			copy(temp, s.foundResponses)
			s.foundResponses = temp
		}

		// and record this idx for when we get shards
		s.foundResponses[shardIdx]++

		// nothing can be complete until the shard list is known
		return 0
	}

	// shards are known: a response for an index beyond the shard list is
	// ignored for completion tracking
	if shardIdx >= len(s.foundResponses) {
		return s.completedThroughSeconds
	}

	s.foundResponses[shardIdx]++
	// advance through any shards this response completed
	for s.incrementCurShardIfComplete() {
	}

	return s.completedThroughSeconds
}

// incrementCurShardIfComplete reports whether the current shard has received
// all of its job responses. When it has, the completed-through timestamp is
// advanced to that shard's value and tracking moves on to the next shard.
func (s *ShardCompletionTracker) incrementCurShardIfComplete() bool {
	idx := s.curShard
	if idx >= len(s.shards) {
		return false
	}
	if s.foundResponses[idx] != int(s.shards[idx].TotalJobs) {
		return false
	}

	s.completedThroughSeconds = s.shards[idx].CompletedThroughSeconds
	s.curShard++

	return true
}
Loading

0 comments on commit 3d9fc4d

Please sign in to comment.