Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[weekly-r317] Backport Prometheus 14144, don't merge. #813

Draft
wants to merge 1 commit into
base: weekly-r317-lbl-values-slices-c2b613081
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type IndexReader interface {
// avoiding same calculations twice, however this implementation may lead to a worse performance when called once.
PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error)

// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings

// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
Expand Down Expand Up @@ -554,6 +558,10 @@ func (r blockIndexReader) PostingsForMatchers(ctx context.Context, concurrent bo
return r.ir.PostingsForMatchers(ctx, concurrent, ms...)
}

func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
return r.ir.PostingsForAllLabelValues(ctx, name)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}
Expand Down
4 changes: 4 additions & 0 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (h *headIndexReader) PostingsForMatchers(ctx context.Context, concurrent bo
return h.head.pfmc.PostingsForMatchers(ctx, h, concurrent, ms...)
}

func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
return h.head.postings.PostingsForAllLabelValues(ctx, name)
}

func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

Expand Down
23 changes: 19 additions & 4 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
}

func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
return r.postingsForLabelMatching(ctx, name, match)
}

func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
return r.postingsForLabelMatching(ctx, name, nil)
}

// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise.
func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
if r.version == FormatV1 {
return r.postingsForLabelMatchingV1(ctx, name, match)
}
Expand All @@ -1803,11 +1812,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc
return EmptyPostings()
}

postingsEstimate := 0
if match == nil {
// The caller wants all postings for name.
postingsEstimate = len(e) * symbolFactor
}

lastVal := e[len(e)-1].value
var its []Postings
its := make([]Postings, 0, postingsEstimate)
if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
if match(val) {
// We want this postings iterator since the value is a match
if match == nil || match(val) {
// We want this postings iterator since the value is a match.
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
if err != nil {
Expand Down Expand Up @@ -1836,7 +1851,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
return ErrPostings(ctx.Err())
}
count++
if !match(val) {
if match != nil && !match(val) {
continue
}

Expand Down
46 changes: 46 additions & 0 deletions tsdb/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,52 @@ func TestChunksTimeOrdering(t *testing.T) {
require.NoError(t, idx.Close())
}

func TestReader_PostingsForLabelMatching(t *testing.T) {
const seriesCount = 9
var input indexWriterSeriesSlice
for i := 1; i <= seriesCount; i++ {
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("__name__", strconv.Itoa(i)),
chunks: []chunks.Meta{
{Ref: 1, MinTime: 0, MaxTime: 10},
},
})
}
ir, _, _ := createFileReader(context.Background(), t, input)

p := ir.PostingsForLabelMatching(context.Background(), "__name__", func(v string) bool {
iv, err := strconv.Atoi(v)
if err != nil {
panic(err)
}
return iv%2 == 0
})
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, refs)
}

func TestReader_PostingsForAllLabelValues(t *testing.T) {
const seriesCount = 9
var input indexWriterSeriesSlice
for i := 1; i <= seriesCount; i++ {
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("__name__", strconv.Itoa(i)),
chunks: []chunks.Meta{
{Ref: 1, MinTime: 0, MaxTime: 10},
},
})
}
ir, _, _ := createFileReader(context.Background(), t, input)

p := ir.PostingsForAllLabelValues(context.Background(), "__name__")
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, refs)
}

func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
const seriesCount = 1000
var input indexWriterSeriesSlice
Expand Down
16 changes: 16 additions & 0 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,22 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
return Merge(ctx, its...)
}

func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
p.mtx.RLock()

e := p.m[name]
its := make([]Postings, 0, len(e))
for _, refs := range e {
if len(refs) > 0 {
its = append(its, NewListPostings(refs))
}
}

// Let the mutex go before merging.
p.mtx.RUnlock()
return Merge(ctx, its...)
}

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
for p.Next() {
Expand Down
15 changes: 15 additions & 0 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
require.Equal(t, []storage.SeriesRef{2, 4}, refs)
}

func TestMemPostings_PostingsForAllLabelValues(t *testing.T) {
mp := NewMemPostings()
mp.Add(1, labels.FromStrings("foo", "1"))
mp.Add(2, labels.FromStrings("foo", "2"))
mp.Add(3, labels.FromStrings("foo", "3"))
mp.Add(4, labels.FromStrings("foo", "4"))

p := mp.PostingsForAllLabelValues(context.Background(), "foo")
require.NoError(t, p.Err())
refs, err := ExpandPostings(p)
require.NoError(t, err)
// All postings for the label should be returned.
require.Equal(t, []storage.SeriesRef{1, 2, 3, 4}, refs)
}

func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations
Expand Down
4 changes: 4 additions & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context
return index.ErrPostings(errors.New("not supported"))
}

func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings {
return index.ErrPostings(errors.New("not supported"))
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above.
return p
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type IndexPostingsReader interface {
// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings

// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,10 @@ func BenchmarkPostingsForMatchersCache_ConcurrencyOnHighEvictionRate(b *testing.

type indexForPostingsMock struct{}

func (idx indexForPostingsMock) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
panic("implement me")
}

func (idx indexForPostingsMock) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
panic("implement me")
}
Expand Down
27 changes: 7 additions & 20 deletions tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,29 +358,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *l
return ix.Postings(ctx, m.Name, m.Value)
}

vals, err := ix.LabelValues(ctx, m.Name)
if err != nil {
return nil, err
}

res := vals[:0]
// If the match before inversion was !="" or !~"", we just want all the values.
// If the matcher being inverted is =~"" or ="", we just want all the values.
if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) {
res = vals
} else {
count := 1
for _, val := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if !m.Matches(val) {
res = append(res, val)
}
}
it := ix.PostingsForAllLabelValues(ctx, m.Name)
return it, it.Err()
}

return ix.Postings(ctx, m.Name, res...)
it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool {
return !m.Matches(s)
})
return it, it.Err()
}

const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series.
Expand Down
29 changes: 18 additions & 11 deletions tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2401,6 +2401,16 @@ func matches(ms []*labels.Matcher, lbls labels.Labels) bool {
return true
}

func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
var res []index.Postings
for l, srs := range m.postings {
if l.Name == name {
res = append(res, index.NewListPostings(srs))
}
}
return index.Merge(ctx, res...)
}

func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
out := make([]storage.SeriesRef, 0, 128)

Expand Down Expand Up @@ -3335,6 +3345,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func
return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
}

func (m mockMatcherIndex) PostingsForAllLabelValues(context.Context, string) index.Postings {
return index.ErrPostings(errors.New("PostingsForAllLabelValues called"))
}

func TestPostingsForMatcher(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -3863,17 +3877,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
ir := mockReaderOfLabels{}

failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))

require.Error(t, err)
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

type mockReaderOfLabels struct{}

const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10
Expand Down Expand Up @@ -3906,6 +3909,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu
panic("PostingsForLabelMatching called")
}

func (m mockReaderOfLabels) PostingsForAllLabelValues(context.Context, string) index.Postings {
panic("PostingsForAllLabelValues called")
}

func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) {
panic("Postings called")
}
Expand Down
Loading