Compact Blocks #2134

Merged
merged 36 commits into from Jul 26, 2023

Commits (36)
0ebf54d
Testing
simonswine Jun 21, 2022
3c8b16e
Testing v2
simonswine Jun 21, 2022
03274df
Add first draft of block compaction
cyriltovena Jul 6, 2023
021abc5
Removes unused file
cyriltovena Jul 6, 2023
c8026ad
Correct row count
cyriltovena Jul 6, 2023
b6095ea
WIP: Testing against dev.
cyriltovena Jul 6, 2023
bb92491
WIP: Testing against dev.
cyriltovena Jul 7, 2023
2734512
Fixes Profiles Iteration with Labels
cyriltovena Jul 11, 2023
4d43451
Fixes a bug and test compact locally
cyriltovena Jul 12, 2023
cd8433b
add tests instructions
cyriltovena Jul 12, 2023
ca97480
Add more meta informations and rewrite stacktraceIDs
cyriltovena Jul 12, 2023
19bc955
nit todo
cyriltovena Jul 12, 2023
1a69402
Refactoring code
cyriltovena Jul 13, 2023
c926723
Adds meta files information to dst meta
cyriltovena Jul 17, 2023
65639cd
Updates import to grafana/pyroscsope
cyriltovena Jul 19, 2023
6a337ac
Update pkg/iter/tree.go
cyriltovena Jul 20, 2023
a60fd9b
Add stacktrace rewriter
kolesnikovae Jul 17, 2023
71f2416
Fixes
kolesnikovae Jul 17, 2023
59258be
Add lookup table test
kolesnikovae Jul 17, 2023
c0f7566
Symbols reader integration
kolesnikovae Jul 18, 2023
0406791
Add SymbolsResolver.WriteStats
kolesnikovae Jul 18, 2023
fbc64e2
Fix lookup table
kolesnikovae Jul 18, 2023
2ea1e19
Add symbols writer
kolesnikovae Jul 19, 2023
bdf0c56
Load symdb block files at compaction
kolesnikovae Jul 20, 2023
fbab0c4
Fix meta samples stats
kolesnikovae Jul 20, 2023
f77e4fc
Add dedup slice append
kolesnikovae Jul 20, 2023
744d633
Cleanup
kolesnikovae Jul 20, 2023
4e8ca3f
Convert locations to stacktrace
kolesnikovae Jul 23, 2023
718883a
Implement symdb Reader.Load
kolesnikovae Jul 23, 2023
113e69d
Fix stacktrace inserter
kolesnikovae Jul 23, 2023
e7dce94
Fix symdb meta
kolesnikovae Jul 23, 2023
141ebc1
Fix lint issues
kolesnikovae Jul 23, 2023
26e1635
Fix symbols rewriter integration
kolesnikovae Jul 24, 2023
233c723
Remove unused rowNum field
kolesnikovae Jul 24, 2023
694ede7
Merge remote-tracking branch 'origin/next' into feat/compact
cyriltovena Jul 26, 2023
d3d7b9f
Remove bad tests
cyriltovena Jul 26, 2023
14 changes: 14 additions & 0 deletions pkg/iter/iter.go
@@ -103,6 +103,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] {
}
}

type slicePositionIterator[T constraints.Integer, M any] struct {
i Iterator[T]
s []M
}

func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] {
return slicePositionIterator[T, M]{s: s, i: i}
}

func (i slicePositionIterator[T, M]) Next() bool { return i.i.Next() }
func (i slicePositionIterator[T, M]) At() M { return i.s[i.i.At()] }
func (i slicePositionIterator[T, M]) Err() error { return i.i.Err() }
func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() }

type sliceSeekIterator[A constraints.Ordered] struct {
*sliceIterator[A]
}
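The new NewSliceIndexIterator adapts an iterator of integer positions into an iterator over the corresponding slice elements; the symbols reader added later in this PR uses it to project ID iterators onto its in-memory caches. A minimal usage sketch, assuming only the helpers visible in this diff:

package main

import (
	"fmt"

	"github.com/grafana/pyroscope/pkg/iter"
)

func main() {
	names := []string{"a", "b", "c", "d"}
	// Project the index iterator onto the slice: yields names[3], names[1], names[0].
	it := iter.NewSliceIndexIterator(names, iter.NewSliceIterator([]uint32{3, 1, 0}))
	defer it.Close()
	for it.Next() {
		fmt.Println(it.At()) // d, b, a
	}
	if err := it.Err(); err != nil {
		fmt.Println("iteration error:", err)
	}
}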
78 changes: 66 additions & 12 deletions pkg/phlaredb/block_querier.go
@@ -305,7 +305,13 @@ type singleBlockQuerier struct
type StacktraceDB interface {
Open(ctx context.Context) error
Close() error
Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error

// Load the database into memory entirely.
// This method is used at compaction.
Load(context.Context) error
WriteStats(partition uint64, s *symdb.Stats)

Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error
}

type stacktraceResolverV1 struct {
@@ -321,18 +327,33 @@ func (r *stacktraceResolverV1) Close() error {
return r.stacktraces.Close()
}

func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs))
defer stacktraces.Close()

t := make([]int32, 0, 64)
for stacktraces.Next() {
s := stacktraces.At()
locs.addFromParquet(int64(s.Row), s.Values)

t = grow(t, len(s.Values))
for i, v := range s.Values {
t[i] = v.Int32()
}
locs.InsertStacktrace(s.Row, t)
}
return stacktraces.Err()
}

func (r *stacktraceResolverV1) WriteStats(_ uint64, s *symdb.Stats) {
s.StacktracesTotal = int(r.stacktraces.file.NumRows())
s.MaxStacktraceID = s.StacktracesTotal
}

func (r *stacktraceResolverV1) Load(context.Context) error {
// FIXME(kolesnikovae): Loading all stacktraces from parquet file
// into memory is likely a bad choice. Instead we could convert
// it to symdb first.
return nil
}

type stacktraceResolverV2 struct {
reader *symdb.Reader
bucketReader phlareobj.Bucket
@@ -351,19 +372,25 @@ func (r *stacktraceResolverV2) Close() error {
return nil
}

func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
mr, ok := r.reader.MappingReader(mapping)
func (r *stacktraceResolverV2) Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
mr, ok := r.reader.SymbolsResolver(partition)
if !ok {
return nil
}
resolver := mr.StacktraceResolver()
defer resolver.Release()
return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn(
func(stacktraceID uint32, locations []int32) {
locs.add(int64(stacktraceID), locations)
},
), stacktraceIDs)
return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs)
}

func (r *stacktraceResolverV2) Load(ctx context.Context) error {
return r.reader.Load(ctx)
}

func (r *stacktraceResolverV2) WriteStats(partition uint64, s *symdb.Stats) {
mr, ok := r.reader.SymbolsResolver(partition)
if ok {
mr.WriteStats(s)
}
}

func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier {
@@ -427,6 +454,33 @@ func newStacktraceResolverV2(bucketReader phlareobj.Bucket) StacktraceDB {
}
}

func (b *singleBlockQuerier) Profiles() []parquet.RowGroup {
return b.profiles.file.RowGroups()
}

func (b *singleBlockQuerier) Index() IndexReader {
return b.index
}

func (b *singleBlockQuerier) Symbols() SymbolsReader {
return &inMemorySymbolsReader{
partitions: make(map[uint64]*inMemorySymbolsResolver),

strings: b.strings,
functions: b.functions,
locations: b.locations,
mappings: b.mappings,
stacktraces: b.stacktraces,
}
}

func (b *singleBlockQuerier) Meta() block.Meta {
if b.meta == nil {
return block.Meta{}
}
return *b.meta
}

func (b *singleBlockQuerier) Close() error {
b.openLock.Lock()
defer func() {
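StacktraceDB.Resolve now takes a symdb.StacktraceInserter, leaving it to the caller to decide how resolved stacktraces are materialized. Below is a sketch of one such caller, using the StacktraceInserterFn adapter that appears in the removed code; collectLocations itself and its parameters are illustrative, not part of the PR:

// collectLocations resolves stacktrace IDs from one partition into a map of
// stacktrace ID -> location IDs. db, partition, and ids come from the caller.
func collectLocations(ctx context.Context, db StacktraceDB, partition uint64, ids []uint32) (map[uint32][]int32, error) {
	out := make(map[uint32][]int32, len(ids))
	err := db.Resolve(ctx, partition, symdb.StacktraceInserterFn(
		func(stacktraceID uint32, locations []int32) {
			// Copy the slice: the resolver may reuse its backing array.
			out[stacktraceID] = append([]int32(nil), locations...)
		},
	), ids)
	return out, err
}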
82 changes: 82 additions & 0 deletions pkg/phlaredb/block_symbols_reader.go
@@ -0,0 +1,82 @@
package phlaredb

import (
"context"

"github.com/grafana/pyroscope/pkg/iter"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

// TODO(kolesnikovae): Refactor to symdb.

type SymbolsReader interface {
SymbolsResolver(partition uint64) (SymbolsResolver, error)
}

type SymbolsResolver interface {
ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error

Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation]
Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping]
Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction]
Strings(iter.Iterator[uint32]) iter.Iterator[string]

WriteStats(*symdb.Stats)
}

type inMemorySymbolsReader struct {
partitions map[uint64]*inMemorySymbolsResolver

// TODO(kolesnikovae): Split into partitions.
strings inMemoryparquetReader[string, *schemav1.StringPersister]
functions inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister]
locations inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister]
mappings inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister]
stacktraces StacktraceDB
}

func (r *inMemorySymbolsReader) SymbolsResolver(partition uint64) (SymbolsResolver, error) {
p, ok := r.partitions[partition]
if !ok {
p = &inMemorySymbolsResolver{
partition: partition,
reader: r,
}
r.partitions[partition] = p
}
return p, nil
}

type inMemorySymbolsResolver struct {
partition uint64
reader *inMemorySymbolsReader
}

func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error {
return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces)
}

func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] {
return iter.NewSliceIndexIterator(s.reader.locations.cache, i)
}

func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] {
return iter.NewSliceIndexIterator(s.reader.mappings.cache, i)
}

func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] {
return iter.NewSliceIndexIterator(s.reader.functions.cache, i)
}

func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] {
return iter.NewSliceIndexIterator(s.reader.strings.cache, i)
}

func (s inMemorySymbolsResolver) WriteStats(stats *symdb.Stats) {
s.reader.stacktraces.WriteStats(s.partition, stats)
stats.LocationsTotal = len(s.reader.locations.cache)
stats.MappingsTotal = len(s.reader.mappings.cache)
stats.FunctionsTotal = len(s.reader.functions.cache)
stats.StringsTotal = len(s.reader.strings.cache)
}
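To show how the reader side composes, here is a sketch of a consumer resolving string IDs through one partition; resolveStrings is hypothetical, but the interfaces are the ones defined above:

// resolveStrings materializes the strings referenced by ids from one partition.
func resolveStrings(r SymbolsReader, partition uint64, ids []uint32) ([]string, error) {
	resolver, err := r.SymbolsResolver(partition)
	if err != nil {
		return nil, err
	}
	it := resolver.Strings(iter.NewSliceIterator(ids))
	defer it.Close()
	out := make([]string, 0, len(ids))
	for it.Next() {
		out = append(out, it.At())
	}
	return out, it.Err()
}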
111 changes: 111 additions & 0 deletions pkg/phlaredb/block_symbols_writer.go
@@ -0,0 +1,111 @@
package phlaredb

import (
"context"
"fmt"
"path/filepath"

schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

// TODO(kolesnikovae): Refactor to symdb.

type SymbolsWriter interface {
SymbolsAppender(partition uint64) (SymbolsAppender, error)
}

type SymbolsAppender interface {
AppendStacktraces([]uint32, []*schemav1.Stacktrace)
AppendLocations([]uint32, []*schemav1.InMemoryLocation)
AppendMappings([]uint32, []*schemav1.InMemoryMapping)
AppendFunctions([]uint32, []*schemav1.InMemoryFunction)
AppendStrings([]uint32, []string)
}

type symbolsWriter struct {
partitions map[uint64]*symbolsAppender

locations deduplicatingSlice[*schemav1.InMemoryLocation, locationsKey, *locationsHelper, *schemav1.LocationPersister]
mappings deduplicatingSlice[*schemav1.InMemoryMapping, mappingsKey, *mappingsHelper, *schemav1.MappingPersister]
functions deduplicatingSlice[*schemav1.InMemoryFunction, functionsKey, *functionsHelper, *schemav1.FunctionPersister]
strings deduplicatingSlice[string, string, *stringsHelper, *schemav1.StringPersister]
tables []Table

symdb *symdb.SymDB
}

func newSymbolsWriter(dst string, cfg *ParquetConfig) (*symbolsWriter, error) {
w := symbolsWriter{
partitions: make(map[uint64]*symbolsAppender),
}
dir := filepath.Join(dst, symdb.DefaultDirName)
w.symdb = symdb.NewSymDB(symdb.DefaultConfig().WithDirectory(dir))
w.tables = []Table{
&w.locations,
&w.mappings,
&w.functions,
&w.strings,
}
for _, t := range w.tables {
if err := t.Init(dst, cfg, contextHeadMetrics(context.Background())); err != nil {
return nil, err
}
}
return &w, nil
}

func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) {
p, ok := w.partitions[partition]
if !ok {
appender := w.symdb.SymbolsAppender(partition)
x := &symbolsAppender{
stacktraces: appender.StacktraceAppender(),
writer: w,
}
w.partitions[partition] = x
p = x
}
return p, nil
}

func (w *symbolsWriter) Close() error {
for _, t := range w.tables {
_, _, err := t.Flush(context.Background())
if err != nil {
return fmt.Errorf("flushing table %s: %w", t.Name(), err)
}
if err = t.Close(); err != nil {
return fmt.Errorf("closing table %s: %w", t.Name(), err)
}
}
if err := w.symdb.Flush(); err != nil {
return fmt.Errorf("flushing symbol database: %w", err)
}
return nil
}

type symbolsAppender struct {
stacktraces symdb.StacktraceAppender
writer *symbolsWriter
}

func (s symbolsAppender) AppendStacktraces(dst []uint32, stacktraces []*schemav1.Stacktrace) {
s.stacktraces.AppendStacktrace(dst, stacktraces)
}

func (s symbolsAppender) AppendLocations(dst []uint32, locations []*schemav1.InMemoryLocation) {
s.writer.locations.append(dst, locations)
}

func (s symbolsAppender) AppendMappings(dst []uint32, mappings []*schemav1.InMemoryMapping) {
s.writer.mappings.append(dst, mappings)
}

func (s symbolsAppender) AppendFunctions(dst []uint32, functions []*schemav1.InMemoryFunction) {
s.writer.functions.append(dst, functions)
}

func (s symbolsAppender) AppendStrings(dst []uint32, strings []string) {
s.writer.strings.append(dst, strings)
}