diff --git a/promql/engine_test.go b/promql/engine_test.go index 262ec19b7d..0f7f89cbd6 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -204,6 +204,10 @@ func (*errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s return nil, nil, nil } +func (*errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + return storage.EmptyLabelValues() +} + func (*errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 913e2fe24e..a6156e1a11 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -240,6 +240,10 @@ func (errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]st return nil, nil, errors.New("label values error") } +func (errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(errors.New("label values stream error")) +} + func (errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, errors.New("label names error") } diff --git a/storage/interface.go b/storage/interface.go index 892897e51e..a848d831e7 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -126,6 +126,10 @@ func (q *MockQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ( return nil, nil, nil } +func (q *MockQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues { + return EmptyLabelValues() +} + func (q *MockQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } @@ -163,6 +167,12 @@ type LabelQuerier interface { // to label values of metrics matching the matchers. LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) + // LabelValuesStream returns an iterator over all potential values for a label name. + // It is not safe to use the strings beyond the lifetime of the querier. + // If matchers are specified the returned result set is reduced + // to label values of metrics matching the matchers. + LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues + // LabelNames returns all the unique label names present in the block in sorted order. // If matchers are specified the returned result set is reduced // to label names of metrics matching the matchers. diff --git a/storage/labelvalues.go b/storage/labelvalues.go index b14a69b9e3..c3dd8ec594 100644 --- a/storage/labelvalues.go +++ b/storage/labelvalues.go @@ -35,13 +35,15 @@ func EmptyLabelValues() LabelValues { // ListLabelValues is an iterator over a slice of label values. type ListLabelValues struct { - cur string - values []string + cur string + values []string + warnings annotations.Annotations } -func NewListLabelValues(values []string) *ListLabelValues { +func NewListLabelValues(values []string, warnings annotations.Annotations) *ListLabelValues { return &ListLabelValues{ - values: values, + values: values, + warnings: warnings, } } @@ -63,8 +65,8 @@ func (*ListLabelValues) Err() error { return nil } -func (*ListLabelValues) Warnings() annotations.Annotations { - return nil +func (l *ListLabelValues) Warnings() annotations.Annotations { + return l.warnings } func (*ListLabelValues) Close() error { diff --git a/storage/labelvalues_test.go b/storage/labelvalues_test.go index a3cfe3e0b7..efbc58d554 100644 --- a/storage/labelvalues_test.go +++ b/storage/labelvalues_test.go @@ -10,7 +10,7 @@ import ( func TestListLabelValues(t *testing.T) { t.Run("lets you traverse a slice of label values", func(t *testing.T) { input := []string{"a", "b", "c", "d"} - it := NewListLabelValues(input) + it := NewListLabelValues(input, nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) @@ -25,7 +25,7 @@ func TestListLabelValues(t *testing.T) { }) t.Run("can be initialized with an empty slice", func(t *testing.T) { - it := NewListLabelValues([]string{}) + it := NewListLabelValues([]string{}, nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) @@ -36,7 +36,7 @@ func TestListLabelValues(t *testing.T) { }) t.Run("can be initialized with a nil slice", func(t *testing.T) { - it := NewListLabelValues(nil) + it := NewListLabelValues(nil, nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) diff --git a/storage/merge.go b/storage/merge.go index d64388e884..c08e367493 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "sync" + "unicode/utf8" "golang.org/x/exp/slices" @@ -29,6 +30,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/util/annotations" + + "github.com/bboreham/go-loser" ) type mergeGenericQuerier struct { @@ -221,6 +224,72 @@ func mergeStrings(a, b []string) []string { return res } +// LabelValuesStream implements LabelQuerier. +func (q *mergeGenericQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues { + if len(q.queriers) == 0 { + return EmptyLabelValues() + } + if len(q.queriers) == 1 { + return q.queriers[0].LabelValuesStream(ctx, name, matchers...) + } + + its := make([]LabelValues, 0, len(q.queriers)) + for _, sq := range q.queriers { + its = append(its, sq.LabelValuesStream(ctx, name, matchers...)) + } + + lt := loser.New(its, string(utf8.MaxRune)) + return &mergedLabelValues{ + lt: lt, + lvs: its, + } +} + +// mergedLabelValues is a label values iterator merging a collection of sub-iterators. +type mergedLabelValues struct { + lt *loser.Tree[string, LabelValues] + lvs []LabelValues + cur string +} + +func (m *mergedLabelValues) Next() bool { + for m.lt.Next() { + // Remove duplicate entries + at := m.lt.At() + if at != m.cur { + m.cur = at + return true + } + } + + return false +} + +func (m *mergedLabelValues) At() string { + return m.cur +} + +func (m *mergedLabelValues) Err() error { + for _, lv := range m.lvs { + if err := lv.Err(); err != nil { + return err + } + } + return nil +} + +func (m *mergedLabelValues) Warnings() annotations.Annotations { + var warnings annotations.Annotations + for _, lv := range m.lvs { + warnings = warnings.Merge(lv.Warnings()) + } + return warnings +} + +func (m *mergedLabelValues) Close() error { + return nil +} + // LabelNames returns all the unique label names present in all queriers in sorted order. func (q *mergeGenericQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { var ( diff --git a/storage/merge_test.go b/storage/merge_test.go index 4632b170da..fda832554c 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1387,6 +1387,23 @@ func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, matcher return m.resp, m.warnings, m.err } +func (m *mockGenericQuerier) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) LabelValues { + m.mtx.Lock() + m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ + name: name, + matchers: matchers, + }) + m.mtx.Unlock() + + if m.err == nil { + return NewListLabelValues(m.resp, m.warnings) + } + return errLabelValues{ + err: m.err, + warnings: m.warnings, + } +} + func (m *mockGenericQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { m.mtx.Lock() m.labelNamesCalls++ @@ -1456,8 +1473,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedLabels []string expectedWarnings annotations.Annotations - expectedErrs [4]error + expectedErrs [5]error }{ + {}, { name: "one successful primary querier", queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}}, @@ -1483,7 +1501,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { { name: "one failed primary querier", queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}}, - expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, + expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with successful secondaries", @@ -1519,7 +1537,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, }, - expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, + expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with failed secondaries", @@ -1605,11 +1623,36 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested) } }) + t.Run("LabelValuesStream", func(t *testing.T) { + it := q.LabelValuesStream(ctx, "test2") + require.NotNil(t, it) + var res []string + for it.Next() { + res = append(res, it.At()) + } + require.Equal(t, tcase.expectedWarnings, it.Warnings()) + err := it.Err() + require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match") + if err != nil { + return + } + + require.Equal(t, tcase.expectedLabels, res) + + for _, qr := range q.queriers { + m := unwrapMockGenericQuerier(t, qr) + + require.Equal(t, []labelNameRequest{ + {name: "test"}, + {name: "test2"}, + }, m.labelNamesRequested) + } + }) t.Run("LabelValuesWithMatchers", func(t *testing.T) { matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue") - res, w, err := q.LabelValues(ctx, "test2", matcher) + res, w, err := q.LabelValues(ctx, "test3", matcher) require.Subset(t, tcase.expectedWarnings, w) - require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match") + require.ErrorIs(t, err, tcase.expectedErrs[4], "expected error doesn't match") require.Equal(t, tcase.expectedLabels, res) if err != nil { @@ -1620,7 +1663,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, []labelNameRequest{ {name: "test"}, - {name: "test2", matchers: []*labels.Matcher{matcher}}, + {name: "test2"}, + {name: "test3", matchers: []*labels.Matcher{matcher}}, }, m.labelNamesRequested) } }) diff --git a/storage/noop.go b/storage/noop.go index be5741ddd8..1dd485573d 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -35,6 +35,10 @@ func (noopQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s return nil, nil, nil } +func (noopQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues { + return EmptyLabelValues() +} + func (noopQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } @@ -58,6 +62,10 @@ func (noopChunkQuerier) LabelValues(context.Context, string, ...*labels.Matcher) return nil, nil, nil } +func (noopChunkQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues { + return EmptyLabelValues() +} + func (noopChunkQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index 723030091a..2f41e5faf2 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -209,12 +209,18 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []s return ms, names } -// LabelValues implements storage.Querier and is a noop. +// LabelValues implements storage.LabelQuerier and is a noop. func (q *querier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 return nil, nil, errors.New("not implemented") } +// LabelValuesStream implements storage.LabelQuerier and is a noop. +func (q *querier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 + return storage.ErrLabelValues(errors.New("not implemented")) +} + // LabelNames implements storage.Querier and is a noop. func (q *querier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 diff --git a/storage/secondary.go b/storage/secondary.go index 44d9781835..f63160db86 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -57,6 +57,28 @@ func (s *secondaryQuerier) LabelValues(ctx context.Context, name string, matcher return vals, w, nil } +func (s *secondaryQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues { + return &secondaryLabelValues{ + s.genericQuerier.LabelValuesStream(ctx, name, matchers...), + } +} + +type secondaryLabelValues struct { + LabelValues +} + +func (s *secondaryLabelValues) Err() error { + return nil +} + +func (s *secondaryLabelValues) Warnings() annotations.Annotations { + ws := s.LabelValues.Warnings() + if s.LabelValues.Err() != nil { + ws.Add(s.LabelValues.Err()) + } + return ws +} + func (s *secondaryQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { names, w, err := s.genericQuerier.LabelNames(ctx, matchers...) if err != nil { diff --git a/tsdb/block.go b/tsdb/block.go index ad0d59748a..f67fdc84f8 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -62,6 +62,7 @@ type IndexWriter interface { // IndexReader provides reading access of serialized index data. type IndexReader interface { index.PostingsReader + index.LabelsGetter // Symbols return an iterator over sorted string symbols that may occur in // series' labels and indices. It is not safe to use the returned strings @@ -74,6 +75,9 @@ type IndexReader interface { // LabelValues returns possible label values which may not be sorted. LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) + // LabelValuesStream returns an iterator over matching label values. + LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues + // PostingsForMatcher returns a sorted iterator over postings having a label matching the provided label matcher. // If no postings are found having a label with the correct name and matching value, an empty iterator is returned. PostingsForMatcher(ctx context.Context, m *labels.Matcher) index.Postings @@ -526,6 +530,21 @@ func (r blockIndexReader) LabelValues(ctx context.Context, name string, matchers return labelValuesWithMatchers(ctx, r.ir, name, matchers...) } +func (r blockIndexReader) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return r.ir.LabelValuesStream(ctx, name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(ctx, r.ir, name, matchers) +} + func (r blockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { if len(matchers) == 0 { return r.b.LabelNames(ctx) @@ -576,6 +595,13 @@ func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB return nil } +func (r blockIndexReader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + if err := r.ir.Labels(ref, builder); err != nil { + return fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err) + } + return nil +} + func (r blockIndexReader) Close() error { r.b.pendingReaders.Done() return nil diff --git a/tsdb/block_test.go b/tsdb/block_test.go index fbffdc3c4d..bb1be0379d 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -306,6 +306,100 @@ func TestLabelValuesWithMatchers(t *testing.T) { } } +func TestLabelValuesStream_WithMatchers(t *testing.T) { + var seriesEntries []storage.Series + for i := 0; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), []chunks.Sample{sample{100, 0, nil, nil}})) + } + // Add another series with an overlapping unique label, but leaving out the tens label + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "unique", "value99", + ), []chunks.Sample{sample{100, 0, nil, nil}})) + + ctx := context.Background() + + blockDir := createBlock(t, t.TempDir(), seriesEntries) + files, err := sequenceFiles(chunkDir(blockDir)) + require.NoError(t, err) + require.NotEmpty(t, files, "No chunk created.") + + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, block.Close()) }) + + indexReader, err := block.Index() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, indexReader.Close()) }) + + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique IDs based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for presence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + labelName: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expectedValues: uniqueWithout30s, + }, { + // In this case, we query for the "unique" label where the "tens" label is absent. + // We have one series where "tens" is empty (unique="value99"), but also another with + // the same value for "unique" and "tens" present. Make sure that unique="value99" is + // still found. + name: "get unique ID where tens is empty", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "")}, + expectedValues: []string{"value99"}, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + it := indexReader.LabelValuesStream(ctx, tt.labelName, tt.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, tt.expectedValues, values) + }) + } +} + // TestBlockSize ensures that the block size is calculated correctly. func TestBlockSize(t *testing.T) { tmpdir := t.TempDir() diff --git a/tsdb/db_bench_test.go b/tsdb/db_bench_test.go new file mode 100644 index 0000000000..498d0cacd7 --- /dev/null +++ b/tsdb/db_bench_test.go @@ -0,0 +1,167 @@ +package tsdb + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" +) + +func BenchmarkQuerier_LabelValuesStream(b *testing.B) { + db := openTestDB(b, nil, []int64{1000}) + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + + ctx := context.Background() + + app := db.head.Appender(ctx) + // var seriesEntries []storage.Series + addSeries := func(l labels.Labels) { + app.Append(0, l, 0, 0) + // seriesEntries = append(seriesEntries, storage.NewListSeries(l, tsdbutil.GenerateSamples(1, 1))) + } + + for n := 0; n < 10; n++ { + for i := 0; i < 100000; i++ { + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n))) + // Have some series that won't be matched, to properly test inverted matchers. + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + } + } + require.NoError(b, app.Commit()) + // createBlock(b, db.Dir(), seriesEntries) + require.NoError(b, db.reloadBlocks()) + + querier, err := db.Querier(0, 1) + require.NoError(b, err) + b.Cleanup(func() { require.NoError(b, querier.Close()) }) + + i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1") + iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$") + jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") + jXXXYYY := labels.MustNewMatcher(labels.MatchRegexp, "j", "XXX|YYY") + jXplus := labels.MustNewMatcher(labels.MatchRegexp, "j", "X.+") + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix) + nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match single i*n combination. + nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4 + times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*") + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + }{ + {`i with i="1"`, "i", []*labels.Matcher{i1}}, + // i has 100k values. + {`i with n="1"`, "i", []*labels.Matcher{n1}}, + {`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}}, + {`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}}, + {`i with n="1",j=~"X.+"`, "i", []*labels.Matcher{n1, jXplus}}, + {`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}}, + {`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}}, + {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + {`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}}, + {`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}}, + {`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}}, + // n has 10 values. + {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, + {`n with i="1"`, "n", []*labels.Matcher{i1}}, + } + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + it := querier.LabelValuesStream(ctx, c.labelName, c.matchers...) + for it.Next() { + } + require.NoError(b, it.Err()) + require.Empty(b, it.Warnings()) + } + }) + } +} + +func BenchmarkQuerier_LabelValues(b *testing.B) { + db := openTestDB(b, nil, []int64{1000}) + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + + ctx := context.Background() + + app := db.head.Appender(ctx) + // var seriesEntries []storage.Series + addSeries := func(l labels.Labels) { + app.Append(0, l, 0, 0) + // seriesEntries = append(seriesEntries, storage.NewListSeries(l, tsdbutil.GenerateSamples(1, 1))) + } + + for n := 0; n < 10; n++ { + for i := 0; i < 100000; i++ { + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n))) + // Have some series that won't be matched, to properly test inverted matchers. + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + } + } + require.NoError(b, app.Commit()) + // createBlock(b, db.Dir(), seriesEntries) + require.NoError(b, db.reloadBlocks()) + + querier, err := db.Querier(0, 1) + require.NoError(b, err) + b.Cleanup(func() { require.NoError(b, querier.Close()) }) + + i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1") + iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$") + jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") + jXXXYYY := labels.MustNewMatcher(labels.MatchRegexp, "j", "XXX|YYY") + jXplus := labels.MustNewMatcher(labels.MatchRegexp, "j", "X.+") + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix) + nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match single i*n combination. + nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4 + times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*") + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + }{ + {`i with i="1"`, "i", []*labels.Matcher{i1}}, + // i has 100k values. + {`i with n="1"`, "i", []*labels.Matcher{n1}}, + {`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}}, + {`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}}, + {`i with n="1",j=~"X.+"`, "i", []*labels.Matcher{n1, jXplus}}, + {`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}}, + {`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}}, + {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + {`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}}, + {`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}}, + {`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}}, + // n has 10 values. + {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, + {`n with i="1"`, "n", []*labels.Matcher{i1}}, + } + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, warnings, err := querier.LabelValues(ctx, c.labelName, c.matchers...) + require.NoError(b, err) + require.Empty(b, warnings) + } + }) + } +} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index f3a7ba80ac..fb8effcc58 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6951,6 +6951,126 @@ Outer: require.NoError(t, writerErr) } +func TestQuerier_LabelValuesStream(t *testing.T) { + db := openTestDB(t, nil, nil) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + ctx := context.Background() + + var seriesEntries []storage.Series + // Add a block of 70 series with timestamp 1 + for i := 0; i < 70; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), chunks.GenerateSamples(1, 1))) + } + createBlock(t, db.Dir(), seriesEntries) + + // Add a block of 50 series with timestamp 2 + // Since "tens" start at 50, two of the label values ("value5", "value6") will overlap with the + // previous block + seriesEntries = seriesEntries[:0] + for i := 50; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), chunks.GenerateSamples(2, 1))) + } + createBlock(t, db.Dir(), seriesEntries) + + require.NoError(t, db.reloadBlocks()) + + querier, err := db.Querier(1, 2) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, querier.Close()) }) + + t.Run("without matchers", func(t *testing.T) { + it := querier.LabelValuesStream(ctx, "tens") + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + + require.Equal(t, []string{ + "value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", + }, values) + }) + + t.Run("with matchers", func(t *testing.T) { + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + label string + matchers []*labels.Matcher + expLabels []string + }{ + { + name: "matching on requested label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "tens", "value1"), + }, + expLabels: []string{"value1"}, + }, + { + name: "unsuccessful matching on requested label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "tens", "value10"), + }, + expLabels: nil, + }, + { + name: "matching on other label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "unique", "value51"), + }, + expLabels: []string{"value5"}, + }, + { + name: "tens for empty unique ID", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "unique", ""), + }, + expLabels: nil, + }, + { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + label: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expLabels: uniqueWithout30s, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + it := querier.LabelValuesStream(ctx, tc.label, tc.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + + require.Equal(t, tc.expLabels, values) + }) + } + }) +} + func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { require.Equal(t, float64(expectedSamples), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)), diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 29bbc6331b..2fb6a6adac 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -87,6 +87,30 @@ func (h *headIndexReader) LabelValues(ctx context.Context, name string, matchers return labelValuesWithMatchers(ctx, h, name, matchers...) } +// LabelValuesStream returns an iterator over label values present in +// the head for the specific label name that are within the time range +// mint to maxt. +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { + return storage.EmptyLabelValues() + } + + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return h.head.postings.LabelValuesStream(ctx, name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(ctx, h, name, matchers) +} + // LabelNames returns all the unique label names present in the head // that are within the time range mint to maxt. func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { @@ -186,7 +210,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou // LabelValuesFor returns LabelValues for the given label name in the series referred to by postings. func (h *headIndexReader) LabelValuesFor(postings index.Postings, name string) storage.LabelValues { - return h.head.postings.LabelValuesFor(postings, name) + return h.head.postings.LabelValuesFor(postings, name, h) } // LabelValuesExcluding returns LabelValues for the given label name in all other series than those referred to by postings. @@ -195,6 +219,17 @@ func (h *headIndexReader) LabelValuesExcluding(postings index.Postings, name str return h.head.postings.LabelValuesExcluding(postings, name) } +// Labels reads the series with the given ref and writes its labels into builder. +func (h *headIndexReader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + h.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.lset) + return nil +} + // Series returns the series for the given reference. // Chunks are skipped if chks is nil. func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fdaef8ab06..da5a22edd5 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2842,6 +2842,103 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } } +func TestHeadLabelValuesStream_WithMatchers(t *testing.T) { + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + t.Cleanup(func() { require.NoError(t, head.Close()) }) + + ctx := context.Background() + + app := head.Appender(ctx) + for i := 0; i < 100; i++ { + _, err := app.Append(0, labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), 100, 0) + require.NoError(t, err) + } + // Add another series with an overlapping unique label, but leaving out the tens label + _, err := app.Append(0, labels.FromStrings( + "unique", "value99", + ), 100, 0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + indexReader := head.indexRange(0, 200) + t.Cleanup(func() { + require.NoError(t, indexReader.Close()) + }) + + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique IDs based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for presence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + labelName: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expectedValues: uniqueWithout30s, + }, { + name: "get tens for empty unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "")}, + expectedValues: nil, + }, { + // In this case, we query for the "unique" label where the "tens" label is absent. + // We have one series where "tens" is empty (unique="value99"), but also another with + // the same value for "unique" and "tens" present. Make sure that unique="value99" is + // still found. + name: "get unique ID where tens is empty", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "")}, + expectedValues: []string{"value99"}, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + it := indexReader.LabelValuesStream(ctx, tt.labelName, tt.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, tt.expectedValues, values) + }) + } +} + func TestHeadLabelNamesWithMatchers(t *testing.T) { head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 7162e2e1b2..05278cda4b 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -25,6 +25,7 @@ import ( "math" "os" "path/filepath" + "reflect" "sort" "unsafe" @@ -1301,6 +1302,10 @@ func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Re return r, nil } +func (r *Reader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + return r.Series(ref, builder, nil) +} + // Version returns the file format version of the underlying index. func (r *Reader) Version() int { return r.version @@ -1596,6 +1601,22 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe return values, ctx.Err() } +func (r *Reader) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + if r.version == FormatV1 { + p := r.postingsV1[name] + if len(p) == 0 { + return storage.EmptyLabelValues() + } + return &labelValuesV1{ + matchers: matchers, + it: reflect.ValueOf(p).MapRange(), + name: name, + } + } + + return r.newLabelValuesV2(name, matchers) +} + // LabelNamesFor returns all the label names for the series referred to by IDs. // The names returned are sorted. func (r *Reader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error) { diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index faae6b1a07..27316ba151 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -136,6 +136,15 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, return nil } +func (m mockIndex) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + s, ok := m.series[ref] + if !ok { + return errors.New("not found") + } + builder.Assign(s.l) + return nil +} + func TestIndexRW_Create_Open(t *testing.T) { dir := t.TempDir() diff --git a/tsdb/index/labelvalues.go b/tsdb/index/labelvalues.go index 9680e5d695..481f2fbfd4 100644 --- a/tsdb/index/labelvalues.go +++ b/tsdb/index/labelvalues.go @@ -1,15 +1,163 @@ package index import ( + "context" + "errors" "fmt" + "reflect" "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/util/annotations" ) +type labelValuesV2 struct { + name string + cur string + dec encoding.Decbuf + matchers []*labels.Matcher + skip int + lastVal string + exhausted bool + err error +} + +// newLabelValuesV2 returns an iterator over label values in a v2 index. +func (r *Reader) newLabelValuesV2(name string, matchers []*labels.Matcher) storage.LabelValues { + p := r.postings[name] + if len(p) == 0 { + return storage.EmptyLabelValues() + } + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + d.Skip(p[0].off) + // These are always the same number of bytes, and it's faster to skip than to parse + skip := d.Len() + // Key count + d.Uvarint() + // Label name + d.UvarintBytes() + skip -= d.Len() + + return &labelValuesV2{ + name: name, + matchers: matchers, + dec: d, + lastVal: p[len(p)-1].value, + skip: skip, + } +} + +func (l *labelValuesV2) Next() bool { + if l.err != nil || l.exhausted { + return false + } + + // Pick the first matching label value + for l.dec.Err() == nil { + // Label value + val := yoloString(l.dec.UvarintBytes()) + isMatch := true + for _, m := range l.matchers { + if m.Name != l.name { + // This should not happen + continue + } + + if !m.Matches(val) { + isMatch = false + break + } + } + + if isMatch { + l.cur = val + } + if val == l.lastVal { + l.exhausted = true + return isMatch + } + + // Offset + l.dec.Uvarint64() + // Skip forward to next entry + l.dec.Skip(l.skip) + + if isMatch { + break + } + } + if l.dec.Err() != nil { + // An error occurred decoding + l.err = fmt.Errorf("get postings offset entry: %w", l.dec.Err()) + return false + } + + return true +} + +func (l *labelValuesV2) At() string { + return l.cur +} + +func (l *labelValuesV2) Err() error { + return l.err +} + +func (l *labelValuesV2) Warnings() annotations.Annotations { + return nil +} + +func (l *labelValuesV2) Close() error { + return nil +} + +type labelValuesV1 struct { + it *reflect.MapIter + matchers []*labels.Matcher + name string +} + +func (l *labelValuesV1) Next() bool { +loop: + for l.it.Next() { + for _, m := range l.matchers { + if m.Name != l.name { + // This should not happen + continue + } + + if !m.Matches(l.At()) { + continue loop + } + } + + // This entry satisfies all matchers + return true + } + + return false +} + +func (l *labelValuesV1) At() string { + return yoloString(l.it.Value().Bytes()) +} + +func (*labelValuesV1) Err() error { + return nil +} + +func (*labelValuesV1) Warnings() annotations.Annotations { + return nil +} + +func (*labelValuesV1) Close() error { + return nil +} + // LabelValuesFor returns LabelValues for the given label name in the series referred to by postings. func (r *Reader) LabelValuesFor(postings Postings, name string) storage.LabelValues { return r.labelValuesFor(postings, name, true) @@ -58,7 +206,7 @@ func (r *Reader) labelValuesForV1(postings Postings, name string, includeMatches slices.Sort(vals) return &intersectLabelValuesV1{ postingOffsets: e, - values: storage.NewListLabelValues(vals), + values: storage.NewListLabelValues(vals, nil), postings: NewPostingsCloner(postings), b: r.b, dec: r.dec, @@ -208,9 +356,89 @@ func (it *intersectLabelValues) Close() error { return nil } +type LabelsGetter interface { + // Labels reads the series with the given ref and writes its labels into builder. + Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error +} + // LabelValuesFor returns LabelValues for the given label name in the series referred to by postings. -func (p *MemPostings) LabelValuesFor(postings Postings, name string) storage.LabelValues { - return p.labelValuesFor(postings, name, true) +// lg is used to get labels from series in case the ratio of postings vs label values is sufficiently low, +// as an optimization. +func (p *MemPostings) LabelValuesFor(postings Postings, name string, lg LabelsGetter) storage.LabelValues { + p.mtx.RLock() + + e := p.m[name] + if len(e) == 0 { + p.mtx.RUnlock() + return storage.EmptyLabelValues() + } + + // With thread safety in mind and due to random key ordering in map, we have to construct the array in memory + vals := make([]string, 0, len(e)) + for val := range e { + vals = append(vals, val) + } + + // Let's see if expanded postings for matchers have smaller cardinality than label values. + // Since computing label values from series is expensive, we apply a limit on number of expanded + // postings (and series). + const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series. + maxExpandedPostings := len(vals) / maxExpandedPostingsFactor + if maxExpandedPostings > 0 { + // Add space for one extra posting when checking if we expanded all postings. + expanded := make([]storage.SeriesRef, 0, maxExpandedPostings+1) + + // Call postings.Next() even if len(expanded) == maxExpandedPostings. This tells us if there are more postings or not. + for len(expanded) <= maxExpandedPostings && postings.Next() { + expanded = append(expanded, postings.At()) + } + + if len(expanded) <= maxExpandedPostings { + // When we're here, postings.Next() must have returned false, so we need to check for errors. + p.mtx.RUnlock() + + if err := postings.Err(); err != nil { + return storage.ErrLabelValues(fmt.Errorf("expanding postings for matchers: %w", err)) + } + + // We have expanded all the postings -- all returned label values will be from these series only. + // (We supply vals as a buffer for storing results. It should be big enough already, since it holds all possible label values.) + vals, err := LabelValuesFromSeries(lg, name, expanded, vals) + if err != nil { + return storage.ErrLabelValues(err) + } + + slices.Sort(vals) + return storage.NewListLabelValues(vals, nil) + } + + // If we haven't reached end of postings, we prepend our expanded postings to "postings", and continue. + postings = NewPrependPostings(expanded, postings) + } + + candidates := make([]Postings, 0, len(e)) + vals = vals[:0] + for val, srs := range e { + vals = append(vals, val) + candidates = append(candidates, NewListPostings(srs)) + } + indexes, err := FindIntersectingPostings(postings, candidates) + p.mtx.RUnlock() + if err != nil { + return storage.ErrLabelValues(err) + } + + // Filter the values, keeping only those with intersecting postings + if len(vals) != len(indexes) { + slices.Sort(indexes) + for i, index := range indexes { + vals[i] = vals[index] + } + vals = vals[:len(indexes)] + } + + slices.Sort(vals) + return storage.NewListLabelValues(vals, nil) } // LabelValuesExcluding returns LabelValues for the given label name in all other series than those referred to by postings. @@ -266,7 +494,40 @@ func (p *MemPostings) labelValuesFor(postings Postings, name string, includeMatc } slices.Sort(vals) - return storage.NewListLabelValues(vals) + return storage.NewListLabelValues(vals, nil) +} + +// LabelValuesFromSeries returns all unique label values from r for given label name from supplied series. Values are not sorted. +// buf is space for holding the result (if it isn't big enough, it will be ignored), may be nil. +func LabelValuesFromSeries(lg LabelsGetter, labelName string, refs []storage.SeriesRef, buf []string) ([]string, error) { + values := make(map[string]struct{}, len(buf)) + + var builder labels.ScratchBuilder + for _, ref := range refs { + err := lg.Labels(ref, &builder) + // Postings may be stale. Skip if no underlying series exists. + if errors.Is(err, storage.ErrNotFound) { + continue + } + if err != nil { + return nil, fmt.Errorf("label values for label %s: %w", labelName, err) + } + + v := builder.Labels().Get(labelName) + if v != "" { + values[v] = struct{}{} + } + } + + if cap(buf) >= len(values) { + buf = buf[:0] + } else { + buf = make([]string, 0, len(values)) + } + for v := range values { + buf = append(buf, v) + } + return buf, nil } // intersect returns whether p1 and p2 have at least one series in common. @@ -312,3 +573,30 @@ func contains(p, subp Postings) bool { // Couldn't find any value in subp which is not in p. return true } + +// LabelValuesStream returns an iterator over sorted label values for the given name. +// The matchers should only be for the name in question. +// LabelValues iterators need to be sorted, to enable merging of them. +func (p *MemPostings) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + p.mtx.RLock() + + values := make([]string, 0, len(p.m[name])) +loop: + for v := range p.m[name] { + for _, m := range matchers { + if m.Name != name { + // This should not happen + continue + } + + if !m.Matches(v) { + continue loop + } + } + values = append(values, v) + } + p.mtx.RUnlock() + + slices.Sort(values) + return storage.NewListLabelValues(values, nil) +} diff --git a/tsdb/index/labelvalues_test.go b/tsdb/index/labelvalues_test.go index 1c9cb4c42e..a4edfcd20c 100644 --- a/tsdb/index/labelvalues_test.go +++ b/tsdb/index/labelvalues_test.go @@ -230,7 +230,7 @@ func TestMemPostings_LabelValuesFor(t *testing.T) { t.Run("filtering based on non-empty postings", func(t *testing.T) { p := mp.Get("a", "1") - it := mp.LabelValuesFor(p, "b") + it := mp.LabelValuesFor(p, "b", nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) @@ -248,7 +248,7 @@ func TestMemPostings_LabelValuesFor(t *testing.T) { t.Run("requesting a non-existent label value", func(t *testing.T) { p := mp.Get("a", "1") - it := mp.LabelValuesFor(p, "c") + it := mp.LabelValuesFor(p, "c", nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) @@ -259,7 +259,7 @@ func TestMemPostings_LabelValuesFor(t *testing.T) { }) t.Run("filtering based on empty postings", func(t *testing.T) { - it := mp.LabelValuesFor(EmptyPostings(), "a") + it := mp.LabelValuesFor(EmptyPostings(), "a", nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) @@ -272,7 +272,7 @@ func TestMemPostings_LabelValuesFor(t *testing.T) { t.Run("filtering based on a postings set missing the label", func(t *testing.T) { p := mp.Get("d", "1") - it := mp.LabelValuesFor(p, "a") + it := mp.LabelValuesFor(p, "a", nil) t.Cleanup(func() { require.NoError(t, it.Close()) }) @@ -358,3 +358,62 @@ func TestMemPostings_LabelValuesExcluding(t *testing.T) { require.Equal(t, []string{"1", "2"}, vals) }) } + +func TestMemPostings_LabelValuesStream(t *testing.T) { + ctx := context.Background() + mp := NewMemPostings() + mp.Add(1, labels.FromStrings("a", "1")) + mp.Add(1, labels.FromStrings("b", "1")) + mp.Add(2, labels.FromStrings("a", "1")) + mp.Add(2, labels.FromStrings("b", "2")) + mp.Add(3, labels.FromStrings("a", "1")) + mp.Add(3, labels.FromStrings("b", "3")) + mp.Add(4, labels.FromStrings("a", "1")) + mp.Add(4, labels.FromStrings("b", "4")) + mp.Add(5, labels.FromStrings("a", "2")) + mp.Add(5, labels.FromStrings("b", "5")) + + t.Run("without matchers", func(t *testing.T) { + it := mp.LabelValuesStream(context.Background(), "b") + + var vals []string + for it.Next() { + vals = append(vals, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, []string{"1", "2", "3", "4", "5"}, vals) + }) + + t.Run("with matchers", func(t *testing.T) { + it := mp.LabelValuesStream(context.Background(), "b", labels.MustNewMatcher(labels.MatchRegexp, "b", "[2,3]")) + + var vals []string + for it.Next() { + vals = append(vals, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, []string{"2", "3"}, vals) + }) + + // Matchers for other labels should be ignored. + t.Run("with matchers for another label", func(t *testing.T) { + it := mp.LabelValuesStream(context.Background(), "b", labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + + var vals []string + for it.Next() { + vals = append(vals, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, []string{"1", "2", "3", "4", "5"}, vals) + }) + + t.Run("non-existent label", func(t *testing.T) { + it := mp.LabelValuesStream(ctx, "c") + require.False(t, it.Next()) + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + }) +} diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index acb6948e32..6d7c76199d 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -1007,3 +1007,51 @@ func fastPostingsForMatcher(ctx context.Context, pr PostingsReader, m *labels.Ma return nil, false } + +func NewPrependPostings(a []storage.SeriesRef, b Postings) Postings { + return &prependPostings{ + ix: -1, + prefix: a, + rest: b, + } +} + +// prependPostings returns series references from "prefix" before using "rest" postings. +type prependPostings struct { + ix int + prefix []storage.SeriesRef + rest Postings +} + +func (p *prependPostings) Next() bool { + p.ix++ + if p.ix < len(p.prefix) { + return true + } + return p.rest.Next() +} + +func (p *prependPostings) Seek(v storage.SeriesRef) bool { + for p.ix < len(p.prefix) { + if p.ix >= 0 && p.prefix[p.ix] >= v { + return true + } + p.ix++ + } + + return p.rest.Seek(v) +} + +func (p *prependPostings) At() storage.SeriesRef { + if p.ix >= 0 && p.ix < len(p.prefix) { + return p.prefix[p.ix] + } + return p.rest.At() +} + +func (p *prependPostings) Err() error { + if p.ix >= 0 && p.ix < len(p.prefix) { + return nil + } + return p.rest.Err() +} diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index a51655ddbd..53b04eae91 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1501,6 +1501,61 @@ func TestListPostings(t *testing.T) { }) } +func TestPrependPostings(t *testing.T) { + t.Run("empty", func(t *testing.T) { + p := NewPrependPostings(nil, NewListPostings(nil)) + require.False(t, p.Next()) + }) + + t.Run("next+At", func(t *testing.T) { + p := NewPrependPostings([]storage.SeriesRef{10, 20, 30}, NewListPostings([]storage.SeriesRef{200, 300, 500})) + + for _, s := range []storage.SeriesRef{10, 20, 30, 200, 300, 500} { + require.True(t, p.Next()) + require.Equal(t, s, p.At()) + require.Equal(t, s, p.At()) // Multiple calls return same value. + } + require.False(t, p.Next()) + }) + + t.Run("seek+At", func(t *testing.T) { + p := NewPrependPostings([]storage.SeriesRef{10, 20, 30}, NewListPostings([]storage.SeriesRef{200, 300, 500})) + + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.Equal(t, storage.SeriesRef(10), p.At()) + + require.True(t, p.Seek(15)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.Equal(t, storage.SeriesRef(20), p.At()) + + require.True(t, p.Seek(20)) // Seeking to "current" value doesn't move postings iterator. + require.Equal(t, storage.SeriesRef(20), p.At()) + require.Equal(t, storage.SeriesRef(20), p.At()) + + require.True(t, p.Seek(50)) + require.Equal(t, storage.SeriesRef(200), p.At()) + require.Equal(t, storage.SeriesRef(200), p.At()) + + require.False(t, p.Seek(1000)) + require.False(t, p.Next()) + }) + + t.Run("err", func(t *testing.T) { + err := fmt.Errorf("error") + p := NewPrependPostings([]storage.SeriesRef{10, 20, 30}, ErrPostings(err)) + + for _, s := range []storage.SeriesRef{10, 20, 30} { + require.True(t, p.Next()) + require.Equal(t, s, p.At()) + require.NoError(t, p.Err()) + } + // Advancing after prepended values returns false, and gives us access to error. + require.False(t, p.Next()) + require.Equal(t, err, p.Err()) + }) +} + // BenchmarkListPostings benchmarks ListPostings by iterating Next/At sequentially. // See also BenchmarkIntersect as it performs more `At` calls than `Next` calls when intersecting. func BenchmarkListPostings(b *testing.B) { diff --git a/tsdb/labelvalues.go b/tsdb/labelvalues.go new file mode 100644 index 0000000000..5624cdae97 --- /dev/null +++ b/tsdb/labelvalues.go @@ -0,0 +1,117 @@ +package tsdb + +import ( + "context" + "fmt" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" +) + +func labelValuesForMatchersStream(ctx context.Context, r IndexReader, name string, matchers []*labels.Matcher) storage.LabelValues { + // See which labels must be non-empty. + // Optimization for case like {l=~".", l!="1"}. + labelMustBeSet := make(map[string]bool, len(matchers)) + for _, m := range matchers { + if !m.Matches("") { + labelMustBeSet[m.Name] = true + } + } + + var its, notIts []index.Postings + for _, m := range matchers { + switch { + case labelMustBeSet[m.Name]: + // If this matcher must be non-empty, we can be smarter. + matchesEmpty := m.Matches("") + isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp + switch { + case isNot && matchesEmpty: // l!="foo" + // If the label can't be empty and is a Not and the inner matcher + // doesn't match empty, then subtract it out at the end. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it := r.PostingsForMatcher(ctx, inverse) + if it.Err() != nil { + return storage.ErrLabelValues(it.Err()) + } + notIts = append(notIts, it) + case isNot && !matchesEmpty: // l!="" + // If the label can't be empty and is a Not, but the inner matcher can + // be empty we need to use inversePostingsForMatcher. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := inversePostingsForMatcher(ctx, r, inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + if index.IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + default: // l="a" + // Non-Not matcher, use normal postingsForMatcher. + it := r.PostingsForMatcher(ctx, m) + if it.Err() != nil { + return storage.ErrLabelValues(it.Err()) + } + if index.IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + } + default: // l="" + // If a matcher for a labelname selects an empty value, it selects all + // the series which don't have the label name set too. See: + // https://github.com/prometheus/prometheus/issues/3575 and + // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + it, err := inversePostingsForMatcher(ctx, r, m) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + } + } + + if len(its) == 0 && len(notIts) > 0 { + pit := index.Merge(ctx, notIts...) + return r.LabelValuesExcluding(pit, name) + } + + pit := index.Intersect(its...) + for _, n := range notIts { + pit = index.Without(pit, n) + } + pit = expandPostings(pit) + + return r.LabelValuesFor(pit, name) +} + +// expandPostings expands postings up to a certain limit, to reduce runtime complexity when filtering label values. +// If the limit is reached, the rest is unexpanded. +func expandPostings(postings index.Postings) index.Postings { + const expandPostingsLimit = 10_000_000 + var expanded []storage.SeriesRef + // Go one beyond the limit, so we can tell if the iterator is exhausted + for len(expanded) <= expandPostingsLimit && postings.Next() { + expanded = append(expanded, postings.At()) + } + if postings.Err() != nil { + return index.ErrPostings(fmt.Errorf("expanding postings for matchers: %w", postings.Err())) + } + if len(expanded) > expandPostingsLimit { + // Couldn't exhaust the iterator + postings = index.NewPrependPostings(expanded, postings) + } else { + postings = index.NewListPostings(expanded) + } + + return postings +} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 85e8e51118..65c16a456c 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -185,6 +185,28 @@ func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matc return labelValuesWithMatchers(ctx, oh, name, matchers...) } +// LabelValuesStream needs to be overridden from the headIndexReader implementation due +// to the check that happens at the beginning where we make sure that the query +// interval overlaps with the head minooot and maxooot. +func (oh *OOOHeadIndexReader) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { + return storage.EmptyLabelValues() + } + + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return oh.head.postings.LabelValuesStream(ctx, name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(ctx, oh, name, matchers) +} + type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef @@ -472,6 +494,10 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef) } +func (ir *OOOCompactionHeadIndexReader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + return ir.ch.oooIR.series(ref, builder, nil, 0, ir.ch.lastMmapRef) +} + func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { return nil, errors.New("not implemented") } @@ -480,6 +506,10 @@ func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name stri return nil, errors.New("not implemented") } +func (ir *OOOCompactionHeadIndexReader) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(errors.New("not implemented")) +} + func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return nil, errors.New("not implemented") } diff --git a/tsdb/querier.go b/tsdb/querier.go index d5bd68caea..7601ba795b 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -79,6 +79,10 @@ func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, matcher return res, nil, err } +func (q *blockBaseQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + return q.index.LabelValuesStream(ctx, name, matchers...) +} + func (q *blockBaseQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { res, err := q.index.LabelNames(ctx, matchers...) return res, nil, err @@ -411,11 +415,11 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma // We have expanded all the postings -- all returned label values will be from these series only. // (We supply allValues as a buffer for storing results. It should be big enough already, since it holds all possible label values.) - return labelValuesFromSeries(r, name, expanded, allValues) + return index.LabelValuesFromSeries(r, name, expanded, allValues) } // If we haven't reached end of postings, we prepend our expanded postings to "p", and continue. - p = newPrependPostings(expanded, p) + p = index.NewPrependPostings(expanded, p) } valuesPostings := make([]index.Postings, len(allValues)) @@ -438,87 +442,6 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma return values, nil } -// labelValuesFromSeries returns all unique label values from for given label name from supplied series. Values are not sorted. -// buf is space for holding result (if it isn't big enough, it will be ignored), may be nil. -func labelValuesFromSeries(r IndexReader, labelName string, refs []storage.SeriesRef, buf []string) ([]string, error) { - values := map[string]struct{}{} - - var builder labels.ScratchBuilder - for _, ref := range refs { - err := r.Series(ref, &builder, nil) - // Postings may be stale. Skip if no underlying series exists. - if errors.Is(err, storage.ErrNotFound) { - continue - } - if err != nil { - return nil, fmt.Errorf("label values for label %s: %w", labelName, err) - } - - v := builder.Labels().Get(labelName) - if v != "" { - values[v] = struct{}{} - } - } - - if cap(buf) >= len(values) { - buf = buf[:0] - } else { - buf = make([]string, 0, len(values)) - } - for v := range values { - buf = append(buf, v) - } - return buf, nil -} - -func newPrependPostings(a []storage.SeriesRef, b index.Postings) index.Postings { - return &prependPostings{ - ix: -1, - prefix: a, - rest: b, - } -} - -// prependPostings returns series references from "prefix" before using "rest" postings. -type prependPostings struct { - ix int - prefix []storage.SeriesRef - rest index.Postings -} - -func (p *prependPostings) Next() bool { - p.ix++ - if p.ix < len(p.prefix) { - return true - } - return p.rest.Next() -} - -func (p *prependPostings) Seek(v storage.SeriesRef) bool { - for p.ix < len(p.prefix) { - if p.ix >= 0 && p.prefix[p.ix] >= v { - return true - } - p.ix++ - } - - return p.rest.Seek(v) -} - -func (p *prependPostings) At() storage.SeriesRef { - if p.ix >= 0 && p.ix < len(p.prefix) { - return p.prefix[p.ix] - } - return p.rest.At() -} - -func (p *prependPostings) Err() error { - if p.ix >= 0 && p.ix < len(p.prefix) { - return nil - } - return p.rest.Err() -} - func labelNamesWithMatchers(ctx context.Context, r IndexReader, matchers ...*labels.Matcher) ([]string, error) { p, err := r.PostingsForMatchers(ctx, false, matchers...) if err != nil { diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 1edc37c23d..eae5ebcadf 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -97,6 +97,60 @@ func BenchmarkQuerier(b *testing.B) { }) } +func BenchmarkIndexReader_LabelValuesStream(b *testing.B) { + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = b.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, h.Close()) + }) + + app := h.Appender(context.Background()) + addSeries := func(l labels.Labels) { + app.Append(0, l, 0, 0) + } + + for n := 0; n < 10; n++ { + for i := 0; i < 100000; i++ { + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n))) + // Have some series that won't be matched, to properly test inverted matches. + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + } + } + require.NoError(b, app.Commit()) + + b.Run("Head", func(b *testing.B) { + ir, err := h.Index() + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, ir.Close()) + }) + + benchmarkLabelValuesStream(b, ir) + }) + + b.Run("Block", func(b *testing.B) { + blockdir := createBlockFromHead(b, b.TempDir(), h) + block, err := OpenBlock(nil, blockdir, nil) + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, block.Close()) + }) + ir, err := block.Index() + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, ir.Close()) + }) + + benchmarkLabelValuesStream(b, ir) + }) +} + func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) { ctx := context.Background() @@ -242,6 +296,55 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) { } } +func benchmarkLabelValuesStream(b *testing.B, ir IndexReader) { + i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1") + iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$") + jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") + jXXXYYY := labels.MustNewMatcher(labels.MatchRegexp, "j", "XXX|YYY") + jXplus := labels.MustNewMatcher(labels.MatchRegexp, "j", "X.+") + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix) + // XXX: This is badly defined, i.e. it advertises as matching on "n", but matches on "i" + nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match a single i*n combination. + nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4 + times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*") + + ctx := context.Background() + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + }{ + {`i with i="1"`, "i", []*labels.Matcher{i1}}, + {`i with n="1"`, "i", []*labels.Matcher{n1}}, + {`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}}, + {`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}}, + {`i with n="1",j=~"X.+"`, "i", []*labels.Matcher{n1, jXplus}}, + {`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}}, + {`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}}, + {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + {`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}}, + {`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}}, + {`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}}, + {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, + {`n with i="1"`, "n", []*labels.Matcher{i1}}, + } + + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + it := ir.LabelValuesStream(ctx, c.labelName, c.matchers...) + for it.Next() { + } + require.NoError(b, it.Err()) + require.Empty(b, it.Warnings()) + } + }) + } +} + func BenchmarkMergedStringIter(b *testing.B) { numSymbols := 100000 s := make([]string, numSymbols) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 082b82f9be..77a681bfaf 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2327,6 +2327,11 @@ func (m mockIndex) LabelValues(_ context.Context, name string, matchers ...*labe return values, nil } +func (m mockIndex) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + // TODO + return storage.EmptyLabelValues() +} + func (m mockIndex) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error) { return m.series[id].l.Get(label), nil } @@ -2437,6 +2442,15 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, return nil } +func (m mockIndex) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + s, ok := m.series[ref] + if !ok { + return storage.ErrNotFound + } + builder.Assign(s.l) + return nil +} + func (m mockIndex) LabelNames(_ context.Context, matchers ...*labels.Matcher) ([]string, error) { names := map[string]struct{}{} if len(matchers) == 0 { @@ -3541,62 +3555,7 @@ func TestQueryWithDeletedHistograms(t *testing.T) { } } -func TestPrependPostings(t *testing.T) { - t.Run("empty", func(t *testing.T) { - p := newPrependPostings(nil, index.NewListPostings(nil)) - require.False(t, p.Next()) - }) - - t.Run("next+At", func(t *testing.T) { - p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500})) - - for _, s := range []storage.SeriesRef{10, 20, 30, 200, 300, 500} { - require.True(t, p.Next()) - require.Equal(t, s, p.At()) - require.Equal(t, s, p.At()) // Multiple calls return same value. - } - require.False(t, p.Next()) - }) - - t.Run("seek+At", func(t *testing.T) { - p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500})) - - require.True(t, p.Seek(5)) - require.Equal(t, storage.SeriesRef(10), p.At()) - require.Equal(t, storage.SeriesRef(10), p.At()) - - require.True(t, p.Seek(15)) - require.Equal(t, storage.SeriesRef(20), p.At()) - require.Equal(t, storage.SeriesRef(20), p.At()) - - require.True(t, p.Seek(20)) // Seeking to "current" value doesn't move postings iterator. - require.Equal(t, storage.SeriesRef(20), p.At()) - require.Equal(t, storage.SeriesRef(20), p.At()) - - require.True(t, p.Seek(50)) - require.Equal(t, storage.SeriesRef(200), p.At()) - require.Equal(t, storage.SeriesRef(200), p.At()) - - require.False(t, p.Seek(1000)) - require.False(t, p.Next()) - }) - - t.Run("err", func(t *testing.T) { - err := fmt.Errorf("error") - p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.ErrPostings(err)) - - for _, s := range []storage.SeriesRef{10, 20, 30} { - require.True(t, p.Next()) - require.Equal(t, s, p.At()) - require.NoError(t, p.Err()) - } - // Advancing after prepended values returns false, and gives us access to error. - require.False(t, p.Next()) - require.Equal(t, err, p.Err()) - }) -} - -func TestLabelsValuesWithMatchersOptimization(t *testing.T) { +func TestLabelValuesWithMatchersOptimization(t *testing.T) { dir := t.TempDir() opts := DefaultHeadOptions() opts.ChunkRange = 1000 diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index b6ec7d4e1f..d46b0efbca 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -174,6 +174,10 @@ func (t errorTestQuerier) LabelValues(context.Context, string, ...*labels.Matche return nil, nil, t.err } +func (t errorTestQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(t.err) +} + func (t errorTestQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, t.err }