From 3a8630329249f52cae4666aa6a00f83671ad03d6 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Wed, 28 Jun 2023 15:54:07 +0100 Subject: [PATCH] Something iterates --- pkg/phlaredb/block_querier.go | 215 ++++++++++++++++++++++++++++- pkg/phlaredb/block_querier_test.go | 57 ++++++++ pkg/phlaredb/head_queriers.go | 6 +- pkg/phlaredb/profile_test.go | 4 +- pkg/phlaredb/profiles.go | 26 ++-- 5 files changed, 284 insertions(+), 24 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 989e95a1a..403866298 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -941,6 +941,204 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 { return uint64(0) } +type sortingIterator struct { +} + +type rowGroupIndex struct { + min, max, idx int64 +} + +type profileMatchIterator struct { + err error + it query.Iterator + + file *parquet.File + currentRowGroup int + rowGroups []parquet.RowGroup + index []rowGroupIndex + minTimeOrder []int + lbls map[int64]labelsInfo + + offset int + ranges []rowRange + seriesIndexes []uint32 +} + +func newProfileMatchIterator(file *parquet.File, lbls map[int64]labelsInfo) query.Iterator { + it := &profileMatchIterator{file: file, lbls: lbls} + //timeNanosIndex, _ := query.GetColumnIndexByPath(file, "TimeNanos") + it.rowGroups = file.RowGroups() + + /* + it.index = make([]rowGroupIndex, len(it.rowGroups)) + it.minTimeOrder = make([]int, len(it.rowGroups)) + + for rgIdx, rg := range it.rowGroups { + it.minTimeOrder[rgIdx] = rgIdx + idx := rg.ColumnChunks()[timeNanosIndex].ColumnIndex() + for pgIdx := 0; pgIdx < idx.NumPages(); pgIdx++ { + min := idx.MinValue(pgIdx).Int64() + max := idx.MaxValue(pgIdx).Int64() + if it.index[rgIdx].max == 0 || it.index[rgIdx].max < max { + it.index[rgIdx].max = max + } + if it.index[rgIdx].min == 0 || it.index[rgIdx].min > min { + it.index[rgIdx].min = min + } + } + + sort.Slice(it.minTimeOrder, func(i, j int) bool { + return it.index[i].min < it.index[j].min + }) + } + */ + + return it +} + +type parquetUint32Reader interface { + ReadUint32s(values []uint32) (int, error) +} + +func (it *profileMatchIterator) Seek(to query.RowNumberWithDefinitionLevel) bool { + return it.it.Seek(to) +} + +func (it *profileMatchIterator) readNextRowGroup() error { + if it.currentRowGroup >= len(it.rowGroups) { + return io.EOF + } + + rg := it.rowGroups[it.currentRowGroup] + it.currentRowGroup++ + + seriesIndexIndex, _ := query.GetColumnIndexByPath(it.file, "SeriesIndex") + + // find series ids relevant + pages := rg.ColumnChunks()[seriesIndexIndex].Pages() + + batch := make([]uint32, 10_000) + + rangeIdx := -1 + it.ranges = it.ranges[:0] + it.seriesIndexes = it.seriesIndexes[:0] + + for { + page, err := pages.ReadPage() + if err == io.EOF { + break + } else if err != nil { + return err + } + + reader, ok := page.Values().(parquetUint32Reader) + if !ok { + return fmt.Errorf("unexpected reader: %T", page.Values()) + } + + for { + valuesEOF := false + + n, err := reader.ReadUint32s(batch) + if err == io.EOF { + valuesEOF = true + } else if err != nil { + return err + } + + for idx := range batch { + seriesIdx := batch[idx] + + _, selected := it.lbls[int64(seriesIdx)] + if !selected { + continue + } + + // check rowRanges + if rangeIdx > 0 && seriesIdx == it.seriesIndexes[rangeIdx] { + it.ranges[rangeIdx].length += 1 + continue + } + + // if rangeIdx > 0 { + // fmt.Printf("rowRange finished: %+#v seriesIdx=%d offset=%d\n", it.ranges[rangeIdx], it.seriesIndexes[rangeIdx], it.offset) + // } + + // needs new row range + rangeIdx += 1 + it.ranges = append(it.ranges, rowRange{ + rowNum: int64(idx + it.offset), + length: 1, + }) + it.seriesIndexes = append(it.seriesIndexes, seriesIdx) + } + + it.offset += n + if valuesEOF { + break + } + } + } + + var rowNumberIter iter.Iterator[rowNumWithSomething[uint32]] = &rowRangesIter[uint32]{ + r: it.ranges, + fps: it.seriesIndexes, + pos: 0, + } + + it.it = query.NewRowNumberIterator(rowNumberIter) + + return nil + +} + +func (it *profileMatchIterator) Next() bool { + err := it.next() + if err == io.EOF { + return false + } + if err != nil { + it.err = err + return false + } + + return true + +} + +func (it *profileMatchIterator) next() error { + for it.it == nil || !it.it.Next() { + // close old iterator if required + if it.it != nil { + if err := it.it.Close(); err != nil { + return err + } + } + + // get new iterator + err := it.readNextRowGroup() + if err != nil { + return err + } + } + + return nil +} + +func (it *profileMatchIterator) Err() error { + return it.err +} + +func (it *profileMatchIterator) Close() error { + // TODO + return nil +} + +func (it *profileMatchIterator) At() *query.IteratorResult { + v := it.it.At() + return v +} + func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block") defer sp.Finish() @@ -992,16 +1190,18 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params joinIters []query.Iterator ) + seriesIndexIter := newProfileMatchIterator(b.profiles.file, lblsPerRef) + if b.meta.Version >= 2 { joinIters = []query.Iterator{ - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), + seriesIndexIter, b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), } buf = make([][]parquet.Value, 3) } else { joinIters = []query.Iterator{ - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), + seriesIndexIter, b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), } buf = make([][]parquet.Value, 2) @@ -1015,8 +1215,11 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params var currentSeriesSlice []Profile for pIt.Next() { res := pIt.At() - buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition") - seriesIndex := buf[0][0].Int64() + + seriesIndexE := res.Entries[0].RowValue.(rowNumWithSomething[uint32]) + seriesIndex := int64(seriesIndexE.elem) + + buf = res.Columns(buf, "TimeNanos", "StacktracePartition") if seriesIndex != currSeriesIndex { currSeriesIndex = seriesIndex if len(currentSeriesSlice) > 0 { @@ -1028,8 +1231,8 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params currentSeriesSlice = append(currentSeriesSlice, BlockProfile{ labels: lblsPerRef[seriesIndex].lbs, fp: lblsPerRef[seriesIndex].fp, - ts: model.TimeFromUnixNano(buf[1][0].Int64()), - stacktracePartition: retrieveStacktracePartition(buf, 2), + ts: model.TimeFromUnixNano(buf[0][0].Int64()), + stacktracePartition: retrieveStacktracePartition(buf, 1), RowNum: res.RowNumber[0], }) } diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go index 828a8411e..e66aad371 100644 --- a/pkg/phlaredb/block_querier_test.go +++ b/pkg/phlaredb/block_querier_test.go @@ -3,6 +3,8 @@ package phlaredb import ( "context" "fmt" + "os" + rpprof "runtime/pprof" "strings" "testing" "time" @@ -190,5 +192,60 @@ func TestBlockCompatability(t *testing.T) { }) } +} + +func BenchmarkSelect(t *testing.B) { + path := "/Users/christian/parquet-notebook/" + bucket, err := filesystem.NewBucket(path) + require.NoError(t, err) + + ctx := context.Background() + metas, err := NewBlockQuerier(ctx, bucket).BlockMetas(ctx) + require.NoError(t, err) + + for _, meta := range metas { + + q := NewSingleBlockQuerierFromMeta(ctx, bucket, meta) + require.NoError(t, q.Open(ctx)) + profilesTypes, err := q.index.LabelValues("__profile_type__") + require.NoError(t, err) + + for _, profileType := range profilesTypes { + name := fmt.Sprintf("block-%s-%s", meta.ULID.String(), profileType) + t.Run(name, func(t *testing.B) { + profileTypeParts := strings.Split(profileType, ":") + + it, err := q.SelectMatchingProfiles(ctx, &ingestv1.SelectProfilesRequest{ + LabelSelector: `{namespace="profiles-ops-001"}`, + Start: 0, + End: time.Now().UnixMilli(), + Type: &typesv1.ProfileType{ + Name: profileTypeParts[0], + SampleType: profileTypeParts[1], + SampleUnit: profileTypeParts[2], + PeriodType: profileTypeParts[3], + PeriodUnit: profileTypeParts[4], + }, + }) + require.NoError(t, err) + + f, err := os.Create("heap-after-" + name + ".pprof") + require.NoError(t, err) + + require.NoError(t, rpprof.WriteHeapProfile(f)) + + require.NoError(t, f.Close()) + + // TODO: It would be nice actually comparing the whole profile, but at present the result is not deterministic. + p, err := q.MergePprof(ctx, it) + + var sampleSum int64 + for _, s := range p.Sample { + sampleSum += s.Value[0] + } + t.Logf("profileType=%s sum=%d", profileType, sampleSum) + }) + } + } } diff --git a/pkg/phlaredb/head_queriers.go b/pkg/phlaredb/head_queriers.go index 09eaa6ab0..9ee05ac55 100644 --- a/pkg/phlaredb/head_queriers.go +++ b/pkg/phlaredb/head_queriers.go @@ -66,12 +66,12 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params * for pIt.Next() { res := pIt.At() - v, ok := res.Entries[0].RowValue.(fingerprintWithRowNum) + v, ok := res.Entries[0].RowValue.(rowNumWithSomething[model.Fingerprint]) if !ok { panic("no fingerprint information found") } - lbls, ok := labelsPerFP[v.fp] + lbls, ok := labelsPerFP[v.elem] if !ok { panic("no profile series labels with matching fingerprint found") } @@ -83,7 +83,7 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params * } profiles = append(profiles, BlockProfile{ labels: lbls, - fp: v.fp, + fp: v.elem, ts: model.TimeFromUnixNano(buf[0][0].Int64()), stacktracePartition: retrieveStacktracePartition(buf, 1), RowNum: res.RowNumber[0], diff --git a/pkg/phlaredb/profile_test.go b/pkg/phlaredb/profile_test.go index a791dd433..5feafd6e5 100644 --- a/pkg/phlaredb/profile_test.go +++ b/pkg/phlaredb/profile_test.go @@ -168,7 +168,7 @@ func Test_rowRangeIter(t *testing.T) { result := []int64{} for it.Next() { result = append(result, it.At().RowNumber()) - assert.Equal(t, model.Fingerprint(0xff), it.At().fp) + assert.Equal(t, model.Fingerprint(0xff), it.At().elem) } assert.Equal(t, tc.expected, result) }) @@ -238,7 +238,7 @@ func Test_rowRangesIter(t *testing.T) { for it.Next() { rows = append(rows, it.At().RowNumber()) - fingerprints = append(fingerprints, it.At().fp) + fingerprints = append(fingerprints, it.At().elem) } assert.Equal(t, tc.expRows, rows) assert.Equal(t, tc.expFingerprints, fingerprints) diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go index f6108b6c5..919fd3b4b 100644 --- a/pkg/phlaredb/profiles.go +++ b/pkg/phlaredb/profiles.go @@ -56,7 +56,7 @@ func (s rowRangesWithSeriesIndex) getSeriesIndex(rowNum int64) uint32 { type rowRanges map[rowRange]model.Fingerprint -func (rR rowRanges) iter() iter.Iterator[fingerprintWithRowNum] { +func (rR rowRanges) iter() iter.Iterator[rowNumWithSomething[model.Fingerprint]] { // ensure row ranges is sorted rRSlice := lo.Keys(rR) sort.Slice(rRSlice, func(i, j int) bool { @@ -68,19 +68,19 @@ func (rR rowRanges) iter() iter.Iterator[fingerprintWithRowNum] { fps = append(fps, rR[elem]) } - return &rowRangesIter{ + return &rowRangesIter[model.Fingerprint]{ r: rRSlice, fps: fps, pos: 0, } } -type fingerprintWithRowNum struct { - fp model.Fingerprint +type rowNumWithSomething[A any] struct { + elem A rowNum int64 } -func (f fingerprintWithRowNum) RowNumber() int64 { +func (f rowNumWithSomething[A]) RowNumber() int64 { return f.rowNum } @@ -88,20 +88,20 @@ func (r rowRanges) fingerprintsWithRowNum() query.Iterator { return query.NewRowNumberIterator(r.iter()) } -type rowRangesIter struct { +type rowRangesIter[A any] struct { r []rowRange - fps []model.Fingerprint + fps []A pos int64 } -func (i *rowRangesIter) At() fingerprintWithRowNum { - return fingerprintWithRowNum{ +func (i *rowRangesIter[A]) At() rowNumWithSomething[A] { + return rowNumWithSomething[A]{ + elem: i.fps[0], rowNum: i.pos - 1, - fp: i.fps[0], } } -func (i *rowRangesIter) Next() bool { +func (i *rowRangesIter[A]) Next() bool { if len(i.r) == 0 { return false } @@ -118,9 +118,9 @@ func (i *rowRangesIter) Next() bool { return true } -func (i *rowRangesIter) Close() error { return nil } +func (i *rowRangesIter[A]) Close() error { return nil } -func (i *rowRangesIter) Err() error { return nil } +func (i *rowRangesIter[A]) Err() error { return nil } type profileSeries struct { lbs phlaremodel.Labels