Skip to content

Commit

Permalink
[PERF] TSDB: Optimize inverse matching (#14144)
Browse files Browse the repository at this point in the history
Simple follow-up to #13620. Modify `tsdb.PostingsForMatchers` to use the optimized tsdb.IndexReader.PostingsForLabelMatching method also for inverse matching.

Introduce method `PostingsForAllLabelValues`, to avoid changing the existing method.

The performance is much improved for a subset of the cases; there are up to
~60% CPU gains and ~12.5% reduction in memory usage.

Remove `TestReader_InversePostingsForMatcherHonorsContextCancel` since
`inversePostingsForMatcher` only passes `ctx` to `IndexReader` implementations now.

Signed-off-by: Arve Knudsen <[email protected]>
Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
aknuds1 authored and colega committed Jan 3, 2025
1 parent c2b6130 commit 00ed8b5
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 35 deletions.
8 changes: 8 additions & 0 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type IndexReader interface {
// avoiding same calculations twice, however this implementation may lead to a worse performance when called once.
PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error)

// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings

// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
Expand Down Expand Up @@ -554,6 +558,10 @@ func (r blockIndexReader) PostingsForMatchers(ctx context.Context, concurrent bo
return r.ir.PostingsForMatchers(ctx, concurrent, ms...)
}

func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
return r.ir.PostingsForAllLabelValues(ctx, name)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}
Expand Down
4 changes: 4 additions & 0 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (h *headIndexReader) PostingsForMatchers(ctx context.Context, concurrent bo
return h.head.pfmc.PostingsForMatchers(ctx, h, concurrent, ms...)
}

func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
return h.head.postings.PostingsForAllLabelValues(ctx, name)
}

func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

Expand Down
23 changes: 19 additions & 4 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
}

func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
return r.postingsForLabelMatching(ctx, name, match)
}

func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
return r.postingsForLabelMatching(ctx, name, nil)
}

// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise.
func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
if r.version == FormatV1 {
return r.postingsForLabelMatchingV1(ctx, name, match)
}
Expand All @@ -1803,11 +1812,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc
return EmptyPostings()
}

postingsEstimate := 0
if match == nil {
// The caller wants all postings for name.
postingsEstimate = len(e) * symbolFactor
}

lastVal := e[len(e)-1].value
var its []Postings
its := make([]Postings, 0, postingsEstimate)
if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
if match(val) {
// We want this postings iterator since the value is a match
if match == nil || match(val) {
// We want this postings iterator since the value is a match.
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
if err != nil {
Expand Down Expand Up @@ -1836,7 +1851,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
return ErrPostings(ctx.Err())
}
count++
if !match(val) {
if match != nil && !match(val) {
continue
}

Expand Down
46 changes: 46 additions & 0 deletions tsdb/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,52 @@ func TestChunksTimeOrdering(t *testing.T) {
require.NoError(t, idx.Close())
}

func TestReader_PostingsForLabelMatching(t *testing.T) {
const seriesCount = 9
var input indexWriterSeriesSlice
for i := 1; i <= seriesCount; i++ {
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("__name__", strconv.Itoa(i)),
chunks: []chunks.Meta{
{Ref: 1, MinTime: 0, MaxTime: 10},
},
})
}
ir, _, _ := createFileReader(context.Background(), t, input)

p := ir.PostingsForLabelMatching(context.Background(), "__name__", func(v string) bool {
iv, err := strconv.Atoi(v)
if err != nil {
panic(err)
}
return iv%2 == 0
})
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, refs)
}

func TestReader_PostingsForAllLabelValues(t *testing.T) {
const seriesCount = 9
var input indexWriterSeriesSlice
for i := 1; i <= seriesCount; i++ {
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("__name__", strconv.Itoa(i)),
chunks: []chunks.Meta{
{Ref: 1, MinTime: 0, MaxTime: 10},
},
})
}
ir, _, _ := createFileReader(context.Background(), t, input)

p := ir.PostingsForAllLabelValues(context.Background(), "__name__")
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, refs)
}

func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
const seriesCount = 1000
var input indexWriterSeriesSlice
Expand Down
16 changes: 16 additions & 0 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,22 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
return Merge(ctx, its...)
}

func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
p.mtx.RLock()

e := p.m[name]
its := make([]Postings, 0, len(e))
for _, refs := range e {
if len(refs) > 0 {
its = append(its, NewListPostings(refs))
}
}

// Let the mutex go before merging.
p.mtx.RUnlock()
return Merge(ctx, its...)
}

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
for p.Next() {
Expand Down
15 changes: 15 additions & 0 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
require.Equal(t, []storage.SeriesRef{2, 4}, refs)
}

func TestMemPostings_PostingsForAllLabelValues(t *testing.T) {
mp := NewMemPostings()
mp.Add(1, labels.FromStrings("foo", "1"))
mp.Add(2, labels.FromStrings("foo", "2"))
mp.Add(3, labels.FromStrings("foo", "3"))
mp.Add(4, labels.FromStrings("foo", "4"))

p := mp.PostingsForAllLabelValues(context.Background(), "foo")
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
// All postings for the label should be returned.
require.Equal(t, []storage.SeriesRef{1, 2, 3, 4}, refs)
}

func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations
Expand Down
4 changes: 4 additions & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context
return index.ErrPostings(errors.New("not supported"))
}

func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings {
return index.ErrPostings(errors.New("not supported"))
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above.
return p
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type IndexPostingsReader interface {
// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings

// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,10 @@ func BenchmarkPostingsForMatchersCache_ConcurrencyOnHighEvictionRate(b *testing.

type indexForPostingsMock struct{}

func (idx indexForPostingsMock) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
panic("implement me")
}

func (idx indexForPostingsMock) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
panic("implement me")
}
Expand Down
27 changes: 7 additions & 20 deletions tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,29 +358,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *l
return ix.Postings(ctx, m.Name, m.Value)
}

vals, err := ix.LabelValues(ctx, m.Name)
if err != nil {
return nil, err
}

res := vals[:0]
// If the match before inversion was !="" or !~"", we just want all the values.
// If the matcher being inverted is =~"" or ="", we just want all the values.
if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) {
res = vals
} else {
count := 1
for _, val := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if !m.Matches(val) {
res = append(res, val)
}
}
it := ix.PostingsForAllLabelValues(ctx, m.Name)
return it, it.Err()
}

return ix.Postings(ctx, m.Name, res...)
it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool {
return !m.Matches(s)
})
return it, it.Err()
}

const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series.
Expand Down
29 changes: 18 additions & 11 deletions tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2401,6 +2401,16 @@ func matches(ms []*labels.Matcher, lbls labels.Labels) bool {
return true
}

func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
var res []index.Postings
for l, srs := range m.postings {
if l.Name == name {
res = append(res, index.NewListPostings(srs))
}
}
return index.Merge(ctx, res...)
}

func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
out := make([]storage.SeriesRef, 0, 128)

Expand Down Expand Up @@ -3335,6 +3345,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func
return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
}

func (m mockMatcherIndex) PostingsForAllLabelValues(context.Context, string) index.Postings {
return index.ErrPostings(errors.New("PostingsForAllLabelValues called"))
}

func TestPostingsForMatcher(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -3863,17 +3877,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
ir := mockReaderOfLabels{}

failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))

require.Error(t, err)
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

type mockReaderOfLabels struct{}

const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10
Expand Down Expand Up @@ -3906,6 +3909,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu
panic("PostingsForLabelMatching called")
}

func (m mockReaderOfLabels) PostingsForAllLabelValues(context.Context, string) index.Postings {
panic("PostingsForAllLabelValues called")
}

func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) {
panic("Postings called")
}
Expand Down

0 comments on commit 00ed8b5

Please sign in to comment.