Skip to content

Commit

Permalink
Optimize storage.LabelQuerier.LabelValues implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Aug 25, 2023
1 parent 002ae0a commit 1dc6d0c
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 113 deletions.
13 changes: 10 additions & 3 deletions model/labels/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ const (
MatchNotEqual
MatchRegexp
MatchNotRegexp
// MatchSet is a special type for internal use, that matches series with a certain label name.
MatchSet
)

var matchTypeToStr = [...]string{
MatchEqual: "=",
MatchNotEqual: "!=",
MatchRegexp: "=~",
MatchNotRegexp: "!~",
MatchSet: "[isSet]",
}

func (m MatchType) String() string {
if m < MatchEqual || m > MatchNotRegexp {
if m < MatchEqual || m > MatchSet {
panic("unknown match type")
}
return matchTypeToStr[m]
Expand Down Expand Up @@ -92,8 +95,11 @@ func (m *Matcher) Matches(s string) bool {
return m.re.MatchString(s)
case MatchNotRegexp:
return !m.re.MatchString(s)
case MatchSet:
return true
default:
panic("labels.Matcher.Matches: invalid match type")
}
panic("labels.Matcher.Matches: invalid match type")
}

// Inverse returns a matcher that matches the opposite.
Expand All @@ -107,8 +113,9 @@ func (m *Matcher) Inverse() (*Matcher, error) {
return NewMatcher(MatchNotRegexp, m.Name, m.Value)
case MatchNotRegexp:
return NewMatcher(MatchRegexp, m.Name, m.Value)
default:
panic("labels.Matcher.Matches: invalid match type")
}
panic("labels.Matcher.Matches: invalid match type")
}

// GetRegexString returns the regex string.
Expand Down
10 changes: 10 additions & 0 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ type IndexReader interface {
// avoiding same calculations twice, however this implementation may lead to a worse performance when called once.
PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error)

// PostingsWithLabel returns the postings list iterator for the label name.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections.
PostingsWithLabel(name string) (index.Postings, error)

// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
Expand Down Expand Up @@ -501,6 +507,10 @@ func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher)
return labelValuesWithMatchers(r.ir, name, matchers...)
}

func (r blockIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return r.ir.PostingsWithLabel(name)
}

func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) == 0 {
return r.b.LabelNames()
Expand Down
4 changes: 4 additions & 0 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (h *headIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Mat
return h.head.pfmc.PostingsForMatchers(h, concurrent, ms...)
}

func (h *headIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return h.head.postings.GetWithLabel(name), nil
}

func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

Expand Down
107 changes: 94 additions & 13 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,31 +1484,35 @@ func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]
// LabelValues returns value tuples that exist for the given label name.
// It is not safe to use the return value beyond the lifetime of the byte slice
// passed into the Reader.
// TODO(replay): Support filtering by matchers
func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) > 0 {
return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
}

if r.version == FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
return nil, nil
}
values := make([]string, 0, len(e))
for k := range e {
values = append(values, k)
isMatch := true
for _, m := range matchers {
if m.Name == name && !m.Matches(k) {
isMatch = false
break
}
}

if isMatch {
values = append(values, k)
}
}
return values, nil

}
e, ok := r.postings[name]
if !ok {
return nil, nil
}

e := r.postings[name]
if len(e) == 0 {
return nil, nil
}

values := make([]string, 0, len(e)*symbolFactor)

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
Expand All @@ -1528,7 +1532,19 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string
d.Skip(skip)
}
s := yoloString(d.UvarintBytes()) // Label value.
values = append(values, s)

isMatch := true
// Try to exclude via matchers for the label name
for _, m := range matchers {
if m.Name == name && !m.Matches(s) {
isMatch = false
break
}
}

if isMatch {
values = append(values, s)
}
if s == lastVal {
break
}
Expand Down Expand Up @@ -1647,8 +1663,8 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
return Merge(res...), nil
}

e, ok := r.postings[name]
if !ok {
e := r.postings[name]
if len(e) == 0 {
return EmptyPostings(), nil
}

Expand Down Expand Up @@ -1725,6 +1741,71 @@ func (r *Reader) Postings(name string, values ...string) (Postings, error) {
return Merge(res...), nil
}

func (r *Reader) PostingsWithLabel(name string) (Postings, error) {
if r.version == FormatV1 {
e := r.postingsV1[name]
if len(e) == 0 {
return EmptyPostings(), nil
}

var res []Postings
for _, off := range e {
// Read from the postings table.
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
_, p, err := r.dec.Postings(d.Get())
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
res = append(res, p)
}
return Merge(res...), nil
}

e := r.postings[name]
if len(e) == 0 {
return EmptyPostings(), nil
}

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
// Skip to start
d.Skip(e[0].off)
lastVal := e[len(e)-1].value

skip := 0
var res []Postings
for d.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than to parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
}
v := yoloString(d.UvarintBytes()) // Label value.

postingsOff := d.Uvarint64()
// Read from the postings table
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d2.Get())
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
res = append(res, p)

if v == lastVal {
break
}
}
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "get postings offset entry")
}

return Merge(res...), nil
}

// SortedPostings returns the given postings list reordered so that the backing series
// are sorted.
func (r *Reader) SortedPostings(p Postings) Postings {
Expand Down
27 changes: 25 additions & 2 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,24 @@ func (p *MemPostings) LabelNames() []string {
}

// LabelValues returns label values for the given name.
func (p *MemPostings) LabelValues(name string) []string {
func (p *MemPostings) LabelValues(name string, matchers ...*labels.Matcher) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

values := make([]string, 0, len(p.m[name]))
for v := range p.m[name] {
values = append(values, v)
isMatch := true
// Try to exclude this value through corresponding matchers
for _, m := range matchers {
if m.Name == name && !m.Matches(v) {
isMatch = false
break
}
}

if isMatch {
values = append(values, v)
}
}
return values
}
Expand Down Expand Up @@ -216,6 +227,18 @@ func (p *MemPostings) Get(name, value string) Postings {
return newListPostings(lp...)
}

// GetWithLabel returns a postings list for the given label name.
func (p *MemPostings) GetWithLabel(name string) Postings {
p.mtx.RLock()
var ps []Postings
for _, srs := range p.m[name] {
ps = append(ps, newListPostings(srs...))
}
p.mtx.RUnlock()

return Merge(ps...)
}

// All returns a postings list over all documents ever added.
func (p *MemPostings) All() Postings {
return p.Get(AllPostingsKey())
Expand Down
8 changes: 8 additions & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Pos
}
}

func (oh *OOOHeadIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return oh.head.postings.GetWithLabel(name), nil
}

type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
Expand Down Expand Up @@ -410,6 +414,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(name string, values ...string)
return index.NewListPostings(ir.ch.postings), nil
}

func (ir *OOOCompactionHeadIndexReader) PostingsWithLabel(name string) (index.Postings, error) {
return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above.
return p
Expand Down
6 changes: 6 additions & 0 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ type IndexPostingsReader interface {
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections. Input values must be sorted.
Postings(name string, values ...string) (index.Postings, error)

// PostingsWithLabel returns the postings list iterator for the label name.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g.
// during background garbage collections.
PostingsWithLabel(name string) (index.Postings, error)
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ func (idx indexForPostingsMock) Postings(string, ...string) (index.Postings, err
panic("implement me")
}

func (idx indexForPostingsMock) PostingsWithLabel(string) (index.Postings, error) {
panic("implement me")
}

// timeNowMock offers a mockable time.Now() implementation
// empty value is ready to be used, and it should not be copied (use a reference)
type timeNowMock struct {
Expand Down
Loading

0 comments on commit 1dc6d0c

Please sign in to comment.