Skip to content

Commit

Permalink
refactor metadata label compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Dec 9, 2024
1 parent 3caa374 commit a8dfdb7
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 26 deletions.
34 changes: 10 additions & 24 deletions pkg/experiment/block/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sort"
"strings"
"sync"
"unsafe"

"github.com/grafana/dskit/multierror"
"github.com/parquet-go/parquet-go"
Expand Down Expand Up @@ -120,7 +119,12 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
for _, s := range obj.meta.Datasets {
tm, ok := m[obj.meta.StringTable[s.Tenant]]
if !ok {
tm = newBlockCompaction(g.ULID().String(), obj.meta.StringTable[s.Tenant], r.meta.Shard, level)
tm = newBlockCompaction(
g.ULID().String(),
obj.meta.StringTable[s.Tenant],
r.meta.Shard,
level,
)
m[obj.meta.StringTable[s.Tenant]] = tm
}
// Bind objects to datasets.
Expand Down Expand Up @@ -217,7 +221,7 @@ type datasetCompaction struct {
name string
parent *CompactionPlan
meta *metastorev1.Dataset
labels map[string]struct{}
labels *LabelBuilder
path string // Set at open.

datasets []*Dataset
Expand All @@ -237,7 +241,7 @@ func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompac
return &datasetCompaction{
parent: b,
name: b.strings.Strings[name],
labels: make(map[string]struct{}),
labels: NewLabelBuilder(b.strings),
meta: &metastorev1.Dataset{
Tenant: tenant,
Name: name,
Expand All @@ -260,26 +264,7 @@ func (m *datasetCompaction) append(s *Dataset) {
if s.meta.MaxTime > m.meta.MaxTime {
m.meta.MaxTime = s.meta.MaxTime
}
m.addLabels(s)
}

func (m *datasetCompaction) addLabels(s *Dataset) {
var skip int
for i, v := range s.meta.Labels {
if i == skip {
skip += int(v)*2 + 1
continue
}
s.meta.Labels[i] = m.parent.strings.Put(s.obj.meta.StringTable[v])
}
// We only copy the labels if this is the first time we see it.
k := *(*string)(unsafe.Pointer(&s.meta.Labels))
// The fact that we assume that the order of labels
// is the same across all datasets is a precondition.
if _, ok := m.labels[k]; !ok {
m.labels[string(s.meta.Labels)] = struct{}{}
m.meta.Labels = append(m.meta.Labels, s.meta.Labels...)
}
m.labels.Put(s.meta.Labels, s.obj.meta.StringTable)
}

func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error) {
Expand Down Expand Up @@ -416,6 +401,7 @@ func (m *datasetCompaction) writeTo(w *Writer) (err error) {
return err
}
m.meta.Size = w.Offset() - off
m.meta.Labels = m.labels.Build()
return nil
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/experiment/block/metadata_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/grafana/pyroscope/pkg/model"
)

// TODO(kolesnikovae): LabelBuilder pool.

// LabelBuilder accumulates encoded label sets whose strings are stored as
// references into a MetadataStrings table.
type LabelBuilder struct {
// strings is the string table that the stored label references point into.
strings *MetadataStrings
// labels holds the accumulated label sets in flat form: each set is a
// pair count followed by that many key/value string references.
labels []int32
// NOTE(review): constant and keys are not touched by Put; presumably they
// back WithConstantPairs and CreateLabels — confirm against those methods.
constant []int32
keys []int32
// seen records the label sets already appended by Put so that duplicates
// are skipped. It is allocated lazily on the first non-empty Put.
seen map[string]struct{}
}

func NewLabelBuilder(strings *MetadataStrings) *LabelBuilder {
Expand Down Expand Up @@ -59,6 +62,48 @@ func (lb *LabelBuilder) CreateLabels(values ...string) bool {
return true
}

// Put merges the encoded label sets x into the builder.
//
// x is a flat sequence of length-prefixed label sets: every set starts with
// its number of pairs, followed by that many key/value string references.
// The references index into strings (the string table of the source). They
// are rewritten in place with the values returned by lb.strings.Put, so x
// is modified by the call and must not be passed to Put a second time.
// Label sets that the builder has already recorded are skipped.
func (lb *LabelBuilder) Put(x []int32, strings []string) {
	if len(x) == 0 {
		return
	}
	if lb.seen == nil {
		lb.seen = make(map[string]struct{})
	}
	// prefix is the index of the next length prefix; every other element
	// is a string reference that has to be translated.
	prefix := 0
	for i := range x {
		if i != prefix {
			x[i] = lb.strings.Put(strings[x[i]])
			continue
		}
		prefix += 2*int(x[i]) + 1
	}
	// Reserve room up front: at most len(x) elements are appended below.
	lb.labels = slices.Grow(lb.labels, len(x))
	for it := LabelPairs(x); it.Next(); {
		lb.putPairs(it.At())
	}
}

// putPairs appends the label set p to lb.labels, prefixed with its number
// of pairs, unless an identical set has been recorded before.
//
// p is a flat sequence of key/value string references without the length
// prefix. An empty p is ignored.
//
// We only copy the labels if this is the first time we see them. The order
// of labels is assumed to be the same across all datasets (a precondition),
// therefore the raw contents of p can be used as the deduplication key.
func (lb *LabelBuilder) putPairs(p []int32) {
	if len(p) == 0 {
		return
	}
	if lb.seen == nil {
		lb.seen = make(map[string]struct{})
	}
	// View p as raw bytes. The key has to span every element, that is
	// len(p)*sizeof(int32) bytes. Reinterpreting the slice header as a
	// string header keeps the element count as the byte length, so only
	// the first quarter of the data would be compared and distinct label
	// sets sharing a prefix would be dropped as duplicates.
	b := unsafe.Slice((*byte)(unsafe.Pointer(&p[0])), len(p)*int(unsafe.Sizeof(p[0])))
	// The conversion inside a map index expression does not allocate.
	if _, ok := lb.seen[string(b)]; ok {
		return
	}
	// Store an owned copy of the key: p aliases the caller's memory, and
	// lb.labels may be reallocated or reused later on.
	lb.seen[string(b)] = struct{}{}
	lb.labels = append(lb.labels, int32(len(p)/2))
	lb.labels = append(lb.labels, p...)
}

func (lb *LabelBuilder) Build() []int32 {
c := make([]int32, len(lb.labels))
copy(c, lb.labels)
Expand Down
20 changes: 19 additions & 1 deletion pkg/experiment/block/metadata_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/grafana/pyroscope/pkg/model"
)

func TestLabelBuilder_Build(t *testing.T) {
func TestLabelBuilder_CreateLabels(t *testing.T) {
strings := NewMetadataStringTable()
b := NewLabelBuilder(strings).
WithConstantPairs("foo", "0").
Expand Down Expand Up @@ -60,6 +60,24 @@ func TestLabelBuilder_Reuse(t *testing.T) {
}, labelStrings(b.Build(), strings))
}

// TestLabelBuilder_Put verifies that Put re-maps string references into the
// builder's string table and that repeated label sets are stored only once.
func TestLabelBuilder_Put(t *testing.T) {
	table := NewMetadataStringTable()
	builder := NewLabelBuilder(table)

	// Every input owns its slice: Put rewrites the references in place.
	inputs := []struct {
		labels  []int32
		strings []string
	}{
		// a=b, a=b; a=b, a=b;
		{[]int32{2, 1, 2, 1, 2, 2, 1, 2, 1, 2}, []string{"", "a", "b"}},
		{[]int32{2, 1, 2, 1, 2, 2, 1, 2, 1, 2}, []string{"", "a", "b"}},
		// c=d, c=d; c=d, c=d;
		{[]int32{2, 1, 2, 1, 2, 2, 1, 2, 1, 2}, []string{"", "c", "d"}},
		// c=d, c=d;
		{[]int32{2, 1, 2, 1, 2}, []string{"", "c", "d"}},
	}
	for _, in := range inputs {
		builder.Put(in.labels, in.strings)
	}

	expected := []int32{
		2, 1, 2, 1, 2,
		2, 3, 4, 3, 4,
	}
	assert.Equal(t, expected, builder.Build())
}

func labelStrings(v []int32, s *MetadataStrings) []string {
var ls []string
pairs := LabelPairs(v)
Expand Down
9 changes: 8 additions & 1 deletion pkg/experiment/metastore/index/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,19 @@ func datasetMatches(
return false
}
pairs := block.LabelPairs(ds.Labels)
var matches bool
for pairs.Next() {
if m.Matches(pairs.At()) {
matches = true
}
// If no labels are specified, we can return early.
// Otherwise, we need to scan all the datasets to
// collect the labels.
if matches && len(q.labels) == 0 {
return true
}
}
return false
return matches
}

func newMetadataLabelQuerier(tx *bbolt.Tx, q *metadataQuery) *metadataLabelQuerier {
Expand Down

0 comments on commit a8dfdb7

Please sign in to comment.