Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: track number of processed samples in each query #10232

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 138 additions & 2 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2635,14 +2635,14 @@ func runMixedMetricsTests(t *testing.T, expressions []string, pointsPerSeries in
q, err := prometheusEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
expectedResults := q.Exec(context.Background())
prometheusResults := q.Exec(context.Background())

q, err = mimirEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
mimirResults := q.Exec(context.Background())

testutils.RequireEqualResults(t, expr, expectedResults, mimirResults, skipAnnotationComparison)
testutils.RequireEqualResults(t, expr, prometheusResults, mimirResults, skipAnnotationComparison)
})
}
}
Expand Down Expand Up @@ -2807,3 +2807,139 @@ func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) {

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
}

func TestQueryStats(t *testing.T) {
opts := NewTestEngineOpts()
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts.CommonOpts)

start := timestamp.Time(0)
end := start.Add(10 * time.Minute)

storage := promqltest.LoadedStorage(t, `
load 1m
dense_series 0 1 2 3 4 5 6 7 8 9 10
start_series 0 1 _ _ _ _ _ _ _ _ _
end_series _ _ _ _ _ 5 6 7 8 9 10
sparse_series 0 _ _ _ _ _ _ 7 _ _ _
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
stale_series 0 1 2 3 4 5 stale 7 8 9 10
nan_series NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
native_histogram_series {{schema:0 sum:2 count:4 buckets:[1 2 1]}} {{sum:2 count:4 buckets:[1 2 1]}}
`)

// runQueryAndGetTotalSamples runs expr against the loaded test storage with the given engine
// and returns the total sample count reported by the query's statistics.
//
// Instant queries are evaluated at end; range queries cover start to end with a one minute step.
// Any error creating or executing the query fails the test immediately.
runQueryAndGetTotalSamples := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) int64 {
	ctx := context.Background()

	var (
		query promql.Query
		err   error
	)

	switch {
	case isInstantQuery:
		query, err = engine.NewInstantQuery(ctx, storage, nil, expr, end)
	default:
		query, err = engine.NewRangeQuery(ctx, storage, nil, expr, start, end, time.Minute)
	}
	require.NoError(t, err)

	// The stats are read in the return expression below, before the deferred Close runs.
	defer query.Close()

	result := query.Exec(ctx)
	require.NoError(t, result.Err)

	return query.Stats().Samples.TotalSamples
}

testCases := map[string]struct {
expr string
isInstantQuery bool
expectedTotalSamples int64
}{
"instant vector selector with point at every time step": {
expr: `dense_series{}`,
expectedTotalSamples: 11,
},
"instant vector selector with points only in start of time range": {
expr: `start_series{}`,
expectedTotalSamples: 2 + 4, // 2 for original points, plus 4 for lookback to last point.
},
"instant vector selector with points only at end of time range": {
expr: `end_series{}`,
expectedTotalSamples: 6,
},
"instant vector selector with sparse points": {
expr: `sparse_series{}`,
expectedTotalSamples: 5 + 4, // 5 for first point at T=0, and 4 for second point at T=7
},
"instant vector selector with stale marker": {
expr: `stale_series{}`,
expectedTotalSamples: 10, // Instant vector selectors ignore stale markers.
},

"raw range vector selector with single point": {
expr: `dense_series[45s]`,
isInstantQuery: true,
expectedTotalSamples: 1,
},
"raw range vector selector with multiple points": {
expr: `dense_series[3m45s]`,
isInstantQuery: true,
expectedTotalSamples: 4,
},

"range vector selector with point at every time step": {
expr: `sum_over_time(dense_series{}[30s])`,
expectedTotalSamples: 11,
},
"range vector selector with points only in start of time range": {
expr: `sum_over_time(start_series{}[30s])`,
expectedTotalSamples: 2,
},
"range vector selector with points only at end of time range": {
expr: `sum_over_time(end_series{}[30s])`,
expectedTotalSamples: 6,
},
"range vector selector with sparse points": {
expr: `sum_over_time(sparse_series{}[30s])`,
expectedTotalSamples: 2,
},
"range vector selector where range overlaps previous step's range": {
expr: `sum_over_time(dense_series{}[1m30s])`,
expectedTotalSamples: 21, // Each step except the first selects two points.
},
"range vector selector with stale marker": {
expr: `count_over_time(stale_series{}[1m30s])`,
expectedTotalSamples: 19, // Each step except the first selects two points. Range vector selectors ignore stale markers.
},

"expression with multiple selectors": {
expr: `dense_series{} + end_series{}`,
expectedTotalSamples: 11 + 6,
},
"instant vector selector with NaNs": {
expr: `nan_series{}`,
expectedTotalSamples: 11,
},
"range vector selector with NaNs": {
expr: `sum_over_time(nan_series{}[1m])`,
expectedTotalSamples: 11,
},
"instant vector selector with native histograms": {
expr: `native_histogram_series{}`,
expectedTotalSamples: 78,
},
"range vector selector with native histograms": {
expr: `sum_over_time(native_histogram_series{}[1m])`,
expectedTotalSamples: 26,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
prometheusCount := runQueryAndGetTotalSamples(t, prometheusEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, prometheusCount, "invalid test case: expected samples does not match value from Prometheus' engine")

mimirCount := runQueryAndGetTotalSamples(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, mimirCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also compare the samples loaded as part of our test gauntlet if we expect it to be the same in all cases

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently difficult due to the optimisation in prometheus/prometheus#14097, as Prometheus' engine sometimes skips loading data for histograms if it's not needed. MQE does not yet have the same optimisation, so there are some expected differences in the total sample count from the two engines in some cases.

Given the tests in TestQueryStats, and the fact that the statistics are informational and may differ between engines in the future due to other optimisations, I'm tempted to leave this as-is.

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to leave it out if that's the case.

It might be interesting to run some much larger queries and time ranges to check that we are returning consistent results. We could perhaps use the same data generated for the benchmarks to create some large queries (floats only, since the counts for native histograms will differ), and then add a flag to RequireEqualResults to compare the sample counts as well.

This isn't a blocker.

})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type InstantVectorSelector struct {
Selector *Selector
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Stats *types.QueryStats

chunkIterator chunkenc.Iterator
memoizedIterator *storage.MemoizedSeriesIterator
Expand Down Expand Up @@ -143,6 +144,9 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
lastHistogramT = t
lastHistogram = h

// For consistency with Prometheus' engine, we convert each histogram point to an equivalent number of float points.
v.Stats.TotalSamples += types.EquivalentFloatSampleCount(h)
} else {
// Only create the slice once we know the series is a histogram or not.
if len(data.Floats) == 0 {
Expand All @@ -161,6 +165,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
return types.InstantVectorSeriesData{}, v.memoizedIterator.Err()
}

v.Stats.TotalSamples += int64(len(data.Floats))

return data, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Expand Down Expand Up @@ -239,6 +240,7 @@ func TestInstantVectorSelector_SliceSizing(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type RangeVectorSelector struct {
Selector *Selector
Stats *types.QueryStats

rangeMilliseconds int64
chunkIterator chunkenc.Iterator
Expand All @@ -32,9 +33,10 @@ type RangeVectorSelector struct {

var _ types.RangeVectorOperator = &RangeVectorSelector{}

func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *RangeVectorSelector {
func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, stats *types.QueryStats) *RangeVectorSelector {
return &RangeVectorSelector{
Selector: selector,
Stats: stats,
floats: types.NewFPointRingBuffer(memoryConsumptionTracker),
histograms: types.NewHPointRingBuffer(memoryConsumptionTracker),
stepData: &types.RangeVectorStepData{},
Expand Down Expand Up @@ -102,6 +104,8 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err
m.stepData.RangeStart = rangeStart
m.stepData.RangeEnd = rangeEnd

m.Stats.TotalSamples += int64(m.stepData.Floats.Count()) + m.stepData.Histograms.EquivalentFloatSampleCount()

return m.stepData, nil
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Query struct {
cancel context.CancelCauseFunc
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
annotations *annotations.Annotations
stats *types.QueryStats

// Time range of the top-level query.
// Subqueries may use a different range.
Expand Down Expand Up @@ -79,6 +80,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer
qs: qs,
memoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionPerQuery, engine.queriesRejectedDueToPeakMemoryConsumption),
annotations: annotations.New(),
stats: &types.QueryStats{},

statement: &parser.EvalStmt{
Expr: expr,
Expand Down Expand Up @@ -164,6 +166,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types

ExpressionPosition: e.PositionRange(),
},
Stats: q.stats,
}, nil
case *parser.AggregateExpr:
if !q.engine.featureToggles.EnableAggregationOperations {
Expand Down Expand Up @@ -343,7 +346,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.Q
ExpressionPosition: e.PositionRange(),
}

return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil
return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker, q.stats), nil

case *parser.SubqueryExpr:
if !q.engine.featureToggles.EnableSubqueries {
Expand Down Expand Up @@ -829,8 +832,12 @@ func (q *Query) Statement() parser.Statement {
}

// Stats returns the statistics collected for this query.
//
// Only Samples.TotalSamples is populated: it is copied from the query's own
// stats tracker at the time of the call, so it reflects the samples counted so far.
// Timers is set to the value returned by stats.NewQueryTimers() and is not otherwise
// updated here.
//
// A new Statistics value is built on every call.
func (q *Query) Stats() *stats.Statistics {
	return &stats.Statistics{
		Timers: stats.NewQueryTimers(),
		Samples: &stats.QuerySamples{
			TotalSamples: q.stats.TotalSamples,
		},
	}
}

func (q *Query) Cancel() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/streamingpromql/types/hpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,22 @@ func (v HPointRingBufferView) Count() int {
return v.size
}

// EquivalentFloatSampleCount returns the equivalent number of float samples in this ring buffer view.
//
// It sums EquivalentFloatSampleCount over the histogram of every point in both
// slices returned by UnsafePoints.
func (v HPointRingBufferView) EquivalentFloatSampleCount() int64 {
	var total int64
	first, second := v.UnsafePoints()

	for i := range first {
		total += EquivalentFloatSampleCount(first[i].H)
	}

	for i := range second {
		total += EquivalentFloatSampleCount(second[i].H)
	}

	return total
}

// Any returns true if this ring buffer view contains any points.
func (v HPointRingBufferView) Any() bool {
return v.size != 0
Expand Down
30 changes: 30 additions & 0 deletions pkg/streamingpromql/types/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package types

import (
"unsafe"

"github.com/prometheus/prometheus/model/histogram"
)

// QueryStats tracks statistics about the execution of a single query.
//
// It is not safe to use this type from multiple goroutines simultaneously.
type QueryStats struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create our own struct instead of using stats.QuerySamples and passing that to the appropriate operators?

It would also make it easier to implement TotalSamplesPerStep if we want to support that too (which I think we do)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> Why create our own struct instead of using stats.QuerySamples and passing that to the appropriate operators?

Given we don't support anything other than TotalSamples in MQE, I wanted to make this clear in the code by using a struct that only had a field for TotalSamples.

> It would also make it easier to implement TotalSamplesPerStep if we want to support that too (which I think we do)

I don't think we want to do this unless there's a specific need for it - per-step stats are considered experimental and disabled by default in Prometheus, and are not possible to enable in Mimir as far as I can see. The docs for this also state that the value for each step should be the same as if the query was run as an instant query, so anyone who wanted this information could run the query as an instant query for the step(s) they're interested in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> > Why create our own struct instead of using stats.QuerySamples and passing that to the appropriate operators?

> Given we don't support anything other than TotalSamples in MQE, I wanted to make this clear in the code by using a struct that only had a field for TotalSamples.

I would be happy with a comment in query.go, but I'm not opposed to a separate struct.

> > It would also make it easier to implement TotalSamplesPerStep if we want to support that too (which I think we do)

> I don't think we want to do this unless there's a specific need for it - per-step stats are considered experimental and disabled by default in Prometheus, and are not possible to enable in Mimir as far as I can see. The docs for this also state that the value for each step should be the same as if the query was run as an instant query, so anyone who wanted this information could run the query as an instant query for the step(s) they're interested in.

Fair enough, but also thinking of any future stats. I don't mind a separate struct though.

// The total number of samples processed during the query.
//
// In the case of range vector selectors, each sample is counted once for each time step it appears in.
// For example, if a query is running with a step of 30s with a range vector selector with range 45s,
// then samples in the overlapping 15s are counted twice.
TotalSamples int64
}

const timestampFieldSize = int64(unsafe.Sizeof(int64(0)))

// EquivalentFloatSampleCount returns the number of float samples that the histogram h
// is counted as for query statistics.
//
// The value is the size of h plus the size of a timestamp field, divided by the size
// of a float point (FPointSize) using integer division.
func EquivalentFloatSampleCount(h *histogram.FloatHistogram) int64 {
	pointBytes := int64(h.Size()) + timestampFieldSize

	return pointBytes / int64(FPointSize)
}
Loading