From 1dc6d0c01406f44d472a6d954ead41ff0f01e18f Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 18 Aug 2023 14:30:05 +0200 Subject: [PATCH] Optimize storage.LabelQuerier.LabelValues implementations Signed-off-by: Arve Knudsen --- model/labels/matcher.go | 13 ++- tsdb/block.go | 10 ++ tsdb/head_read.go | 4 + tsdb/index/index.go | 107 ++++++++++++++++++--- tsdb/index/postings.go | 27 +++++- tsdb/ooo_head_read.go | 8 ++ tsdb/postings_for_matchers_cache.go | 6 ++ tsdb/postings_for_matchers_cache_test.go | 4 + tsdb/querier.go | 114 ++++------------------- tsdb/querier_test.go | 8 ++ 10 files changed, 188 insertions(+), 113 deletions(-) diff --git a/model/labels/matcher.go b/model/labels/matcher.go index 1282f80d63..3c068659b1 100644 --- a/model/labels/matcher.go +++ b/model/labels/matcher.go @@ -26,6 +26,8 @@ const ( MatchNotEqual MatchRegexp MatchNotRegexp + // MatchSet is a special type for internal use, that matches series with a certain label name. + MatchSet ) var matchTypeToStr = [...]string{ @@ -33,10 +35,11 @@ var matchTypeToStr = [...]string{ MatchNotEqual: "!=", MatchRegexp: "=~", MatchNotRegexp: "!~", + MatchSet: "[isSet]", } func (m MatchType) String() string { - if m < MatchEqual || m > MatchNotRegexp { + if m < MatchEqual || m > MatchSet { panic("unknown match type") } return matchTypeToStr[m] @@ -92,8 +95,11 @@ func (m *Matcher) Matches(s string) bool { return m.re.MatchString(s) case MatchNotRegexp: return !m.re.MatchString(s) + case MatchSet: + return true + default: + panic("labels.Matcher.Matches: invalid match type") } - panic("labels.Matcher.Matches: invalid match type") } // Inverse returns a matcher that matches the opposite. @@ -107,8 +113,9 @@ func (m *Matcher) Inverse() (*Matcher, error) { return NewMatcher(MatchNotRegexp, m.Name, m.Value) case MatchNotRegexp: return NewMatcher(MatchRegexp, m.Name, m.Value) + default: + panic("labels.Matcher.Matches: invalid match type") } - panic("labels.Matcher.Matches: invalid match type") } // GetRegexString returns the regex string. diff --git a/tsdb/block.go b/tsdb/block.go index d1c75fc83a..03ed5ee3c8 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -82,6 +82,12 @@ type IndexReader interface { // avoiding same calculations twice, however this implementation may lead to a worse performance when called once. PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) + // PostingsWithLabel returns the postings list iterator for the label name. + // The Postings here contain the offsets to the series inside the index. + // Found IDs are not strictly required to point to a valid Series, e.g. + // during background garbage collections. + PostingsWithLabel(name string) (index.Postings, error) + // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings @@ -501,6 +507,10 @@ func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher) return labelValuesWithMatchers(r.ir, name, matchers...) } +func (r blockIndexReader) PostingsWithLabel(name string) (index.Postings, error) { + return r.ir.PostingsWithLabel(name) +} + func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { if len(matchers) == 0 { return r.b.LabelNames() diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 52fef38463..1d18afa66b 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -124,6 +124,10 @@ func (h *headIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Mat return h.head.pfmc.PostingsForMatchers(h, concurrent, ms...) } +func (h *headIndexReader) PostingsWithLabel(name string) (index.Postings, error) { + return h.head.postings.GetWithLabel(name), nil +} + func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { series := make([]*memSeries, 0, 128) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 3b672ec2cc..bdf4614acb 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1484,12 +1484,7 @@ func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([] // LabelValues returns value tuples that exist for the given label name. // It is not safe to use the return value beyond the lifetime of the byte slice // passed into the Reader. -// TODO(replay): Support filtering by matchers func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { - if len(matchers) > 0 { - return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers) - } - if r.version == FormatV1 { e, ok := r.postingsV1[name] if !ok { @@ -1497,18 +1492,27 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string } values := make([]string, 0, len(e)) for k := range e { - values = append(values, k) + isMatch := true + for _, m := range matchers { + if m.Name == name && !m.Matches(k) { + isMatch = false + break + } + } + + if isMatch { + values = append(values, k) + } } return values, nil } - e, ok := r.postings[name] - if !ok { - return nil, nil - } + + e := r.postings[name] if len(e) == 0 { return nil, nil } + values := make([]string, 0, len(e)*symbolFactor) d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) @@ -1528,7 +1532,19 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string d.Skip(skip) } s := yoloString(d.UvarintBytes()) // Label value. - values = append(values, s) + + isMatch := true + // Try to exclude via matchers for the label name + for _, m := range matchers { + if m.Name == name && !m.Matches(s) { + isMatch = false + break + } + } + + if isMatch { + values = append(values, s) + } if s == lastVal { break } @@ -1647,8 +1663,8 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) { return Merge(res...), nil } - e, ok := r.postings[name] - if !ok { + e := r.postings[name] + if len(e) == 0 { return EmptyPostings(), nil } @@ -1725,6 +1741,71 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) { return Merge(res...), nil } +func (r *Reader) PostingsWithLabel(name string) (Postings, error) { + if r.version == FormatV1 { + e := r.postingsV1[name] + if len(e) == 0 { + return EmptyPostings(), nil + } + + var res []Postings + for _, off := range e { + // Read from the postings table. + d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) + _, p, err := r.dec.Postings(d.Get()) + if err != nil { + return nil, errors.Wrap(err, "decode postings") + } + res = append(res, p) + } + return Merge(res...), nil + } + + e := r.postings[name] + if len(e) == 0 { + return EmptyPostings(), nil + } + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + // Skip to start + d.Skip(e[0].off) + lastVal := e[len(e)-1].value + + skip := 0 + var res []Postings + for d.Err() == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than to parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + v := yoloString(d.UvarintBytes()) // Label value. + + postingsOff := d.Uvarint64() + // Read from the postings table + d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) + _, p, err := r.dec.Postings(d2.Get()) + if err != nil { + return nil, errors.Wrap(err, "decode postings") + } + res = append(res, p) + + if v == lastVal { + break + } + } + if d.Err() != nil { + return nil, errors.Wrap(d.Err(), "get postings offset entry") + } + + return Merge(res...), nil +} + // SortedPostings returns the given postings list reordered so that the backing series // are sorted. func (r *Reader) SortedPostings(p Postings) Postings { diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 9de86f5486..9936eaacf1 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -135,13 +135,24 @@ func (p *MemPostings) LabelNames() []string { } // LabelValues returns label values for the given name. -func (p *MemPostings) LabelValues(name string) []string { +func (p *MemPostings) LabelValues(name string, matchers ...*labels.Matcher) []string { p.mtx.RLock() defer p.mtx.RUnlock() values := make([]string, 0, len(p.m[name])) for v := range p.m[name] { - values = append(values, v) + isMatch := true + // Try to exclude this value through corresponding matchers + for _, m := range matchers { + if m.Name == name && !m.Matches(v) { + isMatch = false + break + } + } + + if isMatch { + values = append(values, v) + } } return values } @@ -216,6 +227,18 @@ func (p *MemPostings) Get(name, value string) Postings { return newListPostings(lp...) } +// GetWithLabel returns a postings list for the given label name. +func (p *MemPostings) GetWithLabel(name string) Postings { + p.mtx.RLock() + var ps []Postings + for _, srs := range p.m[name] { + ps = append(ps, newListPostings(srs...)) + } + p.mtx.RUnlock() + + return Merge(ps...) +} + // All returns a postings list over all documents ever added. func (p *MemPostings) All() Postings { return p.Get(AllPostingsKey()) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 33f774a8c2..00342fbbe9 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -212,6 +212,10 @@ func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Pos } } +func (oh *OOOHeadIndexReader) PostingsWithLabel(name string) (index.Postings, error) { + return oh.head.postings.GetWithLabel(name), nil +} + type OOOHeadChunkReader struct { head *Head mint, maxt int64 @@ -410,6 +414,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(name string, values ...string) return index.NewListPostings(ir.ch.postings), nil } +func (ir *OOOCompactionHeadIndexReader) PostingsWithLabel(name string) (index.Postings, error) { + return nil, errors.New("not implemented") +} + func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings { // This will already be sorted from the Postings() call above. return p diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 8892d7c2ef..4b29f75eac 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -25,6 +25,12 @@ type IndexPostingsReader interface { // Found IDs are not strictly required to point to a valid Series, e.g. // during background garbage collections. Input values must be sorted. Postings(name string, values ...string) (index.Postings, error) + + // PostingsWithLabel returns the postings list iterator for the label name. + // The Postings here contain the offsets to the series inside the index. + // Found IDs are not strictly required to point to a valid Series, e.g. + // during background garbage collections. + PostingsWithLabel(name string) (index.Postings, error) } // NewPostingsForMatchersCache creates a new PostingsForMatchersCache. diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go index 819183c5eb..01617d6085 100644 --- a/tsdb/postings_for_matchers_cache_test.go +++ b/tsdb/postings_for_matchers_cache_test.go @@ -269,6 +269,10 @@ func (idx indexForPostingsMock) Postings(string, ...string) (index.Postings, err panic("implement me") } +func (idx indexForPostingsMock) PostingsWithLabel(string) (index.Postings, error) { + panic("implement me") +} + // timeNowMock offers a mockable time.Now() implementation // empty value is ready to be used, and it should not be copied (use a reference) type timeNowMock struct { diff --git a/tsdb/querier.go b/tsdb/querier.go index f54236cec4..29372dd351 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -19,6 +19,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "golang.org/x/exp/maps" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -196,6 +197,16 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P return nil, err } its = append(its, allPostings) + case m.Type == labels.MatchSet: + // This is the special case of a label that must be set for the posting + it, err := ix.PostingsWithLabel(m.Name) + if err != nil { + return nil, err + } + if index.IsEmptyPostingsType(it) { + return index.EmptyPostings(), nil + } + its = append(its, it) case labelMustBeSet[m.Name]: // If this matcher must be non-empty, we can be smarter. matchesEmpty := m.Matches("") @@ -242,7 +253,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P its = append(its, it) } default: // l="" - // If the matchers for a labelname selects an empty value, it selects all + // If a matcher for a labelname selects an empty value, it selects all // the series which don't have the label name set too. See: // https://github.com/prometheus/prometheus/issues/3575 and // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 @@ -349,110 +360,23 @@ func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series. func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { + // Make sure to intersect with series containing the label name + matchers = append(matchers, labels.MustNewMatcher(labels.MatchSet, name, "")) p, err := PostingsForMatchers(r, matchers...) if err != nil { return nil, errors.Wrap(err, "fetching postings for matchers") } - allValues, err := r.LabelValues(name) - if err != nil { - return nil, errors.Wrapf(err, "fetching values of label %s", name) - } - - // If we have a matcher for the label name, we can filter out values that don't match - // before we fetch postings. This is especially useful for labels with many values. - // e.g. __name__ with a selector like {__name__="xyz"} - for _, m := range matchers { - if m.Name != name { - continue - } - - // re-use the allValues slice to avoid allocations - // this is safe because the iteration is always ahead of the append - filteredValues := allValues[:0] - for _, v := range allValues { - if m.Matches(v) { - filteredValues = append(filteredValues, v) - } - } - allValues = filteredValues - } - - // Let's see if expanded postings for matchers have smaller cardinality than label values. - // Since computing label values from series is expensive, we apply a limit on number of expanded - // postings (and series). - maxExpandedPostings := len(allValues) / maxExpandedPostingsFactor - if maxExpandedPostings > 0 { - // Add space for one extra posting when checking if we expanded all postings. - expanded := make([]storage.SeriesRef, 0, maxExpandedPostings+1) - - // Call p.Next() even if len(expanded) == maxExpandedPostings. This tells us if there are more postings or not. - for len(expanded) <= maxExpandedPostings && p.Next() { - expanded = append(expanded, p.At()) - } - - if len(expanded) <= maxExpandedPostings { - // When we're here, p.Next() must have returned false, so we need to check for errors. - if err := p.Err(); err != nil { - return nil, errors.Wrap(err, "expanding postings for matchers") - } - - // We have expanded all the postings -- all returned label values will be from these series only. - // (We supply allValues as a buffer for storing results. It should be big enough already, since it holds all possible label values.) - return labelValuesFromSeries(r, name, expanded, allValues) - } - - // If we haven't reached end of postings, we prepend our expanded postings to "p", and continue. - p = newPrependPostings(expanded, p) - } - - valuesPostings := make([]index.Postings, len(allValues)) - for i, value := range allValues { - valuesPostings[i], err = r.Postings(name, value) - if err != nil { - return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value) - } - } - indexes, err := index.FindIntersectingPostings(p, valuesPostings) - if err != nil { - return nil, errors.Wrap(err, "intersecting postings") - } - - values := make([]string, 0, len(indexes)) - for _, idx := range indexes { - values = append(values, allValues[idx]) - } - - return values, nil -} - -// labelValuesFromSeries returns all unique label values from for given label name from supplied series. Values are not sorted. -// buf is space for holding result (if it isn't big enough, it will be ignored), may be nil. -func labelValuesFromSeries(r IndexReader, labelName string, refs []storage.SeriesRef, buf []string) ([]string, error) { values := map[string]struct{}{} - - var builder labels.ScratchBuilder - for _, ref := range refs { - err := r.Series(ref, &builder, nil) + for p.Next() { + v, err := r.LabelValueFor(p.At(), name) if err != nil { - return nil, errors.Wrapf(err, "label values for label %s", labelName) - } - - v := builder.Labels().Get(labelName) - if v != "" { - values[v] = struct{}{} + return nil, errors.Wrapf(err, "getting value for label %s from series %d", name, p.At()) } + values[v] = struct{}{} } - if cap(buf) >= len(values) { - buf = buf[:0] - } else { - buf = make([]string, 0, len(values)) - } - for v := range values { - buf = append(buf, v) - } - return buf, nil + return maps.Keys(values), nil } func newPrependPostings(a []storage.SeriesRef, b index.Postings) index.Postings { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index af1b0f1b2f..7c5035d9bd 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1548,6 +1548,10 @@ func (m mockIndex) Postings(name string, values ...string) (index.Postings, erro return index.Merge(res...), nil } +func (m mockIndex) PostingsWithLabel(name string) (index.Postings, error) { + return nil, errors.New("not implemented") +} + func (m mockIndex) SortedPostings(p index.Postings) index.Postings { ep, err := index.ExpandPostings(p) if err != nil { @@ -2413,6 +2417,10 @@ func (m mockMatcherIndex) PostingsForMatchers(bool, ...*labels.Matcher) (index.P return index.EmptyPostings(), nil } +func (m mockMatcherIndex) PostingsWithLabel(name string) (index.Postings, error) { + return index.EmptyPostings(), nil +} + func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings { return index.EmptyPostings() }