diff --git a/pkg/iter/iter.go b/pkg/iter/iter.go index 7895e0d65c..7a00660c98 100644 --- a/pkg/iter/iter.go +++ b/pkg/iter/iter.go @@ -103,6 +103,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] { } } +type slicePositionIterator[T constraints.Integer, M any] struct { + i Iterator[T] + s []M +} + +func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] { + return slicePositionIterator[T, M]{s: s, i: i} +} + +func (i slicePositionIterator[T, M]) Next() bool { return i.i.Next() } +func (i slicePositionIterator[T, M]) At() M { return i.s[i.i.At()] } +func (i slicePositionIterator[T, M]) Err() error { return i.i.Err() } +func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() } + type sliceSeekIterator[A constraints.Ordered] struct { *sliceIterator[A] } diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index f23f17ee31..aadb0bbf1b 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -305,7 +305,13 @@ type singleBlockQuerier struct { type StacktraceDB interface { Open(ctx context.Context) error Close() error - Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error + + // Load the database into memory entirely. + // This method is used at compaction. + Load(context.Context) error + WriteStats(partition uint64, s *symdb.Stats) + + Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error } type stacktraceResolverV1 struct { @@ -321,18 +327,33 @@ func (r *stacktraceResolverV1) Close() error { return r.stacktraces.Close() } -func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error { +func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error { stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs)) defer stacktraces.Close() - + t := make([]int32, 0, 64) for stacktraces.Next() { s := stacktraces.At() - locs.addFromParquet(int64(s.Row), s.Values) - + t = grow(t, len(s.Values)) + for i, v := range s.Values { + t[i] = v.Int32() + } + locs.InsertStacktrace(s.Row, t) } return stacktraces.Err() } +func (r *stacktraceResolverV1) WriteStats(_ uint64, s *symdb.Stats) { + s.StacktracesTotal = int(r.stacktraces.file.NumRows()) + s.MaxStacktraceID = s.StacktracesTotal +} + +func (r *stacktraceResolverV1) Load(context.Context) error { + // FIXME(kolesnikovae): Loading all stacktraces from parquet file + // into memory is likely a bad choice. Instead we could convert + // it to symdb first. 
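+	//
+	// A rough sketch of the naive eager load this FIXME warns about
+	// (editor's illustration only — repeatedColumnIter is the helper
+	// used by Resolve above; allIDs and cache are hypothetical):
+	//
+	//	it := repeatedColumnIter(ctx, r.stacktraces.file,
+	//		"LocationIDs.list.element", iter.NewSliceIterator(allIDs))
+	//	for it.Next() {
+	//		cache[it.At().Row] = it.At().Values
+	//	}
+	//	return it.Err()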
+ return nil +} + type stacktraceResolverV2 struct { reader *symdb.Reader bucketReader phlareobj.Bucket @@ -351,19 +372,25 @@ func (r *stacktraceResolverV2) Close() error { return nil } -func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error { - mr, ok := r.reader.MappingReader(mapping) +func (r *stacktraceResolverV2) Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error { + mr, ok := r.reader.SymbolsResolver(partition) if !ok { return nil } resolver := mr.StacktraceResolver() defer resolver.Release() + return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs) +} - return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn( - func(stacktraceID uint32, locations []int32) { - locs.add(int64(stacktraceID), locations) - }, - ), stacktraceIDs) +func (r *stacktraceResolverV2) Load(ctx context.Context) error { + return r.reader.Load(ctx) +} + +func (r *stacktraceResolverV2) WriteStats(partition uint64, s *symdb.Stats) { + mr, ok := r.reader.SymbolsResolver(partition) + if ok { + mr.WriteStats(s) + } } func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier { @@ -427,6 +454,33 @@ func newStacktraceResolverV2(bucketReader phlareobj.Bucket) StacktraceDB { } } +func (b *singleBlockQuerier) Profiles() []parquet.RowGroup { + return b.profiles.file.RowGroups() +} + +func (b *singleBlockQuerier) Index() IndexReader { + return b.index +} + +func (b *singleBlockQuerier) Symbols() SymbolsReader { + return &inMemorySymbolsReader{ + partitions: make(map[uint64]*inMemorySymbolsResolver), + + strings: b.strings, + functions: b.functions, + locations: b.locations, + mappings: b.mappings, + stacktraces: b.stacktraces, + } +} + +func (b *singleBlockQuerier) Meta() block.Meta { + if b.meta == nil { + return block.Meta{} + } + return *b.meta +} + func (b *singleBlockQuerier) Close() error { b.openLock.Lock() defer func() { diff --git a/pkg/phlaredb/block_symbols_reader.go b/pkg/phlaredb/block_symbols_reader.go new file mode 100644 index 0000000000..e8399e1342 --- /dev/null +++ b/pkg/phlaredb/block_symbols_reader.go @@ -0,0 +1,82 @@ +package phlaredb + +import ( + "context" + + "github.com/grafana/pyroscope/pkg/iter" + schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/symdb" +) + +// TODO(kolesnikovae): Refactor to symdb. + +type SymbolsReader interface { + SymbolsResolver(partition uint64) (SymbolsResolver, error) +} + +type SymbolsResolver interface { + ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error + + Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] + Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] + Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] + Strings(iter.Iterator[uint32]) iter.Iterator[string] + + WriteStats(*symdb.Stats) +} + +type inMemorySymbolsReader struct { + partitions map[uint64]*inMemorySymbolsResolver + + // TODO(kolesnikovae): Split into partitions. 
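+	// A possible per-partition shape (editor's sketch, hypothetical):
+	//
+	//	type symbolsPartition struct {
+	//		strings   []string
+	//		functions []*schemav1.InMemoryFunction
+	//		locations []*schemav1.InMemoryLocation
+	//		mappings  []*schemav1.InMemoryMapping
+	//	}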
+ strings inMemoryparquetReader[string, *schemav1.StringPersister] + functions inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister] + locations inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister] + mappings inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister] + stacktraces StacktraceDB +} + +func (r *inMemorySymbolsReader) SymbolsResolver(partition uint64) (SymbolsResolver, error) { + p, ok := r.partitions[partition] + if !ok { + p = &inMemorySymbolsResolver{ + partition: partition, + reader: r, + } + r.partitions[partition] = p + } + return p, nil +} + +type inMemorySymbolsResolver struct { + partition uint64 + reader *inMemorySymbolsReader +} + +func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error { + return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces) +} + +func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] { + return iter.NewSliceIndexIterator(s.reader.locations.cache, i) +} + +func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] { + return iter.NewSliceIndexIterator(s.reader.mappings.cache, i) +} + +func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] { + return iter.NewSliceIndexIterator(s.reader.functions.cache, i) +} + +func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] { + return iter.NewSliceIndexIterator(s.reader.strings.cache, i) +} + +func (s inMemorySymbolsResolver) WriteStats(stats *symdb.Stats) { + s.reader.stacktraces.WriteStats(s.partition, stats) + stats.LocationsTotal = len(s.reader.locations.cache) + stats.MappingsTotal = len(s.reader.mappings.cache) + stats.FunctionsTotal = len(s.reader.functions.cache) + stats.StringsTotal = len(s.reader.strings.cache) +} diff --git a/pkg/phlaredb/block_symbols_writer.go b/pkg/phlaredb/block_symbols_writer.go new file mode 100644 index 0000000000..43fd1a09a7 --- /dev/null +++ b/pkg/phlaredb/block_symbols_writer.go @@ -0,0 +1,111 @@ +package phlaredb + +import ( + "context" + "fmt" + "path/filepath" + + schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/symdb" +) + +// TODO(kolesnikovae): Refactor to symdb. 
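+//
+// Intended use of the writer defined below (editor's sketch; the
+// destination directory and partition key are hypothetical):
+//
+//	w, err := newSymbolsWriter("./block", defaultParquetConfig)
+//	if err != nil {
+//		return err
+//	}
+//	a, _ := w.SymbolsAppender(0)
+//	dst := make([]uint32, 2)
+//	a.AppendStrings(dst, []string{"main", "runtime.main"})
+//	// dst now holds the rewritten string IDs.
+//	err = w.Close() // flushes the tables and the symbol database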
+ +type SymbolsWriter interface { + SymbolsAppender(partition uint64) (SymbolsAppender, error) +} + +type SymbolsAppender interface { + AppendStacktraces([]uint32, []*schemav1.Stacktrace) + AppendLocations([]uint32, []*schemav1.InMemoryLocation) + AppendMappings([]uint32, []*schemav1.InMemoryMapping) + AppendFunctions([]uint32, []*schemav1.InMemoryFunction) + AppendStrings([]uint32, []string) +} + +type symbolsWriter struct { + partitions map[uint64]*symbolsAppender + + locations deduplicatingSlice[*schemav1.InMemoryLocation, locationsKey, *locationsHelper, *schemav1.LocationPersister] + mappings deduplicatingSlice[*schemav1.InMemoryMapping, mappingsKey, *mappingsHelper, *schemav1.MappingPersister] + functions deduplicatingSlice[*schemav1.InMemoryFunction, functionsKey, *functionsHelper, *schemav1.FunctionPersister] + strings deduplicatingSlice[string, string, *stringsHelper, *schemav1.StringPersister] + tables []Table + + symdb *symdb.SymDB +} + +func newSymbolsWriter(dst string, cfg *ParquetConfig) (*symbolsWriter, error) { + w := symbolsWriter{ + partitions: make(map[uint64]*symbolsAppender), + } + dir := filepath.Join(dst, symdb.DefaultDirName) + w.symdb = symdb.NewSymDB(symdb.DefaultConfig().WithDirectory(dir)) + w.tables = []Table{ + &w.locations, + &w.mappings, + &w.functions, + &w.strings, + } + for _, t := range w.tables { + if err := t.Init(dst, cfg, contextHeadMetrics(context.Background())); err != nil { + return nil, err + } + } + return &w, nil +} + +func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) { + p, ok := w.partitions[partition] + if !ok { + appender := w.symdb.SymbolsAppender(partition) + x := &symbolsAppender{ + stacktraces: appender.StacktraceAppender(), + writer: w, + } + w.partitions[partition] = x + p = x + } + return p, nil +} + +func (w *symbolsWriter) Close() error { + for _, t := range w.tables { + _, _, err := t.Flush(context.Background()) + if err != nil { + return fmt.Errorf("flushing table %s: %w", t.Name(), err) + } + if err = t.Close(); err != nil { + return fmt.Errorf("closing table %s: %w", t.Name(), err) + } + } + if err := w.symdb.Flush(); err != nil { + return fmt.Errorf("flushing symbol database: %w", err) + } + return nil +} + +type symbolsAppender struct { + stacktraces symdb.StacktraceAppender + writer *symbolsWriter +} + +func (s symbolsAppender) AppendStacktraces(dst []uint32, stacktraces []*schemav1.Stacktrace) { + s.stacktraces.AppendStacktrace(dst, stacktraces) +} + +func (s symbolsAppender) AppendLocations(dst []uint32, locations []*schemav1.InMemoryLocation) { + s.writer.locations.append(dst, locations) +} + +func (s symbolsAppender) AppendMappings(dst []uint32, mappings []*schemav1.InMemoryMapping) { + s.writer.mappings.append(dst, mappings) +} + +func (s symbolsAppender) AppendFunctions(dst []uint32, functions []*schemav1.InMemoryFunction) { + s.writer.functions.append(dst, functions) +} + +func (s symbolsAppender) AppendStrings(dst []uint32, strings []string) { + s.writer.strings.append(dst, strings) +} diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go new file mode 100644 index 0000000000..62dc48a1af --- /dev/null +++ b/pkg/phlaredb/compact.go @@ -0,0 +1,935 @@ +package phlaredb + +import ( + "context" + "io/fs" + "math" + "os" + "path/filepath" + "sort" + + "github.com/oklog/ulid" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + 
"github.com/segmentio/parquet-go" + + "github.com/grafana/pyroscope/pkg/iter" + phlaremodel "github.com/grafana/pyroscope/pkg/model" + phlareparquet "github.com/grafana/pyroscope/pkg/parquet" + "github.com/grafana/pyroscope/pkg/phlaredb/block" + schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/symdb" + "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" + "github.com/grafana/pyroscope/pkg/util" + "github.com/grafana/pyroscope/pkg/util/loser" +) + +type BlockReader interface { + Meta() block.Meta + Profiles() []parquet.RowGroup + Index() IndexReader + Symbols() SymbolsReader +} + +func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error) { + srcMetas := make([]block.Meta, len(src)) + ulids := make([]string, len(src)) + + for i, b := range src { + srcMetas[i] = b.Meta() + ulids[i] = b.Meta().ULID.String() + } + meta = compactMetas(srcMetas) + blockPath := filepath.Join(dst, meta.ULID.String()) + indexPath := filepath.Join(blockPath, block.IndexFilename) + profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) + + sp, ctx := opentracing.StartSpanFromContext(ctx, "Compact") + defer func() { + // todo: context propagation is not working through objstore + // This is because the BlockReader has no context. + sp.SetTag("src", ulids) + sp.SetTag("block_id", meta.ULID.String()) + if err != nil { + sp.SetTag("error", err) + } + sp.Finish() + }() + + if len(src) <= 1 { + return block.Meta{}, errors.New("not enough blocks to compact") + } + if err := os.MkdirAll(blockPath, 0o777); err != nil { + return block.Meta{}, err + } + + indexw, err := prepareIndexWriter(ctx, indexPath, src) + if err != nil { + return block.Meta{}, err + } + + profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) + if err != nil { + return block.Meta{}, err + } + profileWriter := newProfileWriter(profileFile) + symw, err := newSymbolsWriter(blockPath, defaultParquetConfig) + if err != nil { + return block.Meta{}, err + } + + rowsIt, err := newMergeRowProfileIterator(src) + if err != nil { + return block.Meta{}, err + } + seriesRewriter := newSeriesRewriter(rowsIt, indexw) + symRewriter := newSymbolsRewriter(seriesRewriter, src, symw) + reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symRewriter)) + + total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) + if err != nil { + return block.Meta{}, err + } + + if err = symRewriter.Close(); err != nil { + return block.Meta{}, err + } + if err = symw.Close(); err != nil { + return block.Meta{}, err + } + + // flush the index file. + if err = indexw.Close(); err != nil { + return block.Meta{}, err + } + + if err = profileWriter.Close(); err != nil { + return block.Meta{}, err + } + + metaFiles, err := metaFilesFromDir(blockPath) + if err != nil { + return block.Meta{}, err + } + meta.Files = metaFiles + meta.Stats.NumProfiles = total + meta.Stats.NumSeries = seriesRewriter.NumSeries() + meta.Stats.NumSamples = symRewriter.NumSamples() + if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { + return block.Meta{}, err + } + return meta, nil +} + +// metaFilesFromDir returns a list of block files description from a directory. 
+func metaFilesFromDir(dir string) ([]block.File, error) { + var files []block.File + err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + var f block.File + switch filepath.Ext(info.Name()) { + case block.ParquetSuffix: + f, err = parquetMetaFile(path, info.Size()) + if err != nil { + return err + } + case filepath.Ext(block.IndexFilename): + f, err = tsdbMetaFile(path) + if err != nil { + return err + } + } + f.RelPath, err = filepath.Rel(dir, path) + if err != nil { + return err + } + f.SizeBytes = uint64(info.Size()) + files = append(files, f) + return nil + }) + return files, err +} + +func tsdbMetaFile(filePath string) (block.File, error) { + idxReader, err := index.NewFileReader(filePath) + if err != nil { + return block.File{}, err + } + + return idxReader.FileInfo(), nil +} + +func parquetMetaFile(filePath string, size int64) (block.File, error) { + f, err := os.Open(filePath) + if err != nil { + return block.File{}, err + } + defer f.Close() + + pqFile, err := parquet.OpenFile(f, size) + if err != nil { + return block.File{}, err + } + return block.File{ + Parquet: &block.ParquetFile{ + NumRowGroups: uint64(len(pqFile.RowGroups())), + NumRows: uint64(pqFile.NumRows()), + }, + }, nil +} + +// todo write tests +func compactMetas(src []block.Meta) block.Meta { + meta := block.NewMeta() + highestCompactionLevel := 0 + ulids := make([]ulid.ULID, 0, len(src)) + parents := make([]tsdb.BlockDesc, 0, len(src)) + minTime, maxTime := model.Latest, model.Earliest + labels := make(map[string]string) + for _, b := range src { + if b.Compaction.Level > highestCompactionLevel { + highestCompactionLevel = b.Compaction.Level + } + ulids = append(ulids, b.ULID) + parents = append(parents, tsdb.BlockDesc{ + ULID: b.ULID, + MinTime: int64(b.MinTime), + MaxTime: int64(b.MaxTime), + }) + if b.MinTime < minTime { + minTime = b.MinTime + } + if b.MaxTime > maxTime { + maxTime = b.MaxTime + } + for k, v := range b.Labels { + if k == block.HostnameLabel { + continue + } + labels[k] = v + } + } + if hostname, err := os.Hostname(); err == nil { + labels[block.HostnameLabel] = hostname + } + meta.Source = block.CompactorSource + meta.Compaction = tsdb.BlockMetaCompaction{ + Deletable: meta.Stats.NumSamples == 0, + Level: highestCompactionLevel + 1, + Sources: ulids, + Parents: parents, + } + meta.MaxTime = maxTime + meta.MinTime = minTime + meta.Labels = labels + return *meta +} + +type profileRow struct { + timeNanos int64 + + seriesRef uint32 + labels phlaremodel.Labels + fp model.Fingerprint + row schemav1.ProfileRow + + blockReader BlockReader +} + +type profileRowIterator struct { + profiles iter.Iterator[parquet.Row] + blockReader BlockReader + index IndexReader + allPostings index.Postings + err error + + currentRow profileRow + chunks []index.ChunkMeta +} + +func newProfileRowIterator(reader parquet.RowReader, s BlockReader) (*profileRowIterator, error) { + k, v := index.AllPostingsKey() + allPostings, err := s.Index().Postings(k, nil, v) + if err != nil { + return nil, err + } + return &profileRowIterator{ + profiles: phlareparquet.NewBufferedRowReaderIterator(reader, 1024), + blockReader: s, + index: s.Index(), + allPostings: allPostings, + currentRow: profileRow{ + seriesRef: math.MaxUint32, + }, + chunks: make([]index.ChunkMeta, 1), + }, nil +} + +func (p *profileRowIterator) At() profileRow { + return p.currentRow +} + +func (p *profileRowIterator) Next() bool { + if !p.profiles.Next() { + return 
false + } + p.currentRow.blockReader = p.blockReader + p.currentRow.row = schemav1.ProfileRow(p.profiles.At()) + seriesIndex := p.currentRow.row.SeriesIndex() + p.currentRow.timeNanos = p.currentRow.row.TimeNanos() + // do we have a new series? + if seriesIndex == p.currentRow.seriesRef { + return true + } + p.currentRow.seriesRef = seriesIndex + if !p.allPostings.Next() { + if err := p.allPostings.Err(); err != nil { + p.err = err + return false + } + p.err = errors.New("unexpected end of postings") + return false + } + + fp, err := p.index.Series(p.allPostings.At(), &p.currentRow.labels, &p.chunks) + if err != nil { + p.err = err + return false + } + p.currentRow.fp = model.Fingerprint(fp) + return true +} + +func (p *profileRowIterator) Err() error { + if p.err != nil { + return p.err + } + return p.profiles.Err() +} + +func (p *profileRowIterator) Close() error { + return p.profiles.Close() +} + +func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], error) { + its := make([]iter.Iterator[profileRow], len(src)) + for i, s := range src { + // todo: may be we could merge rowgroups in parallel but that requires locking. + reader := parquet.MultiRowGroup(s.Profiles()...).Rows() + it, err := newProfileRowIterator(reader, s) + if err != nil { + return nil, err + } + its[i] = it + } + return &dedupeProfileRowIterator{ + Iterator: iter.NewTreeIterator(loser.New( + its, + profileRow{ + timeNanos: math.MaxInt64, + }, + func(it iter.Iterator[profileRow]) profileRow { return it.At() }, + func(r1, r2 profileRow) bool { + // first handle max profileRow if it's either r1 or r2 + if r1.timeNanos == math.MaxInt64 { + return false + } + if r2.timeNanos == math.MaxInt64 { + return true + } + // then handle normal profileRows + if cmp := phlaremodel.CompareLabelPairs(r1.labels, r2.labels); cmp != 0 { + return cmp < 0 + } + return r1.timeNanos < r2.timeNanos + }, + func(it iter.Iterator[profileRow]) { _ = it.Close() }, + )), + }, nil +} + +type seriesRewriter struct { + iter.Iterator[profileRow] + + indexw *index.Writer + + seriesRef storage.SeriesRef + labels phlaremodel.Labels + previousFp model.Fingerprint + currentChunkMeta index.ChunkMeta + err error + + numSeries uint64 +} + +func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter { + return &seriesRewriter{ + Iterator: it, + indexw: indexw, + } +} + +func (s *seriesRewriter) NumSeries() uint64 { + return s.numSeries +} + +func (s *seriesRewriter) Next() bool { + if !s.Iterator.Next() { + if s.previousFp != 0 { + if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil { + s.err = err + return false + } + s.numSeries++ + } + return false + } + currentProfile := s.Iterator.At() + if s.previousFp != currentProfile.fp { + // store the previous series. 
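+			// E.g. (editor's note), for the fingerprint sequence A A B,
+			// series A is flushed to the index writer when the first B
+			// row is seen, and B is flushed on EOF by the branch above;
+			// the chunk meta spans each series' min/max timestamps.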
+ if s.previousFp != 0 { + if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil { + s.err = err + return false + } + s.numSeries++ + } + s.seriesRef++ + s.labels = currentProfile.labels.Clone() + s.previousFp = currentProfile.fp + s.currentChunkMeta.MinTime = currentProfile.timeNanos + } + s.currentChunkMeta.MaxTime = currentProfile.timeNanos + currentProfile.row.SetSeriesIndex(uint32(s.seriesRef)) + return true +} + +type rowsIterator struct { + iter.Iterator[profileRow] +} + +func newRowsIterator(it iter.Iterator[profileRow]) *rowsIterator { + return &rowsIterator{ + Iterator: it, + } +} + +func (r *rowsIterator) At() parquet.Row { + return parquet.Row(r.Iterator.At().row) +} + +type dedupeProfileRowIterator struct { + iter.Iterator[profileRow] + + prevFP model.Fingerprint + prevTimeNanos int64 +} + +func (it *dedupeProfileRowIterator) Next() bool { + for { + if !it.Iterator.Next() { + return false + } + currentProfile := it.Iterator.At() + if it.prevFP == currentProfile.fp && it.prevTimeNanos == currentProfile.timeNanos { + // skip duplicate profile + continue + } + it.prevFP = currentProfile.fp + it.prevTimeNanos = currentProfile.timeNanos + return true + } +} + +func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) (*index.Writer, error) { + var symbols index.StringIter + indexw, err := index.NewWriter(ctx, path) + if err != nil { + return nil, err + } + for i, r := range readers { + if i == 0 { + symbols = r.Index().Symbols() + } + symbols = tsdb.NewMergedStringIter(symbols, r.Index().Symbols()) + } + + for symbols.Next() { + if err := indexw.AddSymbol(symbols.At()); err != nil { + return nil, errors.Wrap(err, "add symbol") + } + } + if symbols.Err() != nil { + return nil, errors.Wrap(symbols.Err(), "next symbol") + } + + return indexw, nil +} + +type symbolsRewriter struct { + profiles iter.Iterator[profileRow] + rewriters map[BlockReader]*stacktraceRewriter + stacktraces []uint32 + err error + + numSamples uint64 +} + +func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w SymbolsWriter) *symbolsRewriter { + sr := symbolsRewriter{ + profiles: it, + rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)), + } + for _, r := range blocks { + sr.rewriters[r] = newStacktraceRewriter(r.Symbols(), w) + } + return &sr +} + +func (s *symbolsRewriter) NumSamples() uint64 { return s.numSamples } + +func (s *symbolsRewriter) At() profileRow { return s.profiles.At() } + +func (s *symbolsRewriter) Close() error { return s.profiles.Close() } + +func (s *symbolsRewriter) Err() error { + if s.err != nil { + return s.err + } + return s.profiles.Err() +} + +func (s *symbolsRewriter) Next() bool { + if !s.profiles.Next() { + return false + } + var err error + profile := s.profiles.At() + profile.row.ForStacktraceIDsValues(func(values []parquet.Value) { + s.loadStacktracesID(values) + r := s.rewriters[profile.blockReader] + if err = r.rewriteStacktraces(profile.row.StacktracePartitionID(), s.stacktraces); err != nil { + return + } + s.numSamples += uint64(len(values)) + for i, v := range values { + // FIXME: the original order is not preserved, which will affect encoding. 
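+			// (Editor's note: the rewritten IDs come from the lookup
+			// table in insertion order, so a column that was sorted by
+			// stacktrace ID may become non-monotonic, which likely
+			// hurts delta encoding.)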
+ values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + } + }) + if err != nil { + s.err = err + return false + } + return true +} + +func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { + s.stacktraces = grow(s.stacktraces, len(values)) + for i := range values { + s.stacktraces[i] = values[i].Uint32() + } +} + +type stacktraceRewriter struct { + reader SymbolsReader + writer SymbolsWriter + + partitions map[uint64]*symPartitionRewriter + inserter *stacktraceInserter + + // Objects below have global addressing. + // TODO(kolesnikovae): Move to partition. + locations *lookupTable[*schemav1.InMemoryLocation] + mappings *lookupTable[*schemav1.InMemoryMapping] + functions *lookupTable[*schemav1.InMemoryFunction] + strings *lookupTable[string] +} + +type symPartitionRewriter struct { + name uint64 + stats symdb.Stats + // Stacktrace identifiers are only valid within the partition. + stacktraces *lookupTable[[]int32] + resolver SymbolsResolver + appender SymbolsAppender + + r *stacktraceRewriter + + // FIXME(kolesnikovae): schemav1.Stacktrace should be just a uint32 slice: + // type Stacktrace []uint32 + current []*schemav1.Stacktrace +} + +func newStacktraceRewriter(r SymbolsReader, w SymbolsWriter) *stacktraceRewriter { + return &stacktraceRewriter{ + reader: r, + writer: w, + } +} + +func (r *stacktraceRewriter) init(partition uint64) (p *symPartitionRewriter, err error) { + if r.partitions == nil { + r.partitions = make(map[uint64]*symPartitionRewriter) + } + if p, err = r.getOrCreatePartition(partition); err != nil { + return nil, err + } + + if r.locations == nil { + r.locations = newLookupTable[*schemav1.InMemoryLocation](p.stats.LocationsTotal) + r.mappings = newLookupTable[*schemav1.InMemoryMapping](p.stats.MappingsTotal) + r.functions = newLookupTable[*schemav1.InMemoryFunction](p.stats.FunctionsTotal) + r.strings = newLookupTable[string](p.stats.StringsTotal) + } else { + r.locations.reset() + r.mappings.reset() + r.functions.reset() + r.strings.reset() + } + + r.inserter = &stacktraceInserter{ + stacktraces: p.stacktraces, + locations: r.locations, + } + + return p, nil +} + +func (r *stacktraceRewriter) getOrCreatePartition(partition uint64) (_ *symPartitionRewriter, err error) { + p, ok := r.partitions[partition] + if ok { + p.reset() + return p, nil + } + n := &symPartitionRewriter{r: r, name: partition} + if n.resolver, err = r.reader.SymbolsResolver(partition); err != nil { + return nil, err + } + if n.appender, err = r.writer.SymbolsAppender(partition); err != nil { + return nil, err + } + n.resolver.WriteStats(&n.stats) + n.stacktraces = newLookupTable[[]int32](n.stats.MaxStacktraceID) + r.partitions[partition] = n + return n, nil +} + +func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) error { + p, err := r.init(partition) + if err != nil { + return err + } + if err = p.populateUnresolved(stacktraces); err != nil { + return err + } + if p.hasUnresolved() { + return p.appendRewrite(stacktraces) + } + return nil +} + +func (p *symPartitionRewriter) reset() { + p.stacktraces.reset() + p.current = p.current[:0] +} + +func (p *symPartitionRewriter) hasUnresolved() bool { + return len(p.stacktraces.unresolved)+ + len(p.r.locations.unresolved)+ + len(p.r.mappings.unresolved)+ + len(p.r.functions.unresolved)+ + len(p.r.strings.unresolved) > 0 +} + +func (p *symPartitionRewriter) populateUnresolved(stacktraceIDs []uint32) error { + // Filter out all stack traces that 
have already been resolved,
+	// and populate the locations lookup table.
+	if err := p.resolveStacktraces(stacktraceIDs); err != nil {
+		return err
+	}
+	if len(p.r.locations.unresolved) == 0 {
+		return nil
+	}
+
+	// Resolve functions and mappings for new locations.
+	unresolvedLocs := p.r.locations.iter()
+	locations := p.resolver.Locations(unresolvedLocs)
+	for locations.Next() {
+		location := locations.At()
+		location.MappingId = p.r.mappings.tryLookup(location.MappingId)
+		for j, line := range location.Line {
+			location.Line[j].FunctionId = p.r.functions.tryLookup(line.FunctionId)
+		}
+		unresolvedLocs.setValue(location)
+	}
+	if err := locations.Err(); err != nil {
+		return err
+	}
+
+	// Resolve strings.
+	unresolvedMappings := p.r.mappings.iter()
+	mappings := p.resolver.Mappings(unresolvedMappings)
+	for mappings.Next() {
+		mapping := mappings.At()
+		mapping.BuildId = p.r.strings.tryLookup(mapping.BuildId)
+		mapping.Filename = p.r.strings.tryLookup(mapping.Filename)
+		unresolvedMappings.setValue(mapping)
+	}
+	if err := mappings.Err(); err != nil {
+		return err
+	}
+
+	unresolvedFunctions := p.r.functions.iter()
+	functions := p.resolver.Functions(unresolvedFunctions)
+	for functions.Next() {
+		function := functions.At()
+		function.Name = p.r.strings.tryLookup(function.Name)
+		function.Filename = p.r.strings.tryLookup(function.Filename)
+		function.SystemName = p.r.strings.tryLookup(function.SystemName)
+		unresolvedFunctions.setValue(function)
+	}
+	if err := functions.Err(); err != nil {
+		return err
+	}
+
+	unresolvedStrings := p.r.strings.iter()
+	strings := p.resolver.Strings(unresolvedStrings)
+	for strings.Next() {
+		unresolvedStrings.setValue(strings.At())
+	}
+	return strings.Err()
+}
+
+func (p *symPartitionRewriter) appendRewrite(stacktraces []uint32) error {
+	p.appender.AppendStrings(p.r.strings.buf, p.r.strings.values)
+	p.r.strings.updateResolved()
+
+	for _, v := range p.r.functions.values {
+		v.Name = p.r.strings.lookupResolved(v.Name)
+		v.Filename = p.r.strings.lookupResolved(v.Filename)
+		v.SystemName = p.r.strings.lookupResolved(v.SystemName)
+	}
+	p.appender.AppendFunctions(p.r.functions.buf, p.r.functions.values)
+	p.r.functions.updateResolved()
+
+	for _, v := range p.r.mappings.values {
+		v.BuildId = p.r.strings.lookupResolved(v.BuildId)
+		v.Filename = p.r.strings.lookupResolved(v.Filename)
+	}
+	p.appender.AppendMappings(p.r.mappings.buf, p.r.mappings.values)
+	p.r.mappings.updateResolved()
+
+	for _, v := range p.r.locations.values {
+		v.MappingId = p.r.mappings.lookupResolved(v.MappingId)
+		for j, line := range v.Line {
+			v.Line[j].FunctionId = p.r.functions.lookupResolved(line.FunctionId)
+		}
+	}
+	p.appender.AppendLocations(p.r.locations.buf, p.r.locations.values)
+	p.r.locations.updateResolved()
+
+	for _, v := range p.stacktraces.values {
+		for j, location := range v {
+			v[j] = int32(p.r.locations.lookupResolved(uint32(location)))
+		}
+	}
+	p.appender.AppendStacktraces(p.stacktraces.buf, p.stacktracesFromResolvedValues())
+	p.stacktraces.updateResolved()
+
+	for i, v := range stacktraces {
+		stacktraces[i] = p.stacktraces.lookupResolved(v)
+	}
+
+	return nil
+}
+
+func (p *symPartitionRewriter) resolveStacktraces(stacktraceIDs []uint32) error {
+	for i, v := range stacktraceIDs {
+		stacktraceIDs[i] = p.stacktraces.tryLookup(v)
+	}
+	if len(p.stacktraces.unresolved) == 0 {
+		return nil
+	}
+	p.stacktraces.initSorted()
+	return p.resolver.ResolveStacktraces(context.TODO(), p.r.inserter, p.stacktraces.buf)
+}
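+
+// Editor's note: the rewrite is two-phase. populateUnresolved walks
+// stacktraces -> locations -> {mappings, functions} -> strings,
+// collecting IDs unknown to the destination, and appendRewrite then
+// appends them bottom-up (strings first, stacktraces last), patching
+// every reference to its destination ID along the way.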
+
+func (p *symPartitionRewriter) stacktracesFromResolvedValues() []*schemav1.Stacktrace {
+	p.current = grow(p.current, len(p.stacktraces.values))
+	for i, v := range p.stacktraces.values {
+		s := p.current[i]
+		if s == nil {
+			s = &schemav1.Stacktrace{LocationIDs: make([]uint64, len(v))}
+			p.current[i] = s
+		}
+		s.LocationIDs = grow(s.LocationIDs, len(v))
+		for j, m := range v {
+			s.LocationIDs[j] = uint64(m)
+		}
+	}
+	return p.current
+}
+
+type stacktraceInserter struct {
+	stacktraces *lookupTable[[]int32]
+	locations   *lookupTable[*schemav1.InMemoryLocation]
+}
+
+func (i *stacktraceInserter) InsertStacktrace(stacktrace uint32, locations []int32) {
+	// Resolve locations for new stack traces.
+	for j, loc := range locations {
+		locations[j] = int32(i.locations.tryLookup(uint32(loc)))
+	}
+	// stacktrace points to resolved, which should be
+	// a marked pointer to an unresolved value.
+	idx := i.stacktraces.resolved[stacktrace] & markerMask
+	v := &i.stacktraces.values[idx]
+	n := grow(*v, len(locations))
+	copy(n, locations)
+	// Preserve allocated capacity.
+	i.stacktraces.values[idx] = n
+}
+
+const (
+	marker     = 1 << 31
+	markerMask = math.MaxUint32 >> 1
+)
+
+type lookupTable[T any] struct {
+	// Index is the source ID, and the value is the destination ID.
+	// If the destination ID is not known yet, the element is a marked
+	// index into 'unresolved'.
+	resolved   []uint32
+	unresolved []uint32 // Points to resolved. Index matches values.
+	values     []T      // Values are populated for unresolved items.
+	buf        []uint32 // Sorted unresolved values.
+}
+
+func newLookupTable[T any](size int) *lookupTable[T] {
+	var t lookupTable[T]
+	t.init(size)
+	return &t
+}
+
+func (t *lookupTable[T]) init(size int) {
+	if cap(t.resolved) < size {
+		t.resolved = make([]uint32, size)
+		return
+	}
+	t.resolved = t.resolved[:size]
+	for i := range t.resolved {
+		t.resolved[i] = 0
+	}
+}
+
+func (t *lookupTable[T]) reset() {
+	t.unresolved = t.unresolved[:0]
+	t.values = t.values[:0]
+	t.buf = t.buf[:0]
+}
+
+// tryLookup looks up the value at x in resolved.
+// If x has not been resolved yet, x is memorized for
+// a future resolve, and the returned value is the
+// marked index into unresolved.
+func (t *lookupTable[T]) tryLookup(x uint32) uint32 {
+	if v := t.resolved[x]; v != 0 {
+		if v&marker > 0 {
+			return v // Already marked for resolve.
+		}
+		return v - 1 // Already resolved.
+	}
+	u := t.newUnresolved(x) | marker
+	t.resolved[x] = u
+	return u
+}
+
+func (t *lookupTable[T]) newUnresolved(rid uint32) uint32 {
+	t.unresolved = append(t.unresolved, rid)
+	x := len(t.values)
+	if x < cap(t.values) {
+		// Try to reuse a previously allocated value.
+		t.values = t.values[:x+1]
+	} else {
+		var v T
+		t.values = append(t.values, v)
+	}
+	return uint32(x)
+}
+
+func (t *lookupTable[T]) storeResolved(i int, rid uint32) {
+	// The index is incremented to avoid 0, because 0 is
+	// used as a sentinel and indicates absence (resolved is
+	// a sparse slice initialized with the maximal expected
+	// size). Correspondingly, lookupResolved should
+	// decrement the index on read.
+	t.resolved[t.unresolved[i]] = rid + 1
+}
+
+func (t *lookupTable[T]) lookupResolved(x uint32) uint32 {
+	if x&marker > 0 {
+		return t.resolved[t.unresolved[x&markerMask]] - 1
+	}
+	return x // Already resolved.
+}
+
+// updateResolved loads indices from buf to resolved.
+// It is expected that the order matches values.
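+//
+// Worked example of the marking scheme (editor's illustration):
+// tryLookup(5) on an empty table memorizes 5 in unresolved and
+// returns 0|marker; once the value is appended and its destination
+// ID 42 is stored (via storeResolved or updateResolved),
+// resolved[5] == 43, so the next tryLookup(5) returns 42.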
+func (t *lookupTable[T]) updateResolved() { + for i, rid := range t.unresolved { + t.resolved[rid] = t.buf[i] + 1 + } +} + +func (t *lookupTable[T]) initSorted() { + // Gather and sort references to unresolved values. + t.buf = grow(t.buf, len(t.unresolved)) + copy(t.buf, t.unresolved) + sort.Slice(t.buf, func(i, j int) bool { + return t.buf[i] < t.buf[j] + }) +} + +func (t *lookupTable[T]) iter() *lookupTableIterator[T] { + t.initSorted() + return &lookupTableIterator[T]{table: t} +} + +type lookupTableIterator[T any] struct { + table *lookupTable[T] + cur uint32 +} + +func (t *lookupTableIterator[T]) Next() bool { + return t.cur < uint32(len(t.table.buf)) +} + +func (t *lookupTableIterator[T]) At() uint32 { + x := t.table.buf[t.cur] + t.cur++ + return x +} + +func (t *lookupTableIterator[T]) setValue(v T) { + u := t.table.resolved[t.table.buf[t.cur-1]] + t.table.values[u&markerMask] = v +} + +func (t *lookupTableIterator[T]) Close() error { return nil } + +func (t *lookupTableIterator[T]) Err() error { return nil } + +func grow[T any](s []T, n int) []T { + if cap(s) < n { + return make([]T, n, 2*n) + } + return s[:n] +} diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go new file mode 100644 index 0000000000..f386e51497 --- /dev/null +++ b/pkg/phlaredb/compact_test.go @@ -0,0 +1,317 @@ +package phlaredb + +import ( + "context" + "fmt" + _ "net/http/pprof" + "os" + "path/filepath" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/segmentio/parquet-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + phlaremodel "github.com/grafana/pyroscope/pkg/model" + "github.com/grafana/pyroscope/pkg/phlaredb/block" + schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" +) + +// func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst string) { +// t.Helper() +// g, ctx := errgroup.WithContext(context.Background()) +// var src []BlockReader +// now := time.Now() +// for i, m := range metas { +// t.Log("src block(#", i, ")", +// "ID", m.ULID.String(), +// "minTime", m.MinTime.Time().Format(time.RFC3339Nano), +// "maxTime", m.MaxTime.Time().Format(time.RFC3339Nano), +// "numSeries", m.Stats.NumSeries, +// "numProfiles", m.Stats.NumProfiles, +// "numSamples", m.Stats.NumSamples) +// b := NewSingleBlockQuerierFromMeta(ctx, bkt, m) +// g.Go(func() error { +// if err := b.Open(ctx); err != nil { +// return err +// } +// return b.stacktraces.Load(ctx) +// }) +// src = append(src, b) +// } + +// require.NoError(t, g.Wait()) + +// new, err := Compact(context.Background(), src, dst) +// require.NoError(t, err) +// t.Log(new, dst) +// t.Log("Compaction duration", time.Since(now)) +// t.Log("numSeries", new.Stats.NumSeries, +// "numProfiles", new.Stats.NumProfiles, +// "numSamples", new.Stats.NumSamples) +// } + +type blockReaderMock struct { + BlockReader + idxr IndexReader +} + +func (m *blockReaderMock) Index() IndexReader { + return m.idxr +} + +func TestProfileRowIterator(t *testing.T) { + filePath := t.TempDir() + "/index.tsdb" + idxw, err := index.NewWriter(context.Background(), filePath) + require.NoError(t, err) + require.NoError(t, idxw.AddSymbol("a")) + require.NoError(t, idxw.AddSymbol("b")) + require.NoError(t, idxw.AddSymbol("c")) + addSeries(t, idxw, 0, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + 
addSeries(t, idxw, 1, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + addSeries(t, idxw, 2, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.NoError(t, idxw.Close()) + idxr, err := index.NewFileReader(filePath) + require.NoError(t, err) + + it, err := newProfileRowIterator(schemav1.NewInMemoryProfilesRowReader( + []schemav1.InMemoryProfile{ + {SeriesIndex: 0, TimeNanos: 1}, + {SeriesIndex: 1, TimeNanos: 2}, + {SeriesIndex: 2, TimeNanos: 3}, + }, + ), &blockReaderMock{idxr: idxr}) + require.NoError(t, err) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + require.Equal(t, it.At().timeNanos, int64(1)) + require.Equal(t, it.At().seriesRef, uint32(0)) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + require.Equal(t, it.At().timeNanos, int64(2)) + require.Equal(t, it.At().seriesRef, uint32(1)) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.Equal(t, it.At().timeNanos, int64(3)) + require.Equal(t, it.At().seriesRef, uint32(2)) + + assert.False(t, it.Next()) + require.NoError(t, it.Err()) + require.NoError(t, it.Close()) +} + +func addSeries(t *testing.T, idxw *index.Writer, idx int, labels phlaremodel.Labels) { + t.Helper() + require.NoError(t, idxw.AddSeries(storage.SeriesRef(idx), labels, model.Fingerprint(labels.Hash()), index.ChunkMeta{SeriesIndex: uint32(idx)})) +} + +func TestMetaFilesFromDir(t *testing.T) { + dst := t.TempDir() + generateParquetFile(t, filepath.Join(dst, "foo.parquet")) + generateParquetFile(t, filepath.Join(dst, "symbols", "bar.parquet")) + generateFile(t, filepath.Join(dst, "symbols", "index.symdb"), 100) + generateFile(t, filepath.Join(dst, "symbols", "stacktraces.symdb"), 200) + generateIndexFile(t, dst) + actual, err := metaFilesFromDir(dst) + + require.NoError(t, err) + require.Equal(t, 5, len(actual)) + require.Equal(t, []block.File{ + { + Parquet: &block.ParquetFile{ + NumRows: 100, + NumRowGroups: 10, + }, + RelPath: "foo.parquet", + SizeBytes: fileSize(t, filepath.Join(dst, "foo.parquet")), + }, + { + RelPath: block.IndexFilename, + SizeBytes: fileSize(t, filepath.Join(dst, block.IndexFilename)), + TSDB: &block.TSDBFile{ + NumSeries: 3, + }, + }, + { + Parquet: &block.ParquetFile{ + NumRows: 100, + NumRowGroups: 10, + }, + RelPath: filepath.Join("symbols", "bar.parquet"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "bar.parquet")), + }, + { + RelPath: filepath.Join("symbols", "index.symdb"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "index.symdb")), + }, + { + RelPath: filepath.Join("symbols", "stacktraces.symdb"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "stacktraces.symdb")), + }, + }, actual) +} + +func fileSize(t *testing.T, path string) uint64 { + t.Helper() + fi, err := os.Stat(path) + require.NoError(t, err) + return uint64(fi.Size()) +} + +func generateFile(t *testing.T, path string, size int) { + t.Helper() + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + f, err := os.Create(path) + require.NoError(t, err) + defer f.Close() + require.NoError(t, f.Truncate(int64(size))) +} + +func generateIndexFile(t *testing.T, dir string) { + t.Helper() + filePath := filepath.Join(dir, block.IndexFilename) + idxw, err := index.NewWriter(context.Background(), filePath) + require.NoError(t, err) + 
require.NoError(t, idxw.AddSymbol("a")) + require.NoError(t, idxw.AddSymbol("b")) + require.NoError(t, idxw.AddSymbol("c")) + addSeries(t, idxw, 0, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + addSeries(t, idxw, 1, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + addSeries(t, idxw, 2, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.NoError(t, idxw.Close()) +} + +func generateParquetFile(t *testing.T, path string) { + t.Helper() + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) + require.NoError(t, err) + defer file.Close() + + writer := parquet.NewGenericWriter[struct{ Name string }](file, parquet.MaxRowsPerRowGroup(10)) + defer writer.Close() + for i := 0; i < 100; i++ { + _, err := writer.Write([]struct{ Name string }{ + {Name: fmt.Sprintf("name-%d", i)}, + }) + require.NoError(t, err) + } +} + +func Test_lookupTable(t *testing.T) { + // Given the source data set. + // Copy arbitrary subsets of items from src to dst. + var dst []string + src := []string{ + "zero", + "one", + "two", + "three", + "four", + "five", + "six", + "seven", + } + + type testCase struct { + description string + input []uint32 + expected []string + } + + testCases := []testCase{ + { + description: "empty table", + input: []uint32{5, 0, 3, 1, 2, 2, 4}, + expected: []string{"five", "zero", "three", "one", "two", "two", "four"}, + }, + { + description: "no new values", + input: []uint32{2, 1, 2, 3}, + expected: []string{"two", "one", "two", "three"}, + }, + { + description: "new value mixed", + input: []uint32{2, 1, 6, 2, 3}, + expected: []string{"two", "one", "six", "two", "three"}, + }, + } + + // Try to lookup values in src lazily. + // Table size must be greater or equal + // to the source data set. + l := newLookupTable[string](10) + + populate := func(t *testing.T, x []uint32) { + for i, v := range x { + x[i] = l.tryLookup(v) + } + // Resolve unknown yet values. + // Mind the order and deduplication. + p := -1 + for it := l.iter(); it.Err() == nil && it.Next(); { + m := int(it.At()) + if m <= p { + t.Fatal("iterator order invalid") + } + p = m + it.setValue(src[m]) + } + } + + resolveAppend := func() { + // Populate dst with the newly resolved values. + // Note that order in dst does not have to match src. + for i, v := range l.values { + l.storeResolved(i, uint32(len(dst))) + dst = append(dst, v) + } + } + + resolve := func(x []uint32) []string { + // Lookup resolved values. 
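+		// Editor's note: every ID in x was rewritten by populate and
+		// resolveAppend above, so lookupResolved maps both marked and
+		// already-resolved IDs to positions in dst.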
+ var resolved []string + for _, v := range x { + resolved = append(resolved, dst[l.lookupResolved(v)]) + } + return resolved + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.description, func(t *testing.T) { + l.reset() + populate(t, tc.input) + resolveAppend() + assert.Equal(t, tc.expected, resolve(tc.input)) + }) + } + + assert.Len(t, dst, 7) + assert.NotContains(t, dst, "seven") +} diff --git a/pkg/phlaredb/deduplicating_slice.go b/pkg/phlaredb/deduplicating_slice.go index 3442fb9e6f..cdeaf15ec0 100644 --- a/pkg/phlaredb/deduplicating_slice.go +++ b/pkg/phlaredb/deduplicating_slice.go @@ -227,3 +227,37 @@ func (s *deduplicatingSlice[M, K, H, P]) ingest(_ context.Context, elems []M, re return nil } + +func (s *deduplicatingSlice[M, K, H, P]) append(dst []uint32, elems []M) { + missing := int64SlicePool.Get()[:0] + s.lock.RLock() + for i, v := range elems { + k := s.helper.key(v) + if x, ok := s.lookup[k]; ok { + dst[i] = uint32(x) + } else { + missing = append(missing, int64(i)) + } + } + s.lock.RUnlock() + if len(missing) > 0 { + s.lock.RLock() + p := uint32(len(s.slice)) + for _, i := range missing { + e := elems[i] + k := s.helper.key(e) + x, ok := s.lookup[k] + if ok { + dst[i] = uint32(x) + continue + } + s.size.Add(s.helper.size(e)) + s.slice = append(s.slice, s.helper.clone(e)) + s.lookup[k] = int64(p) + dst[i] = p + p++ + } + s.lock.RUnlock() + } + int64SlicePool.Put(missing) +} diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go index 08311ffb67..14ab9e16d4 100644 --- a/pkg/phlaredb/head.go +++ b/pkg/phlaredb/head.go @@ -317,7 +317,7 @@ func (h *Head) convertSamples(_ context.Context, r *rewriter, stacktracePartitio r.locations.rewriteUint64(&stacktraces[idxSample].LocationIDs[i]) } } - appender := h.symbolDB.MappingWriter(stacktracePartition).StacktraceAppender() + appender := h.symbolDB.SymbolsAppender(stacktracePartition).StacktraceAppender() defer appender.Release() if cap(stacktracesIds) < len(stacktraces) { @@ -609,7 +609,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult")) _ = stacktracesByMapping.ForEach( func(mapping uint64, stacktraceSamples stacktraceSampleMap) error { - mp, ok := h.symbolDB.MappingReader(mapping) + mp, ok := h.symbolDB.SymbolsResolver(mapping) if !ok { return nil } @@ -672,7 +672,7 @@ func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSam // now add locationIDs and stacktraces _ = stacktracesByMapping.ForEach( func(mapping uint64, stacktraceSamples profileSampleMap) error { - mp, ok := h.symbolDB.MappingReader(mapping) + mp, ok := h.symbolDB.SymbolsResolver(mapping) if !ok { return nil } @@ -985,9 +985,9 @@ func (h *Head) flush(ctx context.Context) error { } // add total size symdb - symbDBFiles, error := h.SymDBFiles() - if error != nil { - return error + symbDBFiles, err := symdbMetaFiles(h.headPath) + if err != nil { + return err } for _, file := range symbDBFiles { @@ -1034,6 +1034,26 @@ func (h *Head) SymDBFiles() ([]block.File, error) { return result, nil } +func symdbMetaFiles(dir string) ([]block.File, error) { + files, err := os.ReadDir(filepath.Join(dir, symdb.DefaultDirName)) + if err != nil { + return nil, err + } + result := make([]block.File, len(files)) + for idx, f := range files { + if f.IsDir() { + continue + } + result[idx].RelPath = filepath.Join(symdb.DefaultDirName, f.Name()) + info, err := f.Info() + if err != nil { + return nil, err + } + result[idx].SizeBytes = 
uint64(info.Size()) + } + return result, nil +} + // Move moves the head directory to local blocks. The call is not thread-safe: // no concurrent reads and writes are allowed. // diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index fb25d8d992..94d5d0d465 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -63,6 +63,14 @@ type profileStore struct { flushBufferLbs []phlaremodel.Labels } +func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] { + return parquet.NewGenericWriter[*schemav1.Profile](writer, (&schemav1.ProfilePersister{}).Schema(), + parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")), + parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision), + parquet.PageBufferSize(3*1024*1024), + ) +} + func newProfileStore(phlarectx context.Context) *profileStore { s := &profileStore{ logger: phlarecontext.Logger(phlarectx), @@ -76,11 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore { go s.cutRowGroupLoop() // Initialize writer on /dev/null // TODO: Reuse parquet.Writer beyond life time of the head. - s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(), - parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")), - parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision), - parquet.PageBufferSize(3*1024*1024), - ) + s.writer = newProfileWriter(io.Discard) return s } diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go index e9fcd2671b..1c44f24a76 100644 --- a/pkg/phlaredb/sample_merge.go +++ b/pkg/phlaredb/sample_merge.go @@ -56,20 +56,12 @@ func newLocationsIdsByStacktraceID(size int) locationsIdsByStacktraceID { } } -func (l locationsIdsByStacktraceID) addFromParquet(stacktraceID int64, locs []parquet.Value) { - l.byStacktraceID[stacktraceID] = make([]int32, len(locs)) - for i, locationID := range locs { - locID := locationID.Uint64() - l.ids[int64(locID)] = struct{}{} - l.byStacktraceID[stacktraceID][i] = int32(locID) - } -} - -func (l locationsIdsByStacktraceID) add(stacktraceID int64, locs []int32) { - l.byStacktraceID[stacktraceID] = make([]int32, len(locs)) +func (l locationsIdsByStacktraceID) InsertStacktrace(stacktraceID uint32, locs []int32) { + s := make([]int32, len(locs)) + l.byStacktraceID[int64(stacktraceID)] = s for i, locationID := range locs { l.ids[int64(locationID)] = struct{}{} - l.byStacktraceID[stacktraceID][i] = locationID + s[i] = locationID } } diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index b6b75d1b00..01be0095fc 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -42,9 +42,11 @@ var ( phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))), }) - maxProfileRow parquet.Row - seriesIndexColIndex int - timeNanoColIndex int + maxProfileRow parquet.Row + seriesIndexColIndex int + stacktraceIDColIndex int + timeNanoColIndex int + stacktracePartitionColIndex int ) func init() { @@ -62,6 +64,16 @@ func init() { panic(fmt.Errorf("TimeNanos column not found")) } timeNanoColIndex = timeCol.ColumnIndex + stacktraceIDCol, ok := profilesSchema.Lookup("Samples", "list", "element", "StacktraceID") + if !ok { + panic(fmt.Errorf("StacktraceID column not found")) + } + stacktraceIDColIndex = stacktraceIDCol.ColumnIndex + stacktracePartitionCol, ok := 
profilesSchema.Lookup("StacktracePartition") + if !ok { + panic(fmt.Errorf("StacktracePartition column not found")) + } + stacktracePartitionColIndex = stacktracePartitionCol.ColumnIndex } type Sample struct { @@ -461,3 +473,47 @@ func lessProfileRows(r1, r2 parquet.Row) bool { } return ts1 < ts2 } + +type ProfileRow parquet.Row + +func (p ProfileRow) SeriesIndex() uint32 { + return p[seriesIndexColIndex].Uint32() +} + +func (p ProfileRow) StacktracePartitionID() uint64 { + return p[stacktracePartitionColIndex].Uint64() +} + +func (p ProfileRow) TimeNanos() int64 { + var ts int64 + for i := len(p) - 1; i >= 0; i-- { + if p[i].Column() == timeNanoColIndex { + ts = p[i].Int64() + break + } + } + return ts +} + +func (p ProfileRow) SetSeriesIndex(v uint32) { + p[seriesIndexColIndex] = parquet.Int32Value(int32(v)).Level(0, 0, seriesIndexColIndex) +} + +func (p ProfileRow) ForStacktraceIDsValues(fn func([]parquet.Value)) { + start := -1 + var i int + for i = 0; i < len(p); i++ { + col := p[i].Column() + if col == stacktraceIDColIndex && p[i].DefinitionLevel() == 1 { + if start == -1 { + start = i + } + } + if col > stacktraceIDColIndex { + break + } + } + if start != -1 { + fn(p[start:i]) + } +} diff --git a/pkg/phlaredb/schemas/v1/profiles_test.go b/pkg/phlaredb/schemas/v1/profiles_test.go index 6584844b44..c23a0ef145 100644 --- a/pkg/phlaredb/schemas/v1/profiles_test.go +++ b/pkg/phlaredb/schemas/v1/profiles_test.go @@ -207,6 +207,66 @@ func TestLessProfileRows(t *testing.T) { } } +func TestProfileRowStacktraceIDs(t *testing.T) { + for _, tc := range []struct { + name string + expected []uint32 + profile InMemoryProfile + }{ + {"empty", nil, InMemoryProfile{}}, + {"one sample", []uint32{1}, InMemoryProfile{ + SeriesIndex: 1, + StacktracePartition: 2, + TotalValue: 3, + Samples: Samples{StacktraceIDs: []uint32{1}, Values: []uint64{1}}, + }}, + {"many", []uint32{1, 1, 2, 3, 4}, InMemoryProfile{ + SeriesIndex: 1, + StacktracePartition: 2, + TotalValue: 3, + Samples: Samples{ + StacktraceIDs: []uint32{1, 1, 2, 3, 4}, + Values: []uint64{4, 2, 4, 5, 2}, + }, + }}, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + rows := generateProfileRow([]InMemoryProfile{tc.profile}) + var ids []uint32 + ProfileRow(rows[0]).ForStacktraceIDsValues(func(values []parquet.Value) { + for _, v := range values { + ids = append(ids, v.Uint32()) + } + }) + require.Equal(t, tc.expected, ids) + }) + } +} + +func TestProfileRowMutateValues(t *testing.T) { + row := ProfileRow(generateProfileRow([]InMemoryProfile{ + { + Samples: Samples{ + StacktraceIDs: []uint32{1, 1, 2, 3, 4}, + Values: []uint64{4, 2, 4, 5, 2}, + }, + }, + })[0]) + row.ForStacktraceIDsValues(func(values []parquet.Value) { + for i := range values { + values[i] = parquet.Int32Value(1).Level(0, 1, values[i].Column()) + } + }) + var ids []uint32 + row.ForStacktraceIDsValues(func(values []parquet.Value) { + for _, v := range values { + ids = append(ids, v.Uint32()) + } + }) + require.Equal(t, []uint32{1, 1, 1, 1, 1}, ids) +} + func BenchmarkProfileRows(b *testing.B) { a := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 1}})[0] a1 := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 2}})[0] diff --git a/pkg/phlaredb/symdb/format.go b/pkg/phlaredb/symdb/format.go index 83ba737cb2..04924bbf4b 100644 --- a/pkg/phlaredb/symdb/format.go +++ b/pkg/phlaredb/symdb/format.go @@ -80,8 +80,8 @@ type IndexFile struct { // Version-specific parts. 
- // StacktraceChunkHeaders are sorted by mapping - // name and chunk index in ascending order. + // StacktraceChunkHeaders are sorted by + // partition and chunk index in ascending order. StacktraceChunkHeaders StacktraceChunkHeaders CRC uint32 @@ -201,29 +201,29 @@ func (h *StacktraceChunkHeaders) UnmarshalBinary(b []byte) error { return nil } -type stacktraceChunkHeadersByMappingAndIndex StacktraceChunkHeaders +type stacktraceChunkHeadersByPartitionAndIndex StacktraceChunkHeaders -func (h stacktraceChunkHeadersByMappingAndIndex) Len() int { +func (h stacktraceChunkHeadersByPartitionAndIndex) Len() int { return len(h.Entries) } -func (h stacktraceChunkHeadersByMappingAndIndex) Less(i, j int) bool { +func (h stacktraceChunkHeadersByPartitionAndIndex) Less(i, j int) bool { a, b := h.Entries[i], h.Entries[j] - if a.MappingName == b.MappingName { + if a.Partition == b.Partition { return a.ChunkIndex < b.ChunkIndex } - return a.MappingName < b.MappingName + return a.Partition < b.Partition } -func (h stacktraceChunkHeadersByMappingAndIndex) Swap(i, j int) { +func (h stacktraceChunkHeadersByPartitionAndIndex) Swap(i, j int) { h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] } type StacktraceChunkHeader struct { - Offset int64 // Relative to the mapping offset. + Offset int64 Size int64 - MappingName uint64 // MappingName the chunk refers to. + Partition uint64 ChunkIndex uint16 ChunkEncoding ChunkEncoding _ [5]byte // Reserved. @@ -247,7 +247,7 @@ const ( func (h *StacktraceChunkHeader) marshal(b []byte) { binary.BigEndian.PutUint64(b[0:8], uint64(h.Offset)) binary.BigEndian.PutUint64(b[8:16], uint64(h.Size)) - binary.BigEndian.PutUint64(b[16:24], h.MappingName) + binary.BigEndian.PutUint64(b[16:24], h.Partition) binary.BigEndian.PutUint16(b[24:26], h.ChunkIndex) b[27] = byte(h.ChunkEncoding) // 5 bytes reserved. @@ -262,7 +262,7 @@ func (h *StacktraceChunkHeader) marshal(b []byte) { func (h *StacktraceChunkHeader) unmarshal(b []byte) { h.Offset = int64(binary.BigEndian.Uint64(b[0:8])) h.Size = int64(binary.BigEndian.Uint64(b[8:16])) - h.MappingName = binary.BigEndian.Uint64(b[16:24]) + h.Partition = binary.BigEndian.Uint64(b[16:24]) h.ChunkIndex = binary.BigEndian.Uint16(b[24:26]) h.ChunkEncoding = ChunkEncoding(b[27]) // 5 bytes reserved. @@ -333,7 +333,7 @@ func (f *IndexFile) WriteTo(dst io.Writer) (n int64, err error) { return w.offset, fmt.Errorf("toc write: %w", err) } - sort.Sort(stacktraceChunkHeadersByMappingAndIndex(f.StacktraceChunkHeaders)) + sort.Sort(stacktraceChunkHeadersByPartitionAndIndex(f.StacktraceChunkHeaders)) sch, _ := f.StacktraceChunkHeaders.MarshalBinary() if _, err = w.Write(sch); err != nil { return w.offset, fmt.Errorf("stacktrace chunk headers: %w", err) diff --git a/pkg/phlaredb/symdb/interfaces.go b/pkg/phlaredb/symdb/interfaces.go index 2b9baaac84..1010bea37e 100644 --- a/pkg/phlaredb/symdb/interfaces.go +++ b/pkg/phlaredb/symdb/interfaces.go @@ -11,24 +11,22 @@ import ( // // In the package, Mapping represents all the version of a binary. -type MappingWriter interface { - // StacktraceAppender provides exclusive write access - // to the stack traces of the mapping. - // - // StacktraceAppender.Release must be called in order - // to dispose the object and release the lock. - // Released resolver must not be used. +type SymbolsAppender interface { StacktraceAppender() StacktraceAppender } -type MappingReader interface { - // StacktraceResolver provides non-exclusive read - // access to the stack traces of the mapping. 
-	//
-	// StacktraceResolver.Release must be called in order
-	// to dispose the object and release the lock.
-	// Released resolver must not be used.
+type SymbolsResolver interface {
 	StacktraceResolver() StacktraceResolver
+	WriteStats(*Stats)
+}
+
+type Stats struct {
+	StacktracesTotal int
+	LocationsTotal   int
+	MappingsTotal    int
+	FunctionsTotal   int
+	StringsTotal     int
+	MaxStacktraceID  int
 }
 
 type StacktraceAppender interface {
diff --git a/pkg/phlaredb/symdb/mapping_memory.go b/pkg/phlaredb/symdb/partition_memory.go
similarity index 76%
rename from pkg/phlaredb/symdb/mapping_memory.go
rename to pkg/phlaredb/symdb/partition_memory.go
index 2182f7b37e..58c78deed7 100644
--- a/pkg/phlaredb/symdb/mapping_memory.go
+++ b/pkg/phlaredb/symdb/partition_memory.go
@@ -12,21 +12,19 @@ import (
 )
 
 var (
-	_ MappingReader = (*inMemoryMapping)(nil)
-	_ MappingWriter = (*inMemoryMapping)(nil)
+	_ SymbolsResolver = (*inMemoryPartition)(nil)
+	_ SymbolsAppender = (*inMemoryPartition)(nil)
 
 	_ StacktraceAppender = (*stacktraceAppender)(nil)
 	_ StacktraceResolver = (*stacktraceResolverMemory)(nil)
 )
 
-type inMemoryMapping struct {
+type inMemoryPartition struct {
 	name uint64
 
 	maxNodesPerChunk uint32
 	// maxStackDepth uint32
 
-	// Stack traces originating from the mapping (binary):
-	// their bottom frames (roots) refer to this mapping.
 	stacktraceMutex    sync.RWMutex
 	stacktraceHashToID map[uint64]uint32
 	stacktraceChunks   []*stacktraceChunk
@@ -34,33 +32,44 @@ type inMemoryMapping struct {
 	stacktraceChunkHeaders []StacktraceChunkHeader
 }
 
-func (b *inMemoryMapping) StacktraceAppender() StacktraceAppender {
+func (b *inMemoryPartition) StacktraceAppender() StacktraceAppender {
 	b.stacktraceMutex.RLock()
-	// Assuming there is at least one chunk.
-	c := b.stacktraceChunks[len(b.stacktraceChunks)-1]
+	c := b.currentStacktraceChunk()
 	b.stacktraceMutex.RUnlock()
 	return &stacktraceAppender{
-		mapping: b,
-		chunk:   c,
+		partition: b,
+		chunk:     c,
 	}
 }
 
-func (b *inMemoryMapping) StacktraceResolver() StacktraceResolver {
+func (b *inMemoryPartition) StacktraceResolver() StacktraceResolver {
 	return &stacktraceResolverMemory{
-		mapping: b,
+		partition: b,
 	}
 }
 
+func (b *inMemoryPartition) WriteStats(s *Stats) {
+	b.stacktraceMutex.RLock()
+	c := b.currentStacktraceChunk()
+	s.MaxStacktraceID = int(c.stid + c.tree.len())
+	s.StacktracesTotal = len(b.stacktraceHashToID)
+	b.stacktraceMutex.RUnlock()
+}
+
 // stacktraceChunkForInsert returns a chunk for insertion:
 // if the existing one has capacity, or a new one, if the former is full.
 // Must be called with the stacktraces mutex write lock held.
-func (b *inMemoryMapping) stacktraceChunkForInsert(x int) *stacktraceChunk {
-	c := b.stacktraceChunks[len(b.stacktraceChunks)-1]
+func (b *inMemoryPartition) stacktraceChunkForInsert(x int) *stacktraceChunk {
+	c := b.currentStacktraceChunk()
 	if n := c.tree.len() + uint32(x); b.maxNodesPerChunk > 0 && n >= b.maxNodesPerChunk {
+		// Calculate number of stacks in the chunk.
+		s := uint32(len(b.stacktraceHashToID))
+		c.stacks = s - c.stacks
 		c = &stacktraceChunk{
-			mapping: b,
-			tree:    newStacktraceTree(defaultStacktraceTreeSize),
-			stid:    c.stid + b.maxNodesPerChunk,
+			partition: b,
+			tree:      newStacktraceTree(defaultStacktraceTreeSize),
+			stid:      c.stid + b.maxNodesPerChunk,
+			stacks:    s,
 		}
 		b.stacktraceChunks = append(b.stacktraceChunks, c)
 	}
@@ -69,17 +78,23 @@ func (b *inMemoryMapping) stacktraceChunkForInsert(x int) *stacktraceChunk {
 
 // stacktraceChunkForRead returns a chunk for reads.
 // Must be called with the stacktraces mutex read lock held.
-func (b *inMemoryMapping) stacktraceChunkForRead(i int) (*stacktraceChunk, bool) {
+func (b *inMemoryPartition) stacktraceChunkForRead(i int) (*stacktraceChunk, bool) {
 	if i < len(b.stacktraceChunks) {
 		return b.stacktraceChunks[i], true
 	}
 	return nil, false
 }
 
+func (b *inMemoryPartition) currentStacktraceChunk() *stacktraceChunk {
+	// Assuming there is at least one chunk.
+	return b.stacktraceChunks[len(b.stacktraceChunks)-1]
+}
+
 type stacktraceChunk struct {
-	mapping *inMemoryMapping
-	stid    uint32 // Initial stack trace ID.
-	tree    *stacktraceTree
+	partition *inMemoryPartition
+	tree      *stacktraceTree
+	stid      uint32 // Initial stack trace ID.
+	stacks    uint32 // Number of stack traces in the chunk.
 }
 
 func (s *stacktraceChunk) WriteTo(dst io.Writer) (int64, error) {
@@ -87,7 +102,7 @@
 }
 
 type stacktraceAppender struct {
-	mapping     *inMemoryMapping
+	partition   *inMemoryPartition
 	chunk       *stacktraceChunk
 	releaseOnce sync.Once
 }
@@ -103,13 +118,13 @@ func (a *stacktraceAppender) AppendStacktrace(dst []uint32, s []*v1.Stacktrace)
 		misses int
 	)
-	a.mapping.stacktraceMutex.RLock()
+	a.partition.stacktraceMutex.RLock()
 	for i, x := range s {
-		if dst[i], found = a.mapping.stacktraceHashToID[hashLocations(x.LocationIDs)]; !found {
+		if dst[i], found = a.partition.stacktraceHashToID[hashLocations(x.LocationIDs)]; !found {
 			misses++
 		}
 	}
-	a.mapping.stacktraceMutex.RUnlock()
+	a.partition.stacktraceMutex.RUnlock()
 	if misses == 0 {
 		return
 	}
@@ -125,10 +140,10 @@
 	// Instead of inserting stacks one by one, it is better to
 	// build a tree, and merge it into the existing one.
-	a.mapping.stacktraceMutex.Lock()
-	defer a.mapping.stacktraceMutex.Unlock()
+	a.partition.stacktraceMutex.Lock()
+	defer a.partition.stacktraceMutex.Unlock()
 
-	m := int(a.mapping.maxNodesPerChunk)
+	m := int(a.partition.maxNodesPerChunk)
 	t, j := a.chunk.tree, a.chunk.stid
 	for i, v := range dst[:len(s)] {
 		if v != 0 {
@@ -142,7 +157,7 @@
 			// If we're close to the max nodes limit and can
 			// potentially exceed it, we take the next chunk,
 			// even if there is some space.
-			a.chunk = a.mapping.stacktraceChunkForInsert(len(x))
+			a.chunk = a.partition.stacktraceChunkForInsert(len(x))
 			t, j = a.chunk.tree, a.chunk.stid
 		}
 
@@ -150,7 +165,7 @@
 		// we don't need to check the map.
 		id = t.insert(x) + j
 		h := hashLocations(x)
-		a.mapping.stacktraceHashToID[h] = id
+		a.partition.stacktraceHashToID[h] = id
 		dst[i] = id
 	}
 }
@@ -174,7 +189,7 @@ func hashLocations(s []uint64) uint64 {
 }
 
 type stacktraceResolverMemory struct {
-	mapping *inMemoryMapping
+	partition *inMemoryPartition
 }
 
 const defaultStacktraceDepth = 64
@@ -196,7 +211,7 @@ func (p *stacktraceLocationsPool) put(x []int32) {
 
 func (r *stacktraceResolverMemory) ResolveStacktraces(_ context.Context, dst StacktraceInserter, stacktraces []uint32) (err error) {
 	// TODO(kolesnikovae): Add option to resolve concurrently.
 	// Depends on StacktraceInserter implementation.
-	for _, sr := range SplitStacktraces(stacktraces, r.mapping.maxNodesPerChunk) {
+	for _, sr := range SplitStacktraces(stacktraces, r.partition.maxNodesPerChunk) {
 		if err = r.ResolveStacktracesChunk(dst, sr); err != nil {
 			return err
 		}
@@ -212,10 +227,10 @@ func (r *stacktraceResolverMemory) ResolveStacktraces(_ context.Context, dst Sta
 // the options, the package provides.
func (r *stacktraceResolverMemory) ResolveStacktracesChunk(dst StacktraceInserter, sr StacktracesRange) error { - r.mapping.stacktraceMutex.RLock() - c, found := r.mapping.stacktraceChunkForRead(int(sr.chunk)) + r.partition.stacktraceMutex.RLock() + c, found := r.partition.stacktraceChunkForRead(int(sr.chunk)) if !found { - r.mapping.stacktraceMutex.RUnlock() + r.partition.stacktraceMutex.RUnlock() return ErrInvalidStacktraceRange } t := stacktraceTree{nodes: c.tree.nodes} @@ -227,7 +242,7 @@ func (r *stacktraceResolverMemory) ResolveStacktracesChunk(dst StacktraceInserte // races when the slice grows: in the worst case, the underlying // capacity will be retained and thus not be eligible for GC during // the call. - r.mapping.stacktraceMutex.RUnlock() + r.partition.stacktraceMutex.RUnlock() s := stacktraceLocations.get() // Restore the original stacktrace ID. off := sr.offset() diff --git a/pkg/phlaredb/symdb/mapping_memory_test.go b/pkg/phlaredb/symdb/partition_memory_test.go similarity index 95% rename from pkg/phlaredb/symdb/mapping_memory_test.go rename to pkg/phlaredb/symdb/partition_memory_test.go index 316703f6d9..82fc9b0bb9 100644 --- a/pkg/phlaredb/symdb/mapping_memory_test.go +++ b/pkg/phlaredb/symdb/partition_memory_test.go @@ -22,7 +22,7 @@ func Test_StacktraceAppender_shards(t *testing.T) { }, }) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -48,8 +48,8 @@ func Test_StacktraceAppender_shards(t *testing.T) { }) assert.Equal(t, []uint32{18}, sids[:1]) - require.Len(t, db.mappings, 1) - m := db.mappings[0] + require.Len(t, db.partitions, 1) + m := db.partitions[0] require.Len(t, m.stacktraceChunks, 3) c1 := m.stacktraceChunks[0] @@ -67,7 +67,7 @@ func Test_StacktraceAppender_shards(t *testing.T) { t.Run("WithoutMaxStacktraceTreeNodesPerChunk", func(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -81,8 +81,8 @@ func Test_StacktraceAppender_shards(t *testing.T) { }) assert.Equal(t, []uint32{3, 2, 4, 5, 6}, sids) - require.Len(t, db.mappings, 1) - m := db.mappings[0] + require.Len(t, db.partitions, 1) + m := db.partitions[0] require.Len(t, m.stacktraceChunks, 1) c1 := m.stacktraceChunks[0] @@ -166,7 +166,7 @@ func Test_StacktraceResolver_stacktraces_split(t *testing.T) { func Test_Stacktrace_append_existing(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() sids := make([]uint32, 2) @@ -185,7 +185,7 @@ func Test_Stacktrace_append_existing(t *testing.T) { func Test_Stacktrace_append_empty(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -205,7 +205,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { t.Run("single chunk", func(t *testing.T) { db := NewSymDB(new(Config)) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -218,7 +218,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { {LocationIDs: []uint64{5, 2, 1}}, }) - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -237,7 +237,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { }, }) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() defer a.Release() @@ -274,10 +274,10 @@ func 
Test_Stacktraces_append_resolve(t *testing.T) { */ sids := make([]uint32, len(stacktraces)) a.AppendStacktrace(sids, stacktraces) - require.Len(t, db.mappings[0].stacktraceChunks, 6) + require.Len(t, db.partitions[0].stacktraceChunks, 6) t.Run("adjacent shards at beginning", func(t *testing.T) { - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -290,7 +290,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { }) t.Run("adjacent shards at end", func(t *testing.T) { - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -303,7 +303,7 @@ func Test_Stacktraces_append_resolve(t *testing.T) { }) t.Run("non-adjacent shards", func(t *testing.T) { - mr, _ := db.MappingReader(0) + mr, _ := db.SymbolsResolver(0) r := mr.StacktraceResolver() defer r.Release() dst := new(mockStacktraceInserter) @@ -348,13 +348,13 @@ func Test_Stacktraces_memory_resolve_pprof(t *testing.T) { sids := make([]uint32, len(stacktraces)) db := NewSymDB(new(Config)) - mw := db.MappingWriter(0) + mw := db.SymbolsAppender(0) a := mw.StacktraceAppender() defer a.Release() a.AppendStacktrace(sids, stacktraces) - mr, ok := db.MappingReader(0) + mr, ok := db.SymbolsResolver(0) require.True(t, ok) r := mr.StacktraceResolver() defer r.Release() @@ -378,13 +378,13 @@ func Test_Stacktraces_memory_resolve_chunked(t *testing.T) { }, } db := NewSymDB(cfg) - mw := db.MappingWriter(0) + mw := db.SymbolsAppender(0) a := mw.StacktraceAppender() defer a.Release() a.AppendStacktrace(sids, stacktraces) - mr, ok := db.MappingReader(0) + mr, ok := db.SymbolsResolver(0) require.True(t, ok) r := mr.StacktraceResolver() defer r.Release() @@ -417,7 +417,7 @@ func Test_Stacktraces_memory_resolve_concurrency(t *testing.T) { // Allocate stacktrace IDs. sids := make([]uint32, len(stacktraces)) db := NewSymDB(cfg) - mw := db.MappingWriter(0) + mw := db.SymbolsAppender(0) a := mw.StacktraceAppender() a.AppendStacktrace(sids, stacktraces) a.Release() @@ -438,7 +438,7 @@ func Test_Stacktraces_memory_resolve_concurrency(t *testing.T) { go func() { defer wg.Done() - a := db.MappingWriter(0).StacktraceAppender() + a := db.SymbolsAppender(0).StacktraceAppender() defer a.Release() for j := 0; j < appends; j++ { @@ -452,7 +452,7 @@ func Test_Stacktraces_memory_resolve_concurrency(t *testing.T) { go func() { defer wg.Done() - mr, ok := db.MappingReader(0) + mr, ok := db.SymbolsResolver(0) if !ok { return } diff --git a/pkg/phlaredb/symdb/stacktrace_tree_test.go b/pkg/phlaredb/symdb/stacktrace_tree_test.go index b8e3d776ae..94f1796844 100644 --- a/pkg/phlaredb/symdb/stacktrace_tree_test.go +++ b/pkg/phlaredb/symdb/stacktrace_tree_test.go @@ -88,7 +88,7 @@ func Test_stacktrace_tree_encoding_group(t *testing.T) { } func Test_stacktrace_tree_encoding_rand(t *testing.T) { - // TODO: Fuzzing. With random data it's easy to hit overflow. + // TODO: Fuzzing. 
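
Editor's note: the multi-chunk tests above rely on one invariant from stacktraceChunkForInsert: every new chunk starts at stid = previous stid + maxNodesPerChunk, so a global stacktrace ID encodes both its chunk and its node within that chunk's tree. An illustrative sketch of that arithmetic (not the actual SplitStacktraces implementation):

func chunkAndLocalID(id, maxNodesPerChunk uint32) (chunk, local uint32) {
	if maxNodesPerChunk == 0 {
		// A single unbounded chunk: the ID is already tree-local.
		return 0, id
	}
	return id / maxNodesPerChunk, id % maxNodesPerChunk
}

This is also why ResolveStacktracesChunk adds sr.offset() back before inserting: the tree itself only knows chunk-local IDs.
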
 	nodes := make([]node, 1<<20)
 	for i := range nodes {
 		nodes[i] = node{
diff --git a/pkg/phlaredb/symdb/mapping_reader_file.go b/pkg/phlaredb/symdb/symbols_reader_file.go
similarity index 55%
rename from pkg/phlaredb/symdb/mapping_reader_file.go
rename to pkg/phlaredb/symdb/symbols_reader_file.go
index 1aa2dbdf41..c2b8167be0 100644
--- a/pkg/phlaredb/symdb/mapping_reader_file.go
+++ b/pkg/phlaredb/symdb/symbols_reader_file.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io"
+	"sort"
 	"sync"
 
 	"github.com/grafana/dskit/multierror"
@@ -15,23 +16,23 @@ import (
 )
 
 var (
-	_ MappingReader      = (*mappingFileReader)(nil)
+	_ SymbolsResolver    = (*partitionFileReader)(nil)
 	_ StacktraceResolver = (*stacktraceResolverFile)(nil)
 )
 
 type Reader struct {
 	bucket objstore.BucketReader
 
-	maxConcurrentChunkFetch int
-	chunkFetchBufferSize    int
+	maxConcurrentChunks  int
+	chunkFetchBufferSize int
 
-	idx      IndexFile
-	mappings map[uint64]*mappingFileReader
+	idx        IndexFile
+	partitions map[uint64]*partitionFileReader
 }
 
 const (
-	defaultMaxConcurrentChunkFetch = 8
-	defaultChunkFetchBufferSize    = 4096
+	defaultMaxConcurrentChunks  = 1
+	defaultChunkFetchBufferSize = 4096
 )
 
 // NOTE(kolesnikovae):
@@ -41,16 +42,16 @@ const (
 
 type ReaderConfig struct {
 	BucketReader objstore.BucketReader
 
-	MaxConcurrentChunkFetch int
-	ChunkFetchBufferSize    int
+	MaxConcurrentChunks  int
+	ChunkFetchBufferSize int
 }
 
 func Open(ctx context.Context, b objstore.BucketReader) (*Reader, error) {
 	r := Reader{
 		bucket: b,
 
-		maxConcurrentChunkFetch: defaultMaxConcurrentChunkFetch,
-		chunkFetchBufferSize:    defaultChunkFetchBufferSize,
+		maxConcurrentChunks:  defaultMaxConcurrentChunks,
+		chunkFetchBufferSize: defaultChunkFetchBufferSize,
 	}
 	if err := r.open(ctx); err != nil {
 		return nil, err
@@ -71,46 +72,96 @@ func (r *Reader) open(ctx context.Context) error {
 		return err
 	}
 	// TODO(kolesnikovae): Load in a smarter way as headers are ordered.
-	r.mappings = make(map[uint64]*mappingFileReader, len(r.idx.StacktraceChunkHeaders.Entries)/3)
+	r.partitions = make(map[uint64]*partitionFileReader, len(r.idx.StacktraceChunkHeaders.Entries)/3)
 	for _, h := range r.idx.StacktraceChunkHeaders.Entries {
-		r.mapping(h.MappingName).addStacktracesChunk(h)
+		r.partition(h.Partition).addStacktracesChunk(h)
 	}
 	return nil
 }
 
-func (r *Reader) mapping(n uint64) *mappingFileReader {
-	if m, ok := r.mappings[n]; ok {
+func (r *Reader) partition(n uint64) *partitionFileReader {
+	if m, ok := r.partitions[n]; ok {
 		return m
 	}
-	m := &mappingFileReader{reader: r}
-	r.mappings[n] = m
+	m := &partitionFileReader{reader: r}
+	r.partitions[n] = m
 	return m
 }
 
-func (r *Reader) MappingReader(mappingName uint64) (MappingReader, bool) {
-	m, ok := r.mappings[mappingName]
+func (r *Reader) SymbolsResolver(partition uint64) (SymbolsResolver, bool) {
+	m, ok := r.partitions[partition]
 	return m, ok
 }
 
-type mappingFileReader struct {
+// Load causes the reader to load all contents into memory.
+func (r *Reader) Load(ctx context.Context) (err error) {
+	partitions := make([]*partitionFileReader, len(r.partitions))
+	var i int
+	for _, v := range r.partitions {
+		partitions[i] = v
+		i++
+	}
+	sort.Slice(partitions, func(i, j int) bool {
+		return partitions[i].stacktraceChunks[0].header.Offset <
+			partitions[j].stacktraceChunks[0].header.Offset
+	})
+
+	offset := partitions[0].stacktraceChunks[0].header.Offset
+	var size int64
+	for i = range partitions {
+		for _, c := range partitions[i].stacktraceChunks {
+			size += c.header.Size
+		}
+	}
+
+	rc, err := r.bucket.GetRange(ctx, StacktracesFileName, offset, size)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err = multierror.New(err, rc.Close()).Err()
+	}()
+
+	buf := bufio.NewReaderSize(rc, r.chunkFetchBufferSize)
+	for _, p := range partitions {
+		for _, c := range p.stacktraceChunks {
+			if err = c.readFrom(io.LimitReader(buf, c.header.Size)); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+type partitionFileReader struct {
 	reader           *Reader
 	stacktraceChunks []*stacktraceChunkFileReader
 }
 
-func (m *mappingFileReader) StacktraceResolver() StacktraceResolver {
+func (m *partitionFileReader) StacktraceResolver() StacktraceResolver {
 	return &stacktraceResolverFile{
-		mapping: m,
+		partition: m,
 	}
 }
 
+func (m *partitionFileReader) WriteStats(s *Stats) {
+	var nodes uint32
+	for _, c := range m.stacktraceChunks {
+		s.StacktracesTotal += int(c.header.Stacktraces)
+		nodes += c.header.StacktraceNodes
+	}
+	s.MaxStacktraceID = int(nodes)
+}
+
-func (m *mappingFileReader) addStacktracesChunk(h StacktraceChunkHeader) {
+func (m *partitionFileReader) addStacktracesChunk(h StacktraceChunkHeader) {
 	m.stacktraceChunks = append(m.stacktraceChunks, &stacktraceChunkFileReader{
 		reader: m.reader,
 		header: h,
 	})
 }
 
-func (m *mappingFileReader) stacktraceChunkReader(i uint32) *stacktraceChunkFileReader {
+func (m *partitionFileReader) stacktraceChunkReader(i uint32) *stacktraceChunkFileReader {
 	if int(i) < len(m.stacktraceChunks) {
 		return m.stacktraceChunks[i]
 	}
@@ -118,7 +169,7 @@ func (m *mappingFileReader) stacktraceChunkReader(i uint32) *stacktraceChunkFile
 }
 
 type stacktraceResolverFile struct {
-	mapping *mappingFileReader
+	partition *partitionFileReader
 }
 
 func (r *stacktraceResolverFile) Release() {}
@@ -126,37 +177,24 @@ func (r *stacktraceResolverFile) Release() {}
 
 var ErrInvalidStacktraceRange = fmt.Errorf("invalid range: stack traces can't be resolved")
 
 func (r *stacktraceResolverFile) ResolveStacktraces(ctx context.Context, dst StacktraceInserter, s []uint32) error {
-	if len(r.mapping.stacktraceChunks) == 0 {
+	if len(s) == 0 {
+		return nil
+	}
+	if len(r.partition.stacktraceChunks) == 0 {
 		return ErrInvalidStacktraceRange
 	}
 
 	// First, we determine the chunks needed for the range.
 	// All chunks in a block must have the same StacktraceMaxNodes.
-	sr := SplitStacktraces(s, r.mapping.stacktraceChunks[0].header.StacktraceMaxNodes)
-
-	// TODO(kolesnikovae):
-	// Chunks are fetched concurrently, but inserted to dst sequentially,
-	// to avoid race condition on the implementation end:
-	// - Add maxConcurrentChunkResolve option that controls the behaviour.
-	// - Caching: already fetched chunks should be cached (serialized or not).
+ sr := SplitStacktraces(s, r.partition.stacktraceChunks[0].header.StacktraceMaxNodes) g, ctx := errgroup.WithContext(ctx) - g.SetLimit(r.mapping.reader.maxConcurrentChunkFetch) - rs := make([]*stacktracesResolve, len(sr)) - for i, c := range sr { - rs[i] = r.newResolve(ctx, dst, c) - g.Go(rs[i].fetch) - } - if err := g.Wait(); err != nil { - return err - } - - for _, cr := range rs { - cr.resolveStacktracesChunk(dst) - cr.release() + g.SetLimit(r.partition.reader.maxConcurrentChunks) + for _, c := range sr { + g.Go(r.newResolve(ctx, dst, c).do) } - return nil + return g.Wait() } func (r *stacktraceResolverFile) newResolve(ctx context.Context, dst StacktraceInserter, c StacktracesRange) *stacktracesResolve { @@ -179,8 +217,17 @@ type stacktracesResolve struct { c StacktracesRange } +func (r *stacktracesResolve) do() error { + if err := r.fetch(); err != nil { + return err + } + r.resolveStacktracesChunk(r.dst) + r.release() + return nil +} + func (r *stacktracesResolve) fetch() (err error) { - if r.cr = r.r.mapping.stacktraceChunkReader(r.c.chunk); r.cr == nil { + if r.cr = r.r.partition.stacktraceChunkReader(r.c.chunk); r.cr == nil { return ErrInvalidStacktraceRange } if r.t, err = r.cr.fetch(r.ctx); err != nil { @@ -207,6 +254,10 @@ type stacktraceChunkFileReader struct { header StacktraceChunkHeader m sync.Mutex tree *parentPointerTree + // Indicates that the chunk has been loaded into + // memory with Load call and should not be released + // until the block is closed. + loaded bool } func (c *stacktraceChunkFileReader) fetch(ctx context.Context) (_ *parentPointerTree, err error) { @@ -215,7 +266,6 @@ func (c *stacktraceChunkFileReader) fetch(ctx context.Context) (_ *parentPointer if c.tree != nil { return c.tree, nil } - rc, err := c.reader.bucket.GetRange(ctx, StacktracesFileName, c.header.Offset, c.header.Size) if err != nil { return nil, err @@ -223,36 +273,41 @@ func (c *stacktraceChunkFileReader) fetch(ctx context.Context) (_ *parentPointer defer func() { err = multierror.New(err, rc.Close()).Err() }() + // Consider pooling the buffer. + buf := bufio.NewReaderSize(rc, c.reader.chunkFetchBufferSize) + if err = c.readFrom(buf); err != nil { + return nil, err + } + return c.tree, nil +} +func (c *stacktraceChunkFileReader) readFrom(r io.Reader) error { // NOTE(kolesnikovae): Pool of node chunks could reduce // the alloc size, but it may affect memory locality. // Although, properly aligned chunks of, say, 1-4K nodes // which is 8-32KiB respectively, should not make things // much worse than they are. Worth experimenting. t := newParentPointerTree(c.header.StacktraceNodes) - // We unmarshal the tree speculatively, before validating // the checksum. Even random bytes can be unmarshalled to // a tree not causing any errors, therefore it is vital // to verify the correctness of the data. crc := crc32.New(castagnoli) - tee := io.TeeReader(rc, crc) - - // Consider pooling the buffer. 
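
Editor's note on the checksum handling in the hunk that continues below: the parent-pointer tree decodes "successfully" from arbitrary bytes, so the CRC has to be verified after the speculative decode. A standalone sketch of that read-side pattern; verifyChunkChecksum is a hypothetical helper, and crc32.MakeTable(crc32.Castagnoli) stands in for the package-level castagnoli table the code already uses:

func verifyChunkChecksum(r io.Reader, want uint32, decode func(io.Reader) error) error {
	// Decode through a TeeReader so the checksum covers exactly the
	// bytes consumed by the decoder.
	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
	if err := decode(io.TeeReader(r, crc)); err != nil {
		return err
	}
	if crc.Sum32() != want {
		return ErrInvalidCRC
	}
	return nil
}
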
-	buf := bufio.NewReaderSize(tee, c.reader.chunkFetchBufferSize)
-	if _, err = t.ReadFrom(buf); err != nil {
-		return nil, fmt.Errorf("failed to unmarshal stack treaces: %w", err)
+	tee := io.TeeReader(r, crc)
+	if _, err := t.ReadFrom(tee); err != nil {
+		return fmt.Errorf("failed to unmarshal stack traces: %w", err)
 	}
 	if c.header.CRC != crc.Sum32() {
-		return nil, ErrInvalidCRC
+		return ErrInvalidCRC
 	}
-	c.tree = t
-	return t, nil
+	c.tree = t
+	return nil
 }
 
 func (c *stacktraceChunkFileReader) reset() {
 	c.m.Lock()
-	c.tree = nil
+	if !c.loaded {
+		c.tree = nil
+	}
 	c.m.Unlock()
 }
diff --git a/pkg/phlaredb/symdb/mapping_reader_file_test.go b/pkg/phlaredb/symdb/symbols_reader_file_test.go
similarity index 95%
rename from pkg/phlaredb/symdb/mapping_reader_file_test.go
rename to pkg/phlaredb/symdb/symbols_reader_file_test.go
index af46f9bf10..03be33139e 100644
--- a/pkg/phlaredb/symdb/mapping_reader_file_test.go
+++ b/pkg/phlaredb/symdb/symbols_reader_file_test.go
@@ -19,7 +19,7 @@ func Test_Reader_Open(t *testing.T) {
 	}
 	db := NewSymDB(cfg)
 
-	w := db.MappingWriter(1)
+	w := db.SymbolsAppender(1)
 	a := w.StacktraceAppender()
 	sids := make([]uint32, 5)
 	a.AppendStacktrace(sids, []*schemav1.Stacktrace{
@@ -37,7 +37,7 @@
 	require.NoError(t, err)
 	x, err := Open(context.Background(), b)
 	require.NoError(t, err)
-	mr, ok := x.MappingReader(1)
+	mr, ok := x.SymbolsResolver(1)
 	require.True(t, ok)
 
 	dst := new(mockStacktraceInserter)
diff --git a/pkg/phlaredb/symdb/mapping_writer_file.go b/pkg/phlaredb/symdb/symbols_writer_file.go
similarity index 96%
rename from pkg/phlaredb/symdb/mapping_writer_file.go
rename to pkg/phlaredb/symdb/symbols_writer_file.go
index d14e741080..d0f2336c2a 100644
--- a/pkg/phlaredb/symdb/mapping_writer_file.go
+++ b/pkg/phlaredb/symdb/symbols_writer_file.go
@@ -38,13 +38,13 @@ func (w *Writer) writeStacktraceChunk(ci int, c *stacktraceChunk) (err error) {
 	h := StacktraceChunkHeader{
 		Offset:             w.scd.w.offset,
 		Size:               0, // Set later.
-		MappingName:        c.mapping.name,
+		Partition:          c.partition.name,
 		ChunkIndex:         uint16(ci),
 		ChunkEncoding:      ChunkEncodingGroupVarint,
-		Stacktraces:        0, // TODO
+		Stacktraces:        c.stacks,
 		StacktraceNodes:    c.tree.len(),
 		StacktraceMaxDepth: 0, // TODO
-		StacktraceMaxNodes: c.mapping.maxNodesPerChunk,
+		StacktraceMaxNodes: c.partition.maxNodesPerChunk,
 		CRC:                0, // Set later.
} crc := crc32.New(castagnoli) diff --git a/pkg/phlaredb/symdb/mapping_writer_file_test.go b/pkg/phlaredb/symdb/symbols_writer_file_test.go similarity index 84% rename from pkg/phlaredb/symdb/mapping_writer_file_test.go rename to pkg/phlaredb/symdb/symbols_writer_file_test.go index 5f031d3fe9..0e38cd0904 100644 --- a/pkg/phlaredb/symdb/mapping_writer_file_test.go +++ b/pkg/phlaredb/symdb/symbols_writer_file_test.go @@ -22,7 +22,7 @@ func Test_Writer_IndexFile(t *testing.T) { sids := make([]uint32, 5) - w := db.MappingWriter(0) + w := db.SymbolsAppender(0) a := w.StacktraceAppender() a.AppendStacktrace(sids, []*schemav1.Stacktrace{ {LocationIDs: []uint64{3, 2, 1}}, @@ -34,7 +34,7 @@ func Test_Writer_IndexFile(t *testing.T) { assert.Equal(t, []uint32{3, 2, 11, 16, 18}, sids) a.Release() - w = db.MappingWriter(1) + w = db.SymbolsAppender(1) a = w.StacktraceAppender() a.AppendStacktrace(sids, []*schemav1.Stacktrace{ {LocationIDs: []uint64{3, 2, 1}}, @@ -46,9 +46,9 @@ func Test_Writer_IndexFile(t *testing.T) { assert.Equal(t, []uint32{3, 2, 11, 16, 18}, sids) a.Release() - require.Len(t, db.mappings, 2) - require.Len(t, db.mappings[0].stacktraceChunks, 3) - require.Len(t, db.mappings[1].stacktraceChunks, 3) + require.Len(t, db.partitions, 2) + require.Len(t, db.partitions[0].stacktraceChunks, 3) + require.Len(t, db.partitions[1].stacktraceChunks, 3) require.NoError(t, db.Flush()) @@ -75,10 +75,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 0, Size: 10, - MappingName: 0x0, + Partition: 0x0, ChunkIndex: 0x0, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x2, StacktraceNodes: 0x4, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -87,10 +87,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 10, Size: 15, - MappingName: 0x0, + Partition: 0x0, ChunkIndex: 0x1, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x1, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -99,10 +99,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 25, Size: 15, - MappingName: 0x0, + Partition: 0x0, ChunkIndex: 0x2, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x3, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -111,10 +111,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 40, Size: 10, - MappingName: 0x1, + Partition: 0x1, ChunkIndex: 0x0, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x2, StacktraceNodes: 0x4, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -123,10 +123,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 50, Size: 15, - MappingName: 0x1, + Partition: 0x1, ChunkIndex: 0x1, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x1, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -135,10 +135,10 @@ func Test_Writer_IndexFile(t *testing.T) { { Offset: 65, Size: 15, - MappingName: 0x1, + Partition: 0x1, ChunkIndex: 0x2, ChunkEncoding: 0x1, - Stacktraces: 0x0, + Stacktraces: 0x3, StacktraceNodes: 0x5, StacktraceMaxDepth: 0x0, StacktraceMaxNodes: 0x7, @@ -146,7 +146,7 @@ func Test_Writer_IndexFile(t *testing.T) { }, }, }, - CRC: 0x5bbecabf, + CRC: 0x6418eaed, } assert.Equal(t, expected, idx) diff --git a/pkg/phlaredb/symdb/symdb.go b/pkg/phlaredb/symdb/symdb.go index 9c6e8ba3ca..38a640dd15 100644 --- a/pkg/phlaredb/symdb/symdb.go +++ b/pkg/phlaredb/symdb/symdb.go @@ -12,8 +12,8 @@ type SymDB struct { writer *Writer stats stats - m sync.RWMutex - mappings map[uint64]*inMemoryMapping + m sync.RWMutex + partitions map[uint64]*inMemoryPartition wg 
sync.WaitGroup
 	stop chan struct{}
@@ -32,7 +32,7 @@ const statsUpdateInterval = 10 * time.Second
 
 type stats struct {
 	memorySize atomic.Uint64
-	mappings   atomic.Uint32
+	partitions atomic.Uint32
 }
 
 func DefaultConfig() *Config {
@@ -57,27 +57,27 @@ func NewSymDB(c *Config) *SymDB {
 		c = DefaultConfig()
 	}
 	db := &SymDB{
-		config:   c,
-		writer:   NewWriter(c.Dir),
-		mappings: make(map[uint64]*inMemoryMapping),
-		stop:     make(chan struct{}),
+		config:     c,
+		writer:     NewWriter(c.Dir),
+		partitions: make(map[uint64]*inMemoryPartition),
+		stop:       make(chan struct{}),
 	}
 	db.wg.Add(1)
 	go db.updateStats()
 	return db
 }
 
-func (s *SymDB) MappingWriter(mappingName uint64) MappingWriter {
-	return s.mapping(mappingName)
+func (s *SymDB) SymbolsAppender(partition uint64) SymbolsAppender {
+	return s.partition(partition)
 }
 
-func (s *SymDB) MappingReader(mappingName uint64) (MappingReader, bool) {
-	return s.lookupMapping(mappingName)
+func (s *SymDB) SymbolsResolver(partition uint64) (SymbolsResolver, bool) {
+	return s.lookupPartition(partition)
 }
 
-func (s *SymDB) lookupMapping(mappingName uint64) (*inMemoryMapping, bool) {
+func (s *SymDB) lookupPartition(partition uint64) (*inMemoryPartition, bool) {
 	s.m.RLock()
-	p, ok := s.mappings[mappingName]
+	p, ok := s.partitions[partition]
 	if ok {
 		s.m.RUnlock()
 		return p, true
@@ -86,26 +86,26 @@ func (s *SymDB) lookupMapping(mappingName uint64) (*inMemoryMapping, bool) {
 	return nil, false
 }
 
-func (s *SymDB) mapping(mappingName uint64) *inMemoryMapping {
-	p, ok := s.lookupMapping(mappingName)
+func (s *SymDB) partition(partition uint64) *inMemoryPartition {
+	p, ok := s.lookupPartition(partition)
 	if ok {
 		return p
 	}
 	s.m.Lock()
-	if p, ok = s.mappings[mappingName]; ok {
+	if p, ok = s.partitions[partition]; ok {
 		s.m.Unlock()
 		return p
 	}
-	p = &inMemoryMapping{
-		name:               mappingName,
+	p = &inMemoryPartition{
+		name:               partition,
 		maxNodesPerChunk:   s.config.Stacktraces.MaxNodesPerChunk,
 		stacktraceHashToID: make(map[uint64]uint32, defaultStacktraceTreeSize/2),
 	}
 	p.stacktraceChunks = append(p.stacktraceChunks, &stacktraceChunk{
-		tree:    newStacktraceTree(defaultStacktraceTreeSize),
-		mapping: p,
+		tree:      newStacktraceTree(defaultStacktraceTreeSize),
+		partition: p,
 	})
-	s.mappings[mappingName] = p
+	s.partitions[partition] = p
 	s.m.Unlock()
 	return p
 }
@@ -114,9 +114,9 @@ func (s *SymDB) Flush() error {
 	close(s.stop)
 	s.wg.Wait()
 	s.m.RLock()
-	m := make([]*inMemoryMapping, len(s.mappings))
+	m := make([]*inMemoryPartition, len(s.partitions))
 	var i int
-	for _, v := range s.mappings {
+	for _, v := range s.partitions {
 		m[i] = v
 		i++
 	}
@@ -156,7 +156,7 @@ func (s *SymDB) updateStats() {
 			return
 		case <-t.C:
 			s.m.RLock()
-			s.stats.mappings.Store(uint32(len(s.mappings)))
+			s.stats.partitions.Store(uint32(len(s.partitions)))
 			s.stats.memorySize.Store(uint64(s.calculateMemoryFootprint()))
 			s.m.RUnlock()
 		}
@@ -165,7 +165,7 @@
 
 // calculateMemoryFootprint estimates the memory footprint.
 func (s *SymDB) calculateMemoryFootprint() (v int) {
-	for _, m := range s.mappings {
+	for _, m := range s.partitions {
 		m.stacktraceMutex.RLock()
 		v += len(m.stacktraceChunkHeaders) * stacktraceChunkHeaderSize
 		for _, c := range m.stacktraceChunks {
diff --git a/pkg/phlaredb/tsdb/index/index.go b/pkg/phlaredb/tsdb/index/index.go
index a1612b261e..5469252885 100644
--- a/pkg/phlaredb/tsdb/index/index.go
+++ b/pkg/phlaredb/tsdb/index/index.go
@@ -1334,9 +1334,21 @@ func (r *Reader) Version() int {
 
 // FileInfo returns some general stats about the underlying file.
 func (r *Reader) FileInfo() block.File {
+	// NB: counting the series requires a full scan of the postings
+	// list; FileInfo is presumed to be called rarely (at block flush).
+	k, v := AllPostingsKey()
+	postings, err := r.Postings(k, nil, v)
+	if err != nil {
+		panic(err)
+	}
+	var numSeries uint64
+	for postings.Next() {
+		numSeries++
+	}
 	return block.File{
 		RelPath:   block.IndexFilename,
 		SizeBytes: uint64(r.Size()),
+		TSDB: &block.TSDBFile{
+			NumSeries: numSeries,
+		},
 	}
 }
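
Editor's note: taken together, Load, WriteStats and the resolver API suggest the following compaction-side flow. This is a hedged sketch under stated assumptions: resolveAll is a hypothetical helper, the inserter is caller-supplied, and the IDs are assumed to be sorted in ascending order, as SplitStacktraces expects.

package compactexample // Hypothetical consumer of the symdb reader.

import (
	"context"

	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
)

func resolveAll(ctx context.Context, r *symdb.Reader, partition uint64, ids []uint32, dst symdb.StacktraceInserter) error {
	// Load the whole stacktraces file with one sequential range read,
	// instead of fetching chunks on demand; loaded chunks are retained
	// across resolver Release calls (see the `loaded` flag above).
	if err := r.Load(ctx); err != nil {
		return err
	}
	sr, ok := r.SymbolsResolver(partition)
	if !ok {
		return nil // Unknown partition: nothing to resolve.
	}
	var stats symdb.Stats
	sr.WriteStats(&stats)
	_ = stats.MaxStacktraceID // Could pre-size ID remapping tables.
	res := sr.StacktraceResolver()
	defer res.Release()
	return res.ResolveStacktraces(ctx, dst, ids)
}
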