Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

WIP: Migrate to OTEL tracing #830

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions migrate-to-otel.gopatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
@@
var a expression
var b expression
var s identifier
var t identifier
@@
-s, t := opentracing.StartSpanFromContext(a,b)
-...
- defer s.Finish()
+import "go.opentelemetry.io/otel"
+t, s := otel.Tracer("github.com/grafana/pyroscope").Start(a,b)
+defer s.End()

@@
var foo,x identifier
@@

-import foo "github.com/opentracing/opentracing-go/log"
+import foo "go.opentelemetry.io/otel/attribute"
foo.x

@@
@@
- otlog
+ attribute

@@
var span identifier
var x expression
@@
- span.LogFields(...)
+import "go.opentelemetry.io/otel/trace"
+ span.AddEvent("TODO", trace.WithAttributes(...))


@@
@@
-opentracing.Span
+import "go.opentelemetry.io/otel/trace"
+trace.Span
143 changes: 45 additions & 98 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
@@ -21,14 +21,15 @@ import (
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/samber/lo"
"github.com/segmentio/parquet-go"
"github.com/thanos-io/objstore"
"golang.org/x/exp/constraints"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

@@ -462,58 +463,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) {
return b.meta.MinTime, b.meta.MaxTime
}

type mapPredicate[K constraints.Integer, V any] struct {
min K
max K
m map[K]V
}

func newMapPredicate[K constraints.Integer, V any](m map[K]V) query.Predicate {
p := &mapPredicate[K, V]{
m: m,
}

first := true
for k := range m {
if first || p.max < k {
p.max = k
}
if first || p.min > k {
p.min = k
}
first = false
}

return p
}

func (m *mapPredicate[K, V]) KeepColumnChunk(c parquet.ColumnChunk) bool {
if ci := c.ColumnIndex(); ci != nil {
for i := 0; i < ci.NumPages(); i++ {
min := K(ci.MinValue(i).Int64())
max := K(ci.MaxValue(i).Int64())
if m.max >= min && m.min <= max {
return true
}
}
return false
}

return true
}

func (m *mapPredicate[K, V]) KeepPage(page parquet.Page) bool {
if min, max, ok := page.Bounds(); ok {
return m.max >= K(min.Int64()) && m.min <= K(max.Int64())
}
return true
}

func (m *mapPredicate[K, V]) KeepValue(v parquet.Value) bool {
_, exists := m.m[K(v.Int64())]
return exists
}

type labelsInfo struct {
fp model.Fingerprint
lbs phlaremodel.Labels
@@ -610,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
}

func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesStacktraces")
defer sp.End()

r, err := stream.Receive()
if err != nil {
@@ -625,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
}
request := r.Request
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID)))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
@@ -674,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in

// Signals the end of the profile streaming by sending an empty response.
// This allows the client to not block other streaming ingesters.
sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "signaling the end of the profile streaming")))
if err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{}); err != nil {
return err
}
@@ -684,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
}

// sends the final result to the client.
sp.LogFields(otlog.String("msg", "sending the final result to the client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending the final result to the client")))
err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{
Result: &ingestv1.MergeProfilesStacktracesResult{
Format: ingestv1.StacktracesMergeFormat_MERGE_FORMAT_TREE,
@@ -702,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
}

func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesLabels")
defer sp.End()

r, err := stream.Receive()
if err != nil {
@@ -719,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
request := r.Request
by := r.By
sort.Strings(by)
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
otlog.String("by", strings.Join(by, ",")),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID),
attribute.String("by", strings.Join(by, ","))))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
@@ -796,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
}

func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesPprof")
defer sp.End()

r, err := stream.Receive()
if err != nil {
@@ -811,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
}
request := r.Request
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID)))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
@@ -942,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
}

func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - Block")
defer sp.End()

if err := b.Open(ctx); err != nil {
return nil, err
}
@@ -988,26 +935,26 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
}

var (
buf [][]parquet.Value
joinIters []query.Iterator
buf [][]parquet.Value
)

pIt := query.NewBinaryJoinIterator(
0,
b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"),
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
)

if b.meta.Version >= 2 {
joinIters = []query.Iterator{
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
pIt = query.NewBinaryJoinIterator(
0,
pIt,
b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
}
)
buf = make([][]parquet.Value, 3)
} else {
joinIters = []query.Iterator{
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
}
buf = make([][]parquet.Value, 2)
}

pIt := query.NewJoinIterator(0, joinIters, nil)
iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
defer pIt.Close()

@@ -1098,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open")
defer func() {
q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds())
sp.LogFields(
otlog.String("block_ulid", q.meta.ULID.String()),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("block_ulid", q.meta.ULID.String())))

sp.Finish()
}()
g, ctx := errgroup.WithContext(ctx)
@@ -1206,7 +1153,7 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string,
return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
}
ctx = query.AddMetricsToContext(ctx, r.metrics.query)
return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
}

func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
1 change: 0 additions & 1 deletion pkg/phlaredb/block_querier_test.go
Original file line number Diff line number Diff line change
@@ -190,5 +190,4 @@ func TestBlockCompatability(t *testing.T) {
})

}

}
14 changes: 8 additions & 6 deletions pkg/phlaredb/head.go
Original file line number Diff line number Diff line change
@@ -17,13 +17,15 @@ import (
"github.com/google/pprof/profile"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

@@ -533,8 +535,8 @@ func (h *Head) Queriers() Queriers {

// add the location IDs to the stacktraces
func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stacktracesByMapping) *ingestv1.MergeProfilesStacktracesResult {
sp, _ := opentracing.StartSpanFromContext(ctx, "resolveStacktraces - Head")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolveStacktraces - Head")
defer sp.End()

names := []string{}
functions := map[int64]int{}
@@ -548,7 +550,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
h.strings.lock.RUnlock()
}()

sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building MergeProfilesStacktracesResult")))
_ = stacktracesByMapping.ForEach(
func(mapping uint64, stacktraceSamples stacktraceSampleMap) error {
mp, ok := h.symbolDB.MappingReader(mapping)
@@ -595,8 +597,8 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
}

func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSampleByMapping) *profile.Profile {
sp, _ := opentracing.StartSpanFromContext(ctx, "resolvePprof - Head")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolvePprof - Head")
defer sp.End()

locations := map[int32]*profile.Location{}
functions := map[uint64]*profile.Function{}
44 changes: 22 additions & 22 deletions pkg/phlaredb/head_queriers.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/segmentio/parquet-go"
"go.opentelemetry.io/otel"

ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
@@ -34,8 +35,8 @@ func (q *headOnDiskQuerier) Open(_ context.Context) error {
}

func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadOnDisk")
defer sp.End()

// query the index for rows
rowIter, labelsPerFP, err := q.head.profiles.index.selectMatchingRowRanges(ctx, params, q.rowGroupIdx)
@@ -48,14 +49,13 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
start = model.Time(params.Start)
end = model.Time(params.End)
)
pIt := query.NewJoinIterator(
0,
[]query.Iterator{
pIt := query.NewBinaryJoinIterator(0,
query.NewBinaryJoinIterator(
0,
rowIter,
q.rowGroup().columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(start.UnixNano(), end.UnixNano()), "TimeNanos"),
q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
},
nil,
),
q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
)
defer pIt.Close()

@@ -107,8 +107,8 @@ func (q *headOnDiskQuerier) Bounds() (model.Time, model.Time) {
}

func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadOnDisk")
defer sp.End()

stacktraceSamples := stacktracesByMapping{}

@@ -121,8 +121,8 @@ func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.It
}

func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByPprof - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByPprof - HeadOnDisk")
defer sp.End()

stacktraceSamples := profileSampleByMapping{}

@@ -134,8 +134,8 @@ func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[P
}

func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadOnDisk")
defer sp.End()

seriesByLabels := make(seriesByLabels)

@@ -169,8 +169,8 @@ func (q *headInMemoryQuerier) Open(_ context.Context) error {
}

func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadInMemory")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadInMemory")
defer sp.End()

index := q.head.profiles.index

@@ -216,8 +216,8 @@ func (q *headInMemoryQuerier) Bounds() (model.Time, model.Time) {
}

func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadInMemory")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadInMemory")
defer sp.End()

stacktraceSamples := stacktracesByMapping{}

@@ -244,8 +244,8 @@ func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.
}

func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergePprof - HeadInMemory")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergePprof - HeadInMemory")
defer sp.End()

stacktraceSamples := profileSampleByMapping{}

@@ -268,8 +268,8 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator
}

func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadInMemory")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadInMemory")
defer sp.End()

labelsByFingerprint := map[model.Fingerprint]string{}
seriesByLabels := make(seriesByLabels)
20 changes: 9 additions & 11 deletions pkg/phlaredb/phlaredb.go
Original file line number Diff line number Diff line change
@@ -18,9 +18,11 @@ import (
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -380,12 +382,8 @@ func filterProfiles[B BidiServerMerge[Res, Req],
Profile: maxBlockProfile,
Index: 0,
}, true, its...), batchProfileSize, func(ctx context.Context, batch []ProfileWithIndex) error {
sp, _ := opentracing.StartSpanFromContext(ctx, "filterProfiles - Filtering batch")
sp.LogFields(
otlog.Int("batch_len", len(batch)),
otlog.Int("batch_requested_size", batchProfileSize),
)
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "filterProfiles - Filtering batch")
defer sp.End()

seriesByFP := map[model.Fingerprint]labelWithIndex{}
selectProfileResult.LabelsSets = selectProfileResult.LabelsSets[:0]
@@ -409,7 +407,7 @@ func filterProfiles[B BidiServerMerge[Res, Req],
})

}
sp.LogFields(otlog.String("msg", "sending batch to client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending batch to client")))
var err error
switch s := BidiServerMerge[Res, Req](stream).(type) {
case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]:
@@ -433,9 +431,9 @@ func filterProfiles[B BidiServerMerge[Res, Req],
}
return err
}
sp.LogFields(otlog.String("msg", "batch sent to client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "batch sent to client")))

sp.LogFields(otlog.String("msg", "reading selection from client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "reading selection from client")))

// handle response for the batch.
var selected []bool
@@ -462,7 +460,7 @@ func filterProfiles[B BidiServerMerge[Res, Req],
}
return err
}
sp.LogFields(otlog.String("msg", "selection received"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "selection received")))
for i, k := range selected {
if k {
selection[batch[i].Index] = append(selection[batch[i].Index], batch[i].Profile)
2 changes: 1 addition & 1 deletion pkg/phlaredb/profile_store.go
Original file line number Diff line number Diff line change
@@ -498,7 +498,7 @@ func (r *rowGroupOnDisk) columnIter(ctx context.Context, columnName string, pred
if !found {
return query.NewErrIterator(fmt.Errorf("column '%s' not found in head row group segment '%s'", columnName, r.file.Name()))
}
return query.NewColumnIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias)
return query.NewSyncIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias)
}

type seriesIDRowsRewriter struct {
10 changes: 6 additions & 4 deletions pkg/phlaredb/profiles.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

@@ -195,8 +196,9 @@ func (pi *profilesIndex) Add(ps *schemav1.InMemoryProfile, lbs phlaremodel.Label
}

func (pi *profilesIndex) selectMatchingFPs(ctx context.Context, params *ingestv1.SelectProfilesRequest) ([]model.Fingerprint, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "selectMatchingFPs - Index")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingFPs - Index")
defer sp.End()

selectors, err := parser.ParseMetricSelector(params.LabelSelector)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error())
@@ -246,8 +248,8 @@ func (pi *profilesIndex) selectMatchingRowRanges(ctx context.Context, params *in
map[model.Fingerprint]phlaremodel.Labels,
error,
) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "selectMatchingRowRanges - Index")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingRowRanges - Index")
defer sp.End()

ids, err := pi.selectMatchingFPs(ctx, params)
if err != nil {
1,201 changes: 745 additions & 456 deletions pkg/phlaredb/query/iters.go

Large diffs are not rendered by default.

639 changes: 525 additions & 114 deletions pkg/phlaredb/query/iters_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/phlaredb/query/predicate_test.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ func testPredicate[T any](t *testing.T, tc predicateTestCase[T]) {

p := InstrumentedPredicate{pred: tc.predicate}

i := NewColumnIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "")
i := NewSyncIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "")
for i.Next() {
}

40 changes: 40 additions & 0 deletions pkg/phlaredb/query/predicates.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (

pq "github.com/segmentio/parquet-go"
"go.uber.org/atomic"
"golang.org/x/exp/constraints"
)

// Predicate is a pushdown predicate that can be applied at
@@ -254,3 +255,42 @@ func (p *InstrumentedPredicate) KeepValue(v pq.Value) bool {

return false
}

type mapPredicate[K constraints.Integer, V any] struct {
inbetweenPred Predicate
m map[K]V
}

func NewMapPredicate[K constraints.Integer, V any](m map[K]V) Predicate {

var min, max int64

first := true
for k := range m {
if first || max < int64(k) {
max = int64(k)
}
if first || min > int64(k) {
min = int64(k)
}
first = false
}

return &mapPredicate[K, V]{
inbetweenPred: NewIntBetweenPredicate(min, max),
m: m,
}
}

func (m *mapPredicate[K, V]) KeepColumnChunk(c pq.ColumnChunk) bool {
return m.inbetweenPred.KeepColumnChunk(c)
}

func (m *mapPredicate[K, V]) KeepPage(page pq.Page) bool {
return m.inbetweenPred.KeepPage(page)
}

func (m *mapPredicate[K, V]) KeepValue(v pq.Value) bool {
_, exists := m.m[K(v.Int64())]
return exists
}
13 changes: 7 additions & 6 deletions pkg/phlaredb/query/repeated.go
Original file line number Diff line number Diff line change
@@ -7,9 +7,10 @@ import (

"github.com/grafana/dskit/multierror"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/samber/lo"
"github.com/segmentio/parquet-go"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/grafana/phlare/pkg/iter"
)
@@ -24,7 +25,7 @@ type repeatedPageIterator[T any] struct {
column int
readSize int
ctx context.Context
span opentracing.Span
span trace.Span

rgs []parquet.RowGroup
startRowGroupRowNum int64
@@ -134,10 +135,10 @@ Outer:
return false
}
it.span.LogFields(
otlog.String("msg", "Page read"),
otlog.Int64("startRowGroupRowNum", it.startRowGroupRowNum),
otlog.Int64("startPageRowNum", it.startPageRowNum),
otlog.Int64("pageRowNum", it.currentPage.NumRows()),
attribute.String("msg", "Page read"),
attribute.Int64("startRowGroupRowNum", it.startRowGroupRowNum),
attribute.Int64("startPageRowNum", it.startPageRowNum),
attribute.Int64("pageRowNum", it.currentPage.NumRows()),
)
it.valueReader = it.currentPage.Values()
}
58 changes: 31 additions & 27 deletions pkg/phlaredb/sample_merge.go
Original file line number Diff line number Diff line change
@@ -6,10 +6,12 @@ import (

"github.com/google/pprof/profile"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/samber/lo"
"github.com/segmentio/parquet-go"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -20,8 +22,8 @@ import (
)

func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block")
defer sp.End()

stacktraceAggrValues := make(stacktracesByMapping)
if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil {
@@ -33,8 +35,8 @@ func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.I
}

func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block")
defer sp.End()

stacktraceAggrValues := make(profileSampleByMapping)
if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil {
@@ -85,18 +87,18 @@ func (b *singleBlockQuerier) resolveLocations(ctx context.Context, mapping uint6
}

func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSampleByMapping profileSampleByMapping) (*profile.Profile, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolvePprofSymbols - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolvePprofSymbols - Block")
defer sp.End()

locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(profileSampleByMapping) * 1024)

// gather stacktraces
if err := profileSampleByMapping.ForEach(func(mapping uint64, samples profileSampleMap) error {
stacktraceIDs := samples.Ids()
sp.LogFields(
otlog.Int("stacktraces", len(stacktraceIDs)),
otlog.Uint64("mapping", mapping),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.Int("stacktraces", len(stacktraceIDs)),
attribute.Uint64("mapping", mapping)))

return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs)
}); err != nil {
return nil, err
@@ -245,26 +247,27 @@ func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSam
}

func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMapping stacktracesByMapping) (*ingestv1.MergeProfilesStacktracesResult, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolveSymbols - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolveSymbols - Block")
defer sp.End()

locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(stacktracesByMapping) * 1024)

// gather stacktraces
if err := stacktracesByMapping.ForEach(func(mapping uint64, samples stacktraceSampleMap) error {
stacktraceIDs := samples.Ids()
sp.LogFields(
otlog.Int("stacktraces", len(stacktraceIDs)),
otlog.Uint64("mapping", mapping),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.Int("stacktraces", len(stacktraceIDs)),
attribute.Uint64("mapping", mapping)))

return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs)
}); err != nil {
return nil, err
}

sp.LogFields(otlog.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds())))
sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds()))))

// gather locations
sp.LogFields(otlog.String("msg", "gather locations"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather locations")))
var (
locationIDsByFunctionID = newUniqueIDs[[]int64]()
locations = b.locations.retrieveRows(ctx, locationsIdsByStacktraceID.locationIds().iterator())
@@ -279,10 +282,10 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
if err := locations.Err(); err != nil {
return nil, err
}
sp.LogFields(otlog.Int("functions", len(locationIDsByFunctionID)))
sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("functions", len(locationIDsByFunctionID))))

// gather functions
sp.LogFields(otlog.String("msg", "gather functions"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather functions")))
var (
functionIDsByStringID = newUniqueIDs[[]int64]()
functions = b.functions.retrieveRows(ctx, locationIDsByFunctionID.iterator())
@@ -297,7 +300,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
}

// gather strings
sp.LogFields(otlog.String("msg", "gather strings"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather strings")))
var (
names = make([]string, len(functionIDsByStringID))
idSlice = make([][]int64, len(functionIDsByStringID))
@@ -314,7 +317,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
return nil, err
}

sp.LogFields(otlog.String("msg", "build MergeProfilesStacktracesResult"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "build MergeProfilesStacktracesResult")))
// idSlice contains stringIDs and gets rewritten into functionIDs
for nameID := range idSlice {
var functionIDs []int64
@@ -361,8 +364,8 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa
}

func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - Block")
defer sp.End()

m := make(seriesByLabels)
columnName := "TotalValue"
@@ -469,8 +472,9 @@ type mapAdder interface {
}

func mergeByStacktraces(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], m mapAdder) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "mergeByStacktraces")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "mergeByStacktraces")
defer sp.End()

// clone the rows to be able to iterate over them twice
multiRows, err := iter.CloneN(rows, 2)
if err != nil {
11 changes: 7 additions & 4 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
ring_client "github.com/grafana/dskit/ring/client"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/promql/parser"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"

ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -58,8 +59,9 @@ func forAllIngesters[T any](ctx context.Context, ingesterQuerier *IngesterQuerie
}

func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree Ingesters")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree Ingesters")
defer sp.End()

profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
@@ -103,8 +105,9 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
}

func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries Ingesters")
defer sp.End()

responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) {
return ic.MergeProfilesLabels(ctx), nil
})
65 changes: 34 additions & 31 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
@@ -15,13 +15,15 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
@@ -112,8 +114,8 @@ func (q *Querier) stopping(_ error) error {
}

func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ProfileTypes")
defer sp.End()

responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.ProfileType, error) {
res, err := ic.ProfileTypes(childCtx, connect.NewRequest(&ingestv1.ProfileTypesRequest{}))
@@ -148,9 +150,9 @@ func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querier
func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelValues")
defer func() {
sp.LogFields(
otlog.String("name", req.Msg.Name),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("name", req.Msg.Name)))

sp.Finish()
}()
responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) {
@@ -173,8 +175,9 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.
}

func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelNames")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "LabelNames")
defer sp.End()

responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) {
res, err := ic.LabelNames(childCtx, connect.NewRequest(&typesv1.LabelNamesRequest{
Matchers: req.Msg.Matchers,
@@ -196,9 +199,9 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L
func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Series")
defer func() {
sp.LogFields(
otlog.String("matchers", strings.Join(req.Msg.Matchers, ",")),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("matchers", strings.Join(req.Msg.Matchers, ","))))

sp.Finish()
}()
responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.Labels, error) {
@@ -227,13 +230,13 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser
func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffRequest]) (*connect.Response[querierv1.DiffResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Diff")
defer func() {
sp.LogFields(
otlog.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()),
otlog.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()),
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()),
attribute.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()),
// Assume are the same
otlog.String("selector", req.Msg.Left.LabelSelector),
otlog.String("profile_id", req.Msg.Left.ProfileTypeID),
)
attribute.String("selector", req.Msg.Left.LabelSelector),
attribute.String("profile_id", req.Msg.Left.ProfileTypeID)))

sp.Finish()
}()

@@ -409,12 +412,12 @@ func splitQueryToStores(start, end model.Time, now model.Time, queryStoreAfter t
func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[googlev1.Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMergeProfile")
defer func() {
sp.LogFields(
otlog.String("start", model.Time(req.Msg.Start).Time().String()),
otlog.String("end", model.Time(req.Msg.End).Time().String()),
otlog.String("selector", req.Msg.LabelSelector),
otlog.String("profile_id", req.Msg.ProfileTypeID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(req.Msg.Start).Time().String()),
attribute.String("end", model.Time(req.Msg.End).Time().String()),
attribute.String("selector", req.Msg.LabelSelector),
attribute.String("profile_id", req.Msg.ProfileTypeID)))

sp.Finish()
}()

@@ -467,14 +470,14 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries")
defer func() {
sp.LogFields(
otlog.String("start", model.Time(req.Msg.Start).Time().String()),
otlog.String("end", model.Time(req.Msg.End).Time().String()),
otlog.String("selector", req.Msg.LabelSelector),
otlog.String("profile_id", req.Msg.ProfileTypeID),
otlog.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
otlog.Float64("step", req.Msg.Step),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(req.Msg.Start).Time().String()),
attribute.String("end", model.Time(req.Msg.End).Time().String()),
attribute.String("selector", req.Msg.LabelSelector),
attribute.String("profile_id", req.Msg.ProfileTypeID),
attribute.String("group_by", strings.Join(req.Msg.GroupBy, ",")),
attribute.Float64("step", req.Msg.Step)))

sp.Finish()
}()

31 changes: 16 additions & 15 deletions pkg/querier/select_merge.go
Original file line number Diff line number Diff line change
@@ -8,13 +8,6 @@ import (

"github.com/google/pprof/profile"
"github.com/grafana/dskit/multierror"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/samber/lo"
"golang.org/x/sync/errgroup"

otlog "github.com/opentracing/opentracing-go/log"

googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
@@ -24,6 +17,13 @@ import (
"github.com/grafana/phlare/pkg/pprof"
"github.com/grafana/phlare/pkg/util"
"github.com/grafana/phlare/pkg/util/loser"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

type ProfileWithLabels struct {
@@ -220,8 +220,9 @@ func (s *mergeIterator[R, Req, Res]) Close() error {

// skipDuplicates iterates through the iterator and skip duplicates.
func skipDuplicates(ctx context.Context, its []MergeIterator) error {
span, _ := opentracing.StartSpanFromContext(ctx, "skipDuplicates")
defer span.Finish()
_, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "skipDuplicates")
defer span.End()

var errors multierror.MultiError
tree := loser.New(its,
&ProfileWithLabels{
@@ -259,17 +260,17 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error {
}
duplicates++
}
span.LogFields(otlog.Int("duplicates", duplicates))
span.LogFields(otlog.Int("total", total))
span.AddEvent("TODO", trace.WithAttributes(attribute.Int("duplicates", duplicates)))
span.AddEvent("TODO", trace.WithAttributes(attribute.Int("total", total)))

return errors.Err()
}

// selectMergeTree selects the profile from each ingester by deduping them and
// returns merge of stacktrace samples represented as a tree.
func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesStacktraces]) (*phlaremodel.Tree, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "selectMergeTree")
defer span.Finish()
ctx, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMergeTree")
defer span.End()

mergeResults := make([]MergeResult[*ingestv1.MergeProfilesStacktracesResult], len(responses))
iters := make([]MergeIterator, len(responses))
@@ -294,7 +295,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client
}

// Collects the results in parallel.
span.LogFields(otlog.String("msg", "collecting merge results"))
span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "collecting merge results")))
g, _ := errgroup.WithContext(ctx)
m := phlaremodel.NewTreeMerger()
sm := phlaremodel.NewStackTraceMerger()
@@ -327,7 +328,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client
}
}

span.LogFields(otlog.String("msg", "building tree"))
span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building tree")))
return m.Tree(), nil
}

11 changes: 7 additions & 4 deletions pkg/querier/store_gateway_querier.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"

ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -151,8 +152,9 @@ func GetShuffleShardingSubring(ring ring.ReadRing, userID string, limits StoreGa
}

func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree StoreGateway")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree StoreGateway")
defer sp.End()

profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
@@ -200,8 +202,9 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1
}

func (q *Querier) selectSeriesFromStoreGateway(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries StoreGateway")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries StoreGateway")
defer sp.End()

tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
3 changes: 2 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"

"github.com/grafana/phlare/pkg/frontend/frontendpb"
@@ -200,7 +201,7 @@ type schedulerRequest struct {

ctx context.Context
ctxCancel context.CancelFunc
queueSpan opentracing.Span
queueSpan trace.Span

// This is only used for testing.
parentSpanContext opentracing.SpanContext