diff --git a/tsdb/block.go b/tsdb/block.go index 20d66763c..e238af377 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -88,6 +88,10 @@ type IndexReader interface { // avoiding same calculations twice, however this implementation may lead to a worse performance when called once. PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) + // PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name. + // If no postings are found with the label in question, an empty iterator is returned. + PostingsForAllLabelValues(ctx context.Context, name string) index.Postings + // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings @@ -554,6 +558,10 @@ func (r blockIndexReader) PostingsForMatchers(ctx context.Context, concurrent bo return r.ir.PostingsForMatchers(ctx, concurrent, ms...) } +func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + return r.ir.PostingsForAllLabelValues(ctx, name) +} + func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { return r.ir.SortedPostings(p) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f85120a97..bc0ade43b 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -129,6 +129,10 @@ func (h *headIndexReader) PostingsForMatchers(ctx context.Context, concurrent bo return h.head.pfmc.PostingsForMatchers(ctx, h, concurrent, ms...) } +func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + return h.head.postings.PostingsForAllLabelValues(ctx, name) +} + func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { series := make([]*memSeries, 0, 128) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 642e4f4a2..5f407d73d 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1794,6 +1794,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P } func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { + return r.postingsForLabelMatching(ctx, name, match) +} + +func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings { + return r.postingsForLabelMatching(ctx, name, nil) +} + +// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise. +func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { if r.version == FormatV1 { return r.postingsForLabelMatchingV1(ctx, name, match) } @@ -1803,11 +1812,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc return EmptyPostings() } + postingsEstimate := 0 + if match == nil { + // The caller wants all postings for name. + postingsEstimate = len(e) * symbolFactor + } + lastVal := e[len(e)-1].value - var its []Postings + its := make([]Postings, 0, postingsEstimate) if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) { - if match(val) { - // We want this postings iterator since the value is a match + if match == nil || match(val) { + // We want this postings iterator since the value is a match. postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) _, p, err := r.dec.PostingsFromDecbuf(postingsDec) if err != nil { @@ -1836,7 +1851,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma return ErrPostings(ctx.Err()) } count++ - if !match(val) { + if match != nil && !match(val) { continue } diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 9647a3d5f..94e91eb1d 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -616,6 +616,52 @@ func TestChunksTimeOrdering(t *testing.T) { require.NoError(t, idx.Close()) } +func TestReader_PostingsForLabelMatching(t *testing.T) { + const seriesCount = 9 + var input indexWriterSeriesSlice + for i := 1; i <= seriesCount; i++ { + input = append(input, &indexWriterSeries{ + labels: labels.FromStrings("__name__", strconv.Itoa(i)), + chunks: []chunks.Meta{ + {Ref: 1, MinTime: 0, MaxTime: 10}, + }, + }) + } + ir, _, _ := createFileReader(context.Background(), t, input) + + p := ir.PostingsForLabelMatching(context.Background(), "__name__", func(v string) bool { + iv, err := strconv.Atoi(v) + if err != nil { + panic(err) + } + return iv%2 == 0 + }) + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, refs) +} + +func TestReader_PostingsForAllLabelValues(t *testing.T) { + const seriesCount = 9 + var input indexWriterSeriesSlice + for i := 1; i <= seriesCount; i++ { + input = append(input, &indexWriterSeries{ + labels: labels.FromStrings("__name__", strconv.Itoa(i)), + chunks: []chunks.Meta{ + {Ref: 1, MinTime: 0, MaxTime: 10}, + }, + }) + } + ir, _, _ := createFileReader(context.Background(), t, input) + + p := ir.PostingsForAllLabelValues(context.Background(), "__name__") + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, refs) +} + func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { const seriesCount = 1000 var input indexWriterSeriesSlice diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 38aa3fb34..88bc1c913 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -486,6 +486,43 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, return Merge(ctx, its...) } +func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings { + p.mtx.RLock() + + e := p.m[name] + its := make([]Postings, 0, len(e)) + for _, refs := range e { + if len(refs) > 0 { + its = append(its, NewListPostings(refs)) + } + } + + // Let the mutex go before merging. + p.mtx.RUnlock() + return Merge(ctx, its...) +} + +// labelValues returns a slice of label values for the given label name. +// It will take the read lock. +func (p *MemPostings) labelValues(name string) []string { + p.mtx.RLock() + defer p.mtx.RUnlock() + + e := p.m[name] + if len(e) == 0 { + return nil + } + + vals := make([]string, 0, len(e)) + for v, srs := range e { + if len(srs) > 0 { + vals = append(vals, v) + } + } + + return vals +} + // ExpandPostings returns the postings expanded as a slice. func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) { for p.Next() { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 93fba7f06..cfa176f79 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1571,6 +1571,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) { require.Equal(t, []storage.SeriesRef{2, 4}, refs) } +func TestMemPostings_PostingsForAllLabelValues(t *testing.T) { + mp := NewMemPostings() + mp.Add(1, labels.FromStrings("foo", "1")) + mp.Add(2, labels.FromStrings("foo", "2")) + mp.Add(3, labels.FromStrings("foo", "3")) + mp.Add(4, labels.FromStrings("foo", "4")) + + p := mp.PostingsForAllLabelValues(context.Background(), "foo") + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + // All postings for the label should be returned. + require.Equal(t, []storage.SeriesRef{1, 2, 3, 4}, refs) +} + func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { memP := NewMemPostings() seriesCount := 10 * checkContextEveryNIterations diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 1f5b7c951..745cd5d5f 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -456,6 +456,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context return index.ErrPostings(errors.New("not supported")) } +func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings { + return index.ErrPostings(errors.New("not supported")) +} + func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings { // This will already be sorted from the Postings() call above. return p diff --git a/tsdb/querier.go b/tsdb/querier.go index 6005ad83c..64c7fac74 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -358,29 +358,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *l return ix.Postings(ctx, m.Name, m.Value) } - vals, err := ix.LabelValues(ctx, m.Name) - if err != nil { - return nil, err - } - - res := vals[:0] - // If the match before inversion was !="" or !~"", we just want all the values. + // If the matcher being inverted is =~"" or ="", we just want all the values. if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) { - res = vals - } else { - count := 1 - for _, val := range vals { - if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { - return nil, ctx.Err() - } - count++ - if !m.Matches(val) { - res = append(res, val) - } - } + it := ix.PostingsForAllLabelValues(ctx, m.Name) + return it, it.Err() } - return ix.Postings(ctx, m.Name, res...) + it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool { + return !m.Matches(s) + }) + return it, it.Err() } const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series. diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index f9d346077..8cc2d29f5 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2401,6 +2401,16 @@ func matches(ms []*labels.Matcher, lbls labels.Labels) bool { return true } +func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + var res []index.Postings + for l, srs := range m.postings { + if l.Name == name { + res = append(res, index.NewListPostings(srs)) + } + } + return index.Merge(ctx, res...) +} + func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { out := make([]storage.SeriesRef, 0, 128) @@ -3335,6 +3345,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called")) } +func (m mockMatcherIndex) PostingsForAllLabelValues(context.Context, string) index.Postings { + return index.ErrPostings(errors.New("PostingsForAllLabelValues called")) +} + func TestPostingsForMatcher(t *testing.T) { ctx := context.Background() @@ -3863,17 +3877,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. } -func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) { - ir := mockReaderOfLabels{} - - failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations) - ctx := &testutil.MockContextErrAfter{FailAfter: failAfter} - _, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - - require.Error(t, err) - require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. -} - type mockReaderOfLabels struct{} const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10 @@ -3906,6 +3909,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu panic("PostingsForLabelMatching called") } +func (m mockReaderOfLabels) PostingsForAllLabelValues(context.Context, string) index.Postings { + panic("PostingsForAllLabelValues called") +} + func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) { panic("Postings called") }