Skip to content

Commit

Permalink
implement metadata label querier
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Dec 9, 2024
1 parent 08c2a17 commit fc78a67
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 170 deletions.
108 changes: 24 additions & 84 deletions pkg/experiment/metastore/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ import (
"fmt"
"math"
"slices"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"go.etcd.io/bbolt"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
"github.com/grafana/pyroscope/pkg/experiment/metastore/index/store"
"github.com/grafana/pyroscope/pkg/iter"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
)

const (
Expand Down Expand Up @@ -428,44 +430,41 @@ func (i *Index) GetTenantStats(tenant string) *metastorev1.TenantStats {
return stats
}

// TODO(kolesnikovae): We query meta with the mutex held, which
// will cause contention and latency issues. Fix it once we make
// locks more granular (partition-tenant-shard level).

func (i *Index) QueryMetadata(tx *bbolt.Tx, query MetadataQuery) iter.Iterator[*metastorev1.BlockMeta] {
q, err := newMetadataQuery(i, query)
if err != nil {
return iter.NewErrIterator[*metastorev1.BlockMeta](err)
}
// Currently, we only inspect the service name label.
// We could extend this to match any labels of a dataset.
q.matchers = slices.DeleteFunc(q.matchers, func(m *labels.Matcher) bool {
return m.Name != phlaremodel.LabelNameServiceName
})
i.mu.Lock()
defer i.mu.Unlock()
// TODO(kolesnikovae): We collect blocks with the mutex held, which
// will cause contention and latency issues. Fix it once we make
// locks more granular (partition-tenant-shard level).
metas, err := iter.Slice[*metastorev1.BlockMeta](q.iterator(tx))
metas, err := iter.Slice[*metastorev1.BlockMeta](newBlockMetadataIterator(tx, q))
i.mu.Unlock()
if err != nil {
return iter.NewErrIterator[*metastorev1.BlockMeta](err)
}
return iter.NewSliceIterator(metas)
}

func (i *Index) shardIterator(tx *bbolt.Tx, startTime, endTime time.Time, tenants ...string) iter.Iterator[*indexShard] {
startTime = startTime.Add(-i.config.QueryLookaroundPeriod)
endTime = endTime.Add(i.config.QueryLookaroundPeriod)
si := shardIterator{
tx: tx,
partitions: make([]*store.Partition, 0, len(i.partitions)),
tenants: tenants,
index: i,
func (i *Index) QueryMetadataLabels(tx *bbolt.Tx, query MetadataLabelQuery) ([]*typesv1.Labels, error) {
q, err := newMetadataQuery(i, query.MetadataQuery)
if err != nil {
return nil, err
}
for _, p := range i.partitions {
if !p.Overlaps(startTime, endTime) {
continue
}
for _, t := range si.tenants {
if p.HasTenant(t) {
si.partitions = append(si.partitions, p)
break
}
}
i.mu.Lock()
r, err := newMetadataLabelQuerier(tx, q).queryLabels()
i.mu.Unlock()
if err != nil {
return nil, err
}
return &si
return r.Labels(), nil
}

func newIndexPartition(p *store.Partition, tenant string) *indexPartition {
Expand Down Expand Up @@ -518,62 +517,3 @@ func (s *indexShard) getBlock(blockID string) *metastorev1.BlockMeta {
s.TenantShard.StringTable.Export(mdCopy)
return mdCopy
}

type shardIterator struct {
tx *bbolt.Tx
index *Index
tenants []string
partitions []*store.Partition
shards []*indexShard
cur int
err error
}

func (si *shardIterator) Close() error { return nil }

func (si *shardIterator) Err() error { return si.err }

func (si *shardIterator) At() *indexShard { return si.shards[si.cur] }

func (si *shardIterator) Next() bool {
if n := si.cur + 1; n < len(si.shards) {
si.cur = n
return true
}
si.cur = 0
si.shards = si.shards[:0]
for len(si.shards) == 0 && len(si.partitions) > 0 {
si.loadShards(si.partitions[0])
si.partitions = si.partitions[1:]
}
return si.cur < len(si.shards)
}

func (si *shardIterator) loadShards(p *store.Partition) {
for _, t := range si.tenants {
shards := p.TenantShards[t]
if shards == nil {
continue
}
for s := range shards {
shard, err := si.index.getOrLoadTenantShard(si.tx, p, t, s)
if err != nil {
si.err = err
return
}
if shard != nil {
si.shards = append(si.shards, shard)
}
}
}
slices.SortFunc(si.shards, compareShards)
si.shards = slices.Compact(si.shards)
}

func compareShards(a, b *indexShard) int {
cmp := strings.Compare(a.Tenant, b.Tenant)
if cmp == 0 {
return int(a.Shard) - int(b.Shard)
}
return cmp
}
Loading

0 comments on commit fc78a67

Please sign in to comment.