Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Query profiles parquet by rowGroup #799

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 209 additions & 6 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,204 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
return uint64(0)
}

// sortingIterator is an empty placeholder type. No fields or methods are
// visible in this file — NOTE(review): presumably reserved for a future
// time-sorted row-group iteration; confirm it is still needed or remove.
type sortingIterator struct {
}

// rowGroupIndex holds the smallest and largest value observed in a row group
// together with the row group's position. It is only populated by the
// currently disabled TimeNanos index-building code in newProfileMatchIterator,
// where min/max are per-row-group TimeNanos bounds.
type rowGroupIndex struct {
	min, max, idx int64
}

// profileMatchIterator yields the row numbers of profile rows whose
// SeriesIndex is one of the selected series in lbls. It works one parquet
// row group at a time: readNextRowGroup scans a group's SeriesIndex column
// into row ranges and wraps them in it, which Next/At then drain.
type profileMatchIterator struct {
	err error          // sticky error surfaced via Err
	it  query.Iterator // iterator over the current row group's matching rows

	file            *parquet.File
	currentRowGroup int // index of the next row group to scan
	rowGroups       []parquet.RowGroup
	index           []rowGroupIndex      // per-row-group time bounds (populated only by disabled code)
	minTimeOrder    []int                // row-group order by min time (populated only by disabled code)
	lbls            map[int64]labelsInfo // selected series, keyed by series index

	offset        int        // file-global row offset of the next value to be read
	ranges        []rowRange // matching row ranges found in the current row group
	seriesIndexes []uint32   // series index for each entry of ranges (parallel slice)
}

// newProfileMatchIterator returns a query.Iterator over the rows of file
// whose SeriesIndex is a key of lbls. Rows are produced row group by row
// group (see readNextRowGroup); no work is done until the first Next call.
func newProfileMatchIterator(file *parquet.File, lbls map[int64]labelsInfo) query.Iterator {
	it := &profileMatchIterator{file: file, lbls: lbls}
	// TODO(review): the per-row-group TimeNanos min/max index below is
	// disabled; either re-enable it (it would allow skipping row groups
	// outside the queried time range) or delete it before merging.
	//timeNanosIndex, _ := query.GetColumnIndexByPath(file, "TimeNanos")
	it.rowGroups = file.RowGroups()

	/*
		it.index = make([]rowGroupIndex, len(it.rowGroups))
		it.minTimeOrder = make([]int, len(it.rowGroups))

		for rgIdx, rg := range it.rowGroups {
			it.minTimeOrder[rgIdx] = rgIdx
			idx := rg.ColumnChunks()[timeNanosIndex].ColumnIndex()
			for pgIdx := 0; pgIdx < idx.NumPages(); pgIdx++ {
				min := idx.MinValue(pgIdx).Int64()
				max := idx.MaxValue(pgIdx).Int64()
				if it.index[rgIdx].max == 0 || it.index[rgIdx].max < max {
					it.index[rgIdx].max = max
				}
				if it.index[rgIdx].min == 0 || it.index[rgIdx].min > min {
					it.index[rgIdx].min = min
				}
			}

			sort.Slice(it.minTimeOrder, func(i, j int) bool {
				return it.index[i].min < it.index[j].min
			})
		}
	*/

	return it
}

// parquetUint32Reader is satisfied by parquet page value readers that can
// decode directly into a []uint32 batch. readNextRowGroup uses it to scan
// the SeriesIndex column without going through parquet.Value.
type parquetUint32Reader interface {
	ReadUint32s(values []uint32) (int, error)
}

// Seek delegates to the row-number iterator of the current row group.
// NOTE(review): it.it is nil before the first Next call — confirm callers
// never Seek on a fresh iterator, or guard against nil here.
func (it *profileMatchIterator) Seek(to query.RowNumberWithDefinitionLevel) bool {
	return it.it.Seek(to)
}

// readNextRowGroup advances to the next row group, scans its SeriesIndex
// column, and rebuilds it.ranges / it.seriesIndexes with the row ranges of
// the selected series. It then replaces it.it with a fresh row-number
// iterator over those ranges. Returns io.EOF once all row groups are done.
func (it *profileMatchIterator) readNextRowGroup() error {
	if it.currentRowGroup >= len(it.rowGroups) {
		return io.EOF
	}

	rg := it.rowGroups[it.currentRowGroup]
	it.currentRowGroup++

	// NOTE: the column index is the same for every row group; this lookup
	// could be hoisted into newProfileMatchIterator.
	seriesIndexIndex, _ := query.GetColumnIndexByPath(it.file, "SeriesIndex")

	// find series ids relevant
	pages := rg.ColumnChunks()[seriesIndexIndex].Pages()
	// Release page resources when done; the scan result is unaffected by a
	// close error, so it is deliberately discarded.
	defer func() { _ = pages.Close() }()

	batch := make([]uint32, 10_000)

	rangeIdx := -1
	it.ranges = it.ranges[:0]
	it.seriesIndexes = it.seriesIndexes[:0]

	for {
		page, err := pages.ReadPage()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		reader, ok := page.Values().(parquetUint32Reader)
		if !ok {
			return fmt.Errorf("unexpected reader: %T", page.Values())
		}

		for {
			valuesEOF := false

			n, err := reader.ReadUint32s(batch)
			if err == io.EOF {
				valuesEOF = true
			} else if err != nil {
				return err
			}

			// Only the first n entries of batch are valid. Iterating the
			// whole slice (as the previous version did) reprocesses stale
			// values left over from earlier reads and emits bogus ranges.
			for idx := 0; idx < n; idx++ {
				seriesIdx := batch[idx]

				_, selected := it.lbls[int64(seriesIdx)]
				if !selected {
					continue
				}

				// Extend the current row range while consecutive rows belong
				// to the same series. rangeIdx >= 0 (not > 0) so the very
				// first range can be extended as well.
				if rangeIdx >= 0 && seriesIdx == it.seriesIndexes[rangeIdx] {
					it.ranges[rangeIdx].length += 1
					continue
				}

				// needs new row range
				rangeIdx += 1
				it.ranges = append(it.ranges, rowRange{
					rowNum: int64(idx + it.offset),
					length: 1,
				})
				it.seriesIndexes = append(it.seriesIndexes, seriesIdx)
			}

			// offset is file-global: it keeps growing across row groups so
			// rowNum values above are absolute row numbers.
			it.offset += n
			if valuesEOF {
				break
			}
		}
	}

	var rowNumberIter iter.Iterator[rowNumWithSomething[uint32]] = &rowRangesIter[uint32]{
		r:   it.ranges,
		fps: it.seriesIndexes,
		pos: 0,
	}

	it.it = query.NewRowNumberIterator(rowNumberIter)

	return nil
}

// Next advances to the next matching profile row, moving on to the next row
// group when the current one is exhausted. It returns false at end of input
// or on error; non-EOF errors are retained and surfaced via Err.
func (it *profileMatchIterator) Next() bool {
	switch err := it.next(); {
	case err == nil:
		return true
	case err == io.EOF:
		return false
	default:
		it.err = err
		return false
	}
}

// next returns nil once the underlying iterator is positioned on a new row,
// loading further row groups as needed. It returns io.EOF when the file is
// exhausted, or the first error from closing/loading an iterator.
func (it *profileMatchIterator) next() error {
	for {
		// Fast path: the current iterator still has rows.
		if it.it != nil && it.it.Next() {
			return nil
		}

		// Dispose of the exhausted iterator before replacing it.
		if it.it != nil {
			if closeErr := it.it.Close(); closeErr != nil {
				return closeErr
			}
		}

		// Load the next row group; io.EOF propagates to the caller.
		if err := it.readNextRowGroup(); err != nil {
			return err
		}
	}
}

// Err returns the first non-EOF error encountered by Next, or nil.
func (it *profileMatchIterator) Err() error {
	return it.err
}

// Close releases the currently open row-group iterator, if any. The previous
// version was a TODO stub that leaked the wrapped iterator.
func (it *profileMatchIterator) Close() error {
	// it.it is nil when Next was never called or the file was exhausted
	// (readNextRowGroup replaces it only on success).
	if it.it != nil {
		return it.it.Close()
	}
	return nil
}

// At returns the result for the row the iterator is currently positioned on.
// Only valid after a Next call that returned true.
func (it *profileMatchIterator) At() *query.IteratorResult {
	return it.it.At()
}

func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block")
defer sp.Finish()
Expand Down Expand Up @@ -992,16 +1190,18 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
joinIters []query.Iterator
)

seriesIndexIter := newProfileMatchIterator(b.profiles.file, lblsPerRef)

if b.meta.Version >= 2 {
joinIters = []query.Iterator{
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
seriesIndexIter,
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
}
buf = make([][]parquet.Value, 3)
} else {
joinIters = []query.Iterator{
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
seriesIndexIter,
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
}
buf = make([][]parquet.Value, 2)
Expand All @@ -1015,8 +1215,11 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
var currentSeriesSlice []Profile
for pIt.Next() {
res := pIt.At()
buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
seriesIndex := buf[0][0].Int64()

seriesIndexE := res.Entries[0].RowValue.(rowNumWithSomething[uint32])
seriesIndex := int64(seriesIndexE.elem)

buf = res.Columns(buf, "TimeNanos", "StacktracePartition")
if seriesIndex != currSeriesIndex {
currSeriesIndex = seriesIndex
if len(currentSeriesSlice) > 0 {
Expand All @@ -1028,8 +1231,8 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
labels: lblsPerRef[seriesIndex].lbs,
fp: lblsPerRef[seriesIndex].fp,
ts: model.TimeFromUnixNano(buf[1][0].Int64()),
stacktracePartition: retrieveStacktracePartition(buf, 2),
ts: model.TimeFromUnixNano(buf[0][0].Int64()),
stacktracePartition: retrieveStacktracePartition(buf, 1),
RowNum: res.RowNumber[0],
})
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/phlaredb/block_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package phlaredb
import (
"context"
"fmt"
"os"
rpprof "runtime/pprof"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -190,5 +192,60 @@ func TestBlockCompatability(t *testing.T) {
})

}
}

// BenchmarkSelect opens every block found under a local path and, for each
// profile type in each block, selects the matching profiles, writes a heap
// profile snapshot, merges the selection into a single pprof profile, and
// logs the sum of its sample values as a cheap checksum.
//
// NOTE(review): this is a development harness rather than a true benchmark —
// t.N is never consulted so each sub-benchmark runs its body exactly once,
// and the block path is hard-coded to a developer machine. TODO: make the
// path configurable (e.g. an env var) and wrap the measured work in the
// benchmark loop.
func BenchmarkSelect(t *testing.B) {
	path := "/Users/christian/parquet-notebook/"
	bucket, err := filesystem.NewBucket(path)
	require.NoError(t, err)

	ctx := context.Background()
	metas, err := NewBlockQuerier(ctx, bucket).BlockMetas(ctx)
	require.NoError(t, err)

	for _, meta := range metas {

		q := NewSingleBlockQuerierFromMeta(ctx, bucket, meta)
		require.NoError(t, q.Open(ctx))

		profilesTypes, err := q.index.LabelValues("__profile_type__")
		require.NoError(t, err)

		for _, profileType := range profilesTypes {
			name := fmt.Sprintf("block-%s-%s", meta.ULID.String(), profileType)
			t.Run(name, func(t *testing.B) {
				// Profile type strings are colon-separated:
				// name:sample_type:sample_unit:period_type:period_unit.
				profileTypeParts := strings.Split(profileType, ":")

				it, err := q.SelectMatchingProfiles(ctx, &ingestv1.SelectProfilesRequest{
					LabelSelector: `{namespace="profiles-ops-001"}`,
					Start:         0,
					End:           time.Now().UnixMilli(),
					Type: &typesv1.ProfileType{
						Name:       profileTypeParts[0],
						SampleType: profileTypeParts[1],
						SampleUnit: profileTypeParts[2],
						PeriodType: profileTypeParts[3],
						PeriodUnit: profileTypeParts[4],
					},
				})
				require.NoError(t, err)

				// Snapshot heap usage after the select so retained memory of
				// the iterators can be inspected offline.
				f, err := os.Create("heap-after-" + name + ".pprof")
				require.NoError(t, err)

				require.NoError(t, rpprof.WriteHeapProfile(f))

				require.NoError(t, f.Close())

				// TODO: It would be nice actually comparing the whole profile, but at present the result is not deterministic.
				p, err := q.MergePprof(ctx, it)
				// The previous version silently ignored this error and would
				// panic on a nil profile below.
				require.NoError(t, err)

				var sampleSum int64
				for _, s := range p.Sample {
					sampleSum += s.Value[0]
				}
				t.Logf("profileType=%s sum=%d", profileType, sampleSum)
			})
		}
	}
}
6 changes: 3 additions & 3 deletions pkg/phlaredb/head_queriers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
for pIt.Next() {
res := pIt.At()

v, ok := res.Entries[0].RowValue.(fingerprintWithRowNum)
v, ok := res.Entries[0].RowValue.(rowNumWithSomething[model.Fingerprint])
if !ok {
panic("no fingerprint information found")
}

lbls, ok := labelsPerFP[v.fp]
lbls, ok := labelsPerFP[v.elem]
if !ok {
panic("no profile series labels with matching fingerprint found")
}
Expand All @@ -83,7 +83,7 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
}
profiles = append(profiles, BlockProfile{
labels: lbls,
fp: v.fp,
fp: v.elem,
ts: model.TimeFromUnixNano(buf[0][0].Int64()),
stacktracePartition: retrieveStacktracePartition(buf, 1),
RowNum: res.RowNumber[0],
Expand Down
4 changes: 2 additions & 2 deletions pkg/phlaredb/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func Test_rowRangeIter(t *testing.T) {
result := []int64{}
for it.Next() {
result = append(result, it.At().RowNumber())
assert.Equal(t, model.Fingerprint(0xff), it.At().fp)
assert.Equal(t, model.Fingerprint(0xff), it.At().elem)
}
assert.Equal(t, tc.expected, result)
})
Expand Down Expand Up @@ -238,7 +238,7 @@ func Test_rowRangesIter(t *testing.T) {

for it.Next() {
rows = append(rows, it.At().RowNumber())
fingerprints = append(fingerprints, it.At().fp)
fingerprints = append(fingerprints, it.At().elem)
}
assert.Equal(t, tc.expRows, rows)
assert.Equal(t, tc.expFingerprints, fingerprints)
Expand Down
Loading