Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LabelQuerier.LabelValuesStream method #517

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func (*errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s
return nil, nil, nil
}

func (*errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
return storage.EmptyLabelValues()
}

func (*errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions storage/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]st
return nil, nil, errors.New("label values error")
}

func (errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
return storage.ErrLabelValues(errors.New("label values stream error"))
}

func (errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, errors.New("label names error")
}
Expand Down
10 changes: 10 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (q *MockQuerier) LabelValues(context.Context, string, ...*labels.Matcher) (
return nil, nil, nil
}

func (q *MockQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues {
return EmptyLabelValues()
}

func (q *MockQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand Down Expand Up @@ -163,6 +167,12 @@ type LabelQuerier interface {
// to label values of metrics matching the matchers.
LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error)

// LabelValuesStream returns an iterator over all potential values for a label name.
// It is not safe to use the strings beyond the lifetime of the querier.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues

// LabelNames returns all the unique label names present in the block in sorted order.
// If matchers are specified the returned result set is reduced
// to label names of metrics matching the matchers.
Expand Down
14 changes: 8 additions & 6 deletions storage/labelvalues.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ func EmptyLabelValues() LabelValues {

// ListLabelValues is an iterator over a slice of label values.
type ListLabelValues struct {
cur string
values []string
cur string
values []string
warnings annotations.Annotations
}

func NewListLabelValues(values []string) *ListLabelValues {
func NewListLabelValues(values []string, warnings annotations.Annotations) *ListLabelValues {
return &ListLabelValues{
values: values,
values: values,
warnings: warnings,
}
}

Expand All @@ -63,8 +65,8 @@ func (*ListLabelValues) Err() error {
return nil
}

func (*ListLabelValues) Warnings() annotations.Annotations {
return nil
func (l *ListLabelValues) Warnings() annotations.Annotations {
return l.warnings
}

func (*ListLabelValues) Close() error {
Expand Down
6 changes: 3 additions & 3 deletions storage/labelvalues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestListLabelValues(t *testing.T) {
t.Run("lets you traverse a slice of label values", func(t *testing.T) {
input := []string{"a", "b", "c", "d"}
it := NewListLabelValues(input)
it := NewListLabelValues(input, nil)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
Expand All @@ -25,7 +25,7 @@ func TestListLabelValues(t *testing.T) {
})

t.Run("can be initialized with an empty slice", func(t *testing.T) {
it := NewListLabelValues([]string{})
it := NewListLabelValues([]string{}, nil)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
Expand All @@ -36,7 +36,7 @@ func TestListLabelValues(t *testing.T) {
})

t.Run("can be initialized with a nil slice", func(t *testing.T) {
it := NewListLabelValues(nil)
it := NewListLabelValues(nil, nil)
t.Cleanup(func() {
require.NoError(t, it.Close())
})
Expand Down
69 changes: 69 additions & 0 deletions storage/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"math"
"slices"
"sync"
"unicode/utf8"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/annotations"

"github.com/bboreham/go-loser"
)

type mergeGenericQuerier struct {
Expand Down Expand Up @@ -220,6 +223,72 @@ func mergeStrings(a, b []string) []string {
return res
}

// LabelValuesStream implements LabelQuerier.
func (q *mergeGenericQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues {
if len(q.queriers) == 0 {
return EmptyLabelValues()
}
if len(q.queriers) == 1 {
return q.queriers[0].LabelValuesStream(ctx, name, matchers...)
}

its := make([]LabelValues, 0, len(q.queriers))
for _, sq := range q.queriers {
its = append(its, sq.LabelValuesStream(ctx, name, matchers...))
}

lt := loser.New(its, string(utf8.MaxRune))
return &mergedLabelValues{
lt: lt,
lvs: its,
}
}

// mergedLabelValues is a label values iterator merging a collection of sub-iterators.
type mergedLabelValues struct {
lt *loser.Tree[string, LabelValues]
lvs []LabelValues
cur string
}

func (m *mergedLabelValues) Next() bool {
for m.lt.Next() {
// Remove duplicate entries
at := m.lt.At()
if at != m.cur {
m.cur = at
return true
}
}

return false
}

func (m *mergedLabelValues) At() string {
return m.cur
}

func (m *mergedLabelValues) Err() error {
for _, lv := range m.lvs {
if err := lv.Err(); err != nil {
return err
}
}
return nil
}

func (m *mergedLabelValues) Warnings() annotations.Annotations {
var warnings annotations.Annotations
for _, lv := range m.lvs {
warnings = warnings.Merge(lv.Warnings())
}
return warnings
}

func (m *mergedLabelValues) Close() error {
return nil
}

// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
var (
Expand Down
56 changes: 50 additions & 6 deletions storage/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,23 @@ func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, matcher
return m.resp, m.warnings, m.err
}

func (m *mockGenericQuerier) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) LabelValues {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
name: name,
matchers: matchers,
})
m.mtx.Unlock()

if m.err == nil {
return NewListLabelValues(m.resp, m.warnings)
}
return errLabelValues{
err: m.err,
warnings: m.warnings,
}
}

func (m *mockGenericQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
m.mtx.Lock()
m.labelNamesCalls++
Expand Down Expand Up @@ -1455,8 +1472,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedLabels []string

expectedWarnings annotations.Annotations
expectedErrs [4]error
expectedErrs [5]error
}{
{},
{
name: "one successful primary querier",
queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
Expand All @@ -1482,7 +1500,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
Expand Down Expand Up @@ -1518,7 +1536,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
Expand Down Expand Up @@ -1604,11 +1622,36 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested)
}
})
t.Run("LabelValuesStream", func(t *testing.T) {
it := q.LabelValuesStream(ctx, "test2")
require.NotNil(t, it)
var res []string
for it.Next() {
res = append(res, it.At())
}
require.Equal(t, tcase.expectedWarnings, it.Warnings())
err := it.Err()
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
if err != nil {
return
}

require.Equal(t, tcase.expectedLabels, res)

for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)

require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2"},
}, m.labelNamesRequested)
}
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues(ctx, "test2", matcher)
res, w, err := q.LabelValues(ctx, "test3", matcher)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
require.ErrorIs(t, err, tcase.expectedErrs[4], "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)

if err != nil {
Expand All @@ -1619,7 +1662,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {

require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2", matchers: []*labels.Matcher{matcher}},
{name: "test2"},
{name: "test3", matchers: []*labels.Matcher{matcher}},
}, m.labelNamesRequested)
}
})
Expand Down
8 changes: 8 additions & 0 deletions storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (noopQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s
return nil, nil, nil
}

func (noopQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues {
return EmptyLabelValues()
}

func (noopQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand All @@ -58,6 +62,10 @@ func (noopChunkQuerier) LabelValues(context.Context, string, ...*labels.Matcher)
return nil, nil, nil
}

func (noopChunkQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues {
return EmptyLabelValues()
}

func (noopChunkQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}
Expand Down
8 changes: 7 additions & 1 deletion storage/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,18 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []s
return ms, names
}

// LabelValues implements storage.Querier and is a noop.
// LabelValues implements storage.LabelQuerier and is a noop.
func (q *querier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return nil, nil, errors.New("not implemented")
}

// LabelValuesStream implements storage.LabelQuerier and is a noop.
func (q *querier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return storage.ErrLabelValues(errors.New("not implemented"))
}

// LabelNames implements storage.Querier and is a noop.
func (q *querier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
Expand Down
22 changes: 22 additions & 0 deletions storage/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@ func (s *secondaryQuerier) LabelValues(ctx context.Context, name string, matcher
return vals, w, nil
}

func (s *secondaryQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues {
return &secondaryLabelValues{
s.genericQuerier.LabelValuesStream(ctx, name, matchers...),
}
}

type secondaryLabelValues struct {
LabelValues
}

func (s *secondaryLabelValues) Err() error {
return nil
}

func (s *secondaryLabelValues) Warnings() annotations.Annotations {
ws := s.LabelValues.Warnings()
if s.LabelValues.Err() != nil {
ws.Add(s.LabelValues.Err())
}
return ws
}

func (s *secondaryQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
names, w, err := s.genericQuerier.LabelNames(ctx, matchers...)
if err != nil {
Expand Down
Loading
Loading