Skip to content

Commit

Permalink
tsdb.IndexReader: Add LabelValuesStream method
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Apr 26, 2024
1 parent 1c8702f commit ea571ef
Show file tree
Hide file tree
Showing 28 changed files with 1,488 additions and 164 deletions.
4 changes: 4 additions & 0 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func (*errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s
return nil, nil, nil
}

func (*errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
return storage.EmptyLabelValues()
}

func (*errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions storage/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]st
return nil, nil, errors.New("label values error")
}

func (errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
return storage.ErrLabelValues(errors.New("label values stream error"))
}

func (errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, errors.New("label names error")
}
Expand Down
10 changes: 10 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (q *MockQuerier) LabelValues(context.Context, string, ...*labels.Matcher) (
return nil, nil, nil
}

func (q *MockQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues {
return EmptyLabelValues()
}

func (q *MockQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand Down Expand Up @@ -163,6 +167,12 @@ type LabelQuerier interface {
// to label values of metrics matching the matchers.
LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error)

// LabelValuesStream returns an iterator over all potential values for a label name.
// It is not safe to use the strings beyond the lifetime of the querier.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues

// LabelNames returns all the unique label names present in the block in sorted order.
// If matchers are specified the returned result set is reduced
// to label names of metrics matching the matchers.
Expand Down
14 changes: 8 additions & 6 deletions storage/labelvalues.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ func EmptyLabelValues() LabelValues {

// ListLabelValues is an iterator over a slice of label values.
type ListLabelValues struct {
cur string
values []string
cur string
values []string
warnings annotations.Annotations
}

func NewListLabelValues(values []string) *ListLabelValues {
func NewListLabelValues(values []string, warnings annotations.Annotations) *ListLabelValues {
return &ListLabelValues{
values: values,
values: values,
warnings: warnings,
}
}

Expand All @@ -63,8 +65,8 @@ func (*ListLabelValues) Err() error {
return nil
}

func (*ListLabelValues) Warnings() annotations.Annotations {
return nil
func (l *ListLabelValues) Warnings() annotations.Annotations {
return l.warnings
}

func (*ListLabelValues) Close() error {
Expand Down
6 changes: 3 additions & 3 deletions storage/labelvalues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestListLabelValues(t *testing.T) {
t.Run("lets you traverse a slice of label values", func(t *testing.T) {
input := []string{"a", "b", "c", "d"}
it := NewListLabelValues(input)
it := NewListLabelValues(input, nil)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
Expand All @@ -25,7 +25,7 @@ func TestListLabelValues(t *testing.T) {
})

t.Run("can be initialized with an empty slice", func(t *testing.T) {
it := NewListLabelValues([]string{})
it := NewListLabelValues([]string{}, nil)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
Expand All @@ -36,7 +36,7 @@ func TestListLabelValues(t *testing.T) {
})

t.Run("can be initialized with a nil slice", func(t *testing.T) {
it := NewListLabelValues(nil)
it := NewListLabelValues(nil, nil)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
Expand Down
69 changes: 69 additions & 0 deletions storage/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"math"
"slices"
"sync"
"unicode/utf8"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/annotations"

"github.com/bboreham/go-loser"
)

type mergeGenericQuerier struct {
Expand Down Expand Up @@ -220,6 +223,72 @@ func mergeStrings(a, b []string) []string {
return res
}

// LabelValuesStream implements LabelQuerier.
func (q *mergeGenericQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues {
if len(q.queriers) == 0 {
return EmptyLabelValues()
}
if len(q.queriers) == 1 {
return q.queriers[0].LabelValuesStream(ctx, name, matchers...)
}

its := make([]LabelValues, 0, len(q.queriers))
for _, sq := range q.queriers {
its = append(its, sq.LabelValuesStream(ctx, name, matchers...))
}

lt := loser.New(its, string(utf8.MaxRune))
return &mergedLabelValues{
lt: lt,
lvs: its,
}
}

// mergedLabelValues is a label values iterator merging a collection of sub-iterators.
type mergedLabelValues struct {
lt *loser.Tree[string, LabelValues]
lvs []LabelValues
cur string
}

func (m *mergedLabelValues) Next() bool {
for m.lt.Next() {
// Remove duplicate entries
at := m.lt.At()
if at != m.cur {
m.cur = at
return true
}
}

return false
}

func (m *mergedLabelValues) At() string {
return m.cur
}

func (m *mergedLabelValues) Err() error {
for _, lv := range m.lvs {
if err := lv.Err(); err != nil {
return err
}
}
return nil
}

func (m *mergedLabelValues) Warnings() annotations.Annotations {
var warnings annotations.Annotations
for _, lv := range m.lvs {
warnings = warnings.Merge(lv.Warnings())
}
return warnings
}

func (m *mergedLabelValues) Close() error {
return nil
}

// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
var (
Expand Down
56 changes: 50 additions & 6 deletions storage/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,23 @@ func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, matcher
return m.resp, m.warnings, m.err
}

func (m *mockGenericQuerier) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) LabelValues {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
name: name,
matchers: matchers,
})
m.mtx.Unlock()

if m.err == nil {
return NewListLabelValues(m.resp, m.warnings)
}
return errLabelValues{
err: m.err,
warnings: m.warnings,
}
}

func (m *mockGenericQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
m.mtx.Lock()
m.labelNamesCalls++
Expand Down Expand Up @@ -1455,8 +1472,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedLabels []string

expectedWarnings annotations.Annotations
expectedErrs [4]error
expectedErrs [5]error
}{
{},
{
name: "one successful primary querier",
queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
Expand All @@ -1482,7 +1500,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
Expand Down Expand Up @@ -1518,7 +1536,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
Expand Down Expand Up @@ -1604,11 +1622,36 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested)
}
})
t.Run("LabelValuesStream", func(t *testing.T) {
it := q.LabelValuesStream(ctx, "test2")
require.NotNil(t, it)
var res []string
for it.Next() {
res = append(res, it.At())
}
require.Equal(t, tcase.expectedWarnings, it.Warnings())
err := it.Err()
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
if err != nil {
return
}

require.Equal(t, tcase.expectedLabels, res)

for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)

require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2"},
}, m.labelNamesRequested)
}
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues(ctx, "test2", matcher)
res, w, err := q.LabelValues(ctx, "test3", matcher)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
require.ErrorIs(t, err, tcase.expectedErrs[4], "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)

if err != nil {
Expand All @@ -1619,7 +1662,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {

require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2", matchers: []*labels.Matcher{matcher}},
{name: "test2"},
{name: "test3", matchers: []*labels.Matcher{matcher}},
}, m.labelNamesRequested)
}
})
Expand Down
8 changes: 8 additions & 0 deletions storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (noopQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s
return nil, nil, nil
}

func (noopQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues {
return EmptyLabelValues()
}

func (noopQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand All @@ -58,6 +62,10 @@ func (noopChunkQuerier) LabelValues(context.Context, string, ...*labels.Matcher)
return nil, nil, nil
}

func (noopChunkQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues {
return EmptyLabelValues()
}

func (noopChunkQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand Down
8 changes: 7 additions & 1 deletion storage/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,18 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []s
return ms, names
}

// LabelValues implements storage.Querier and is a noop.
// LabelValues implements storage.LabelQuerier and is a noop.
func (q *querier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return nil, nil, errors.New("not implemented")
}

// LabelValuesStream implements storage.LabelQuerier and is a noop.
func (q *querier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return storage.ErrLabelValues(errors.New("not implemented"))
}

// LabelNames implements storage.Querier and is a noop.
func (q *querier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
Expand Down
22 changes: 22 additions & 0 deletions storage/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@ func (s *secondaryQuerier) LabelValues(ctx context.Context, name string, matcher
return vals, w, nil
}

func (s *secondaryQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues {
return &secondaryLabelValues{
s.genericQuerier.LabelValuesStream(ctx, name, matchers...),
}
}

type secondaryLabelValues struct {
LabelValues
}

func (s *secondaryLabelValues) Err() error {
return nil
}

func (s *secondaryLabelValues) Warnings() annotations.Annotations {
ws := s.LabelValues.Warnings()
if s.LabelValues.Err() != nil {
ws.Add(s.LabelValues.Err())
}
return ws
}

func (s *secondaryQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
names, w, err := s.genericQuerier.LabelNames(ctx, matchers...)
if err != nil {
Expand Down
Loading

0 comments on commit ea571ef

Please sign in to comment.