MQE: track number of processed samples in each query #10232

Open: wants to merge 5 commits into base: main (showing changes from 1 commit)

109 changes: 109 additions & 0 deletions pkg/streamingpromql/engine_test.go
@@ -2807,3 +2807,112 @@ func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) {

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
}

func TestQueryStats(t *testing.T) {
opts := NewTestEngineOpts()
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts.CommonOpts)

start := timestamp.Time(0)
end := start.Add(10 * time.Minute)

storage := promqltest.LoadedStorage(t, `
load 1m
dense_series 0 1 2 3 4 5 6 7 8 9 10
start_series 0 1 _ _ _ _ _ _ _ _ _
end_series _ _ _ _ _ 5 6 7 8 9 10
sparse_series 0 _ _ _ _ _ _ 7 _ _ _
`)

runQueryAndGetTotalSamples := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) int64 {
var q promql.Query
var err error

if isInstantQuery {
q, err = engine.NewInstantQuery(context.Background(), storage, nil, expr, end)
} else {
q, err = engine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, time.Minute)
}

require.NoError(t, err)

defer q.Close()

res := q.Exec(context.Background())
require.NoError(t, res.Err)

return q.Stats().Samples.TotalSamples
}

testCases := map[string]struct {
expr string
isInstantQuery bool
expectedTotalSamples int64
}{
"instant vector selector with point at every time step": {
expr: `dense_series{}`,
expectedTotalSamples: 11,
},
"instant vector selector with points only in start of time range": {
expr: `start_series{}`,
expectedTotalSamples: 2 + 4, // 2 for original points, plus 4 for lookback to last point.
},
"instant vector selector with points only at end of time range": {
expr: `end_series{}`,
expectedTotalSamples: 6,
},
"instant vector selector with sparse points": {
expr: `sparse_series{}`,
expectedTotalSamples: 5 + 4, // 5 for first point at T=0, and 4 for second point at T=7
},

"raw range vector selector with single point": {
expr: `dense_series[45s]`,
isInstantQuery: true,
expectedTotalSamples: 1,
},
"raw range vector selector with multiple points": {
expr: `dense_series[3m45s]`,
isInstantQuery: true,
expectedTotalSamples: 4,
},

"range vector selector with point at every time step": {
expr: `sum_over_time(dense_series{}[30s])`,
expectedTotalSamples: 11,
},
"range vector selector with points only in start of time range": {
expr: `sum_over_time(start_series{}[30s])`,
expectedTotalSamples: 2,
},
"range vector selector with points only at end of time range": {
expr: `sum_over_time(end_series{}[30s])`,
expectedTotalSamples: 6,
},
"range vector selector with sparse points": {
expr: `sum_over_time(sparse_series{}[30s])`,
expectedTotalSamples: 2,
},
"range vector selector where range overlaps previous step's range": {
expr: `sum_over_time(dense_series{}[1m30s])`,
expectedTotalSamples: 21, // Each step except the first selects two points.
},

"expression with multiple selectors": {
expr: `dense_series{} + end_series{}`,
expectedTotalSamples: 11 + 6,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
prometheusCount := runQueryAndGetTotalSamples(t, prometheusEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, prometheusCount, "invalid test case: expected samples does not match value from Prometheus' engine")

mimirCount := runQueryAndGetTotalSamples(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, mimirCount)
Contributor

We could also compare the total samples loaded as part of our test gauntlet, if we expect the counts to be the same in all cases.

Contributor Author

This is currently difficult due to the optimisation in prometheus/prometheus#14097, as Prometheus' engine sometimes skips loading data for histograms if it's not needed. MQE does not yet have the same optimisation, so the total sample counts from the two engines are expected to differ in some cases.

Given the tests in TestQueryStats, and the fact that the statistics are informational and may differ between engines in the future due to other optimisations, I'm tempted to leave this as-is.

Thoughts?

})
}
}
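
The expected totals in TestQueryStats can be reproduced by hand with a small model. The sketch below is not MQE or Prometheus code: `parseLoad`, `countInstant` and `countRange` are hypothetical helpers, and it assumes a 5 minute lookback delta and left-open, right-closed windows, which is what the expected values in the test cases are consistent with.

```go
package main

import (
	"fmt"
	"strings"
)

// parseLoad turns one row of a "load 1m" block into sample timestamps in seconds.
// Field i maps to i*60s; "_" marks a missing sample. (Hypothetical helper.)
func parseLoad(row string) []int64 {
	var ts []int64
	for i, field := range strings.Fields(row) {
		if field != "_" {
			ts = append(ts, int64(i)*60)
		}
	}
	return ts
}

// countInstant models an instant vector selector: each step T contributes one
// sample if any sample lies in (T-lookback, T]. Assumption: left-open window.
func countInstant(samples []int64, start, end, step, lookback int64) int64 {
	var total int64
	for t := start; t <= end; t += step {
		for _, ts := range samples {
			if ts > t-lookback && ts <= t {
				total++
				break
			}
		}
	}
	return total
}

// countRange models a range vector selector: each step T contributes every
// sample in (T-rng, T], so a sample may be counted at several steps.
func countRange(samples []int64, start, end, step, rng int64) int64 {
	var total int64
	for t := start; t <= end; t += step {
		for _, ts := range samples {
			if ts > t-rng && ts <= t {
				total++
			}
		}
	}
	return total
}

func main() {
	dense := parseLoad("0 1 2 3 4 5 6 7 8 9 10")
	startSeries := parseLoad("0 1 _ _ _ _ _ _ _ _ _")
	sparse := parseLoad("0 _ _ _ _ _ _ 7 _ _ _")

	// Range query from 0 to 10m with a 1m step, as in the test.
	fmt.Println(countInstant(startSeries, 0, 600, 60, 300)) // 6: 2 direct + 4 lookback
	fmt.Println(countInstant(sparse, 0, 600, 60, 300))      // 9: 5 + 4
	fmt.Println(countRange(dense, 0, 600, 60, 90))          // 21: 1 + 10*2

	// Instant query at 10m for dense_series[3m45s].
	fmt.Println(countRange(dense, 600, 600, 60, 225)) // 4
}
```

Under these assumptions, `dense_series{} + end_series{}` is simply the sum of the two selectors' counts, 11 + 6.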
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
type InstantVectorSelector struct {
Selector *Selector
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Stats *types.QueryStats

chunkIterator chunkenc.Iterator
memoizedIterator *storage.MemoizedSeriesIterator
@@ -119,6 +120,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
continue
}

v.Stats.TotalSamples++

// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
Original file line number Diff line number Diff line change
@@ -196,6 +196,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
@@ -239,6 +240,7 @@ func TestInstantVectorSelector_SliceSizing(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (

type RangeVectorSelector struct {
Selector *Selector
Stats *types.QueryStats

rangeMilliseconds int64
chunkIterator chunkenc.Iterator
@@ -32,9 +33,10 @@ type RangeVectorSelector struct {

var _ types.RangeVectorOperator = &RangeVectorSelector{}

-func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *RangeVectorSelector {
+func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, stats *types.QueryStats) *RangeVectorSelector {
return &RangeVectorSelector{
Selector: selector,
Stats: stats,
floats: types.NewFPointRingBuffer(memoryConsumptionTracker),
histograms: types.NewHPointRingBuffer(memoryConsumptionTracker),
stepData: &types.RangeVectorStepData{},
@@ -102,6 +104,8 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err
m.stepData.RangeStart = rangeStart
m.stepData.RangeEnd = rangeEnd

m.Stats.TotalSamples += int64(m.stepData.Floats.Count() + m.stepData.Histograms.Count())

return m.stepData, nil
}

13 changes: 10 additions & 3 deletions pkg/streamingpromql/query.go
@@ -47,6 +47,7 @@ type Query struct {
cancel context.CancelCauseFunc
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
annotations *annotations.Annotations
stats *types.QueryStats

// Time range of the top-level query.
// Subqueries may use a different range.
@@ -79,6 +80,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer
qs: qs,
memoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionPerQuery, engine.queriesRejectedDueToPeakMemoryConsumption),
annotations: annotations.New(),
stats: &types.QueryStats{},

statement: &parser.EvalStmt{
Expr: expr,
@@ -164,6 +166,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types

ExpressionPosition: e.PositionRange(),
},
Stats: q.stats,
}, nil
case *parser.AggregateExpr:
if !q.engine.featureToggles.EnableAggregationOperations {
@@ -343,7 +346,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.Q
ExpressionPosition: e.PositionRange(),
}

-return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil
+return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker, q.stats), nil

case *parser.SubqueryExpr:
if !q.engine.featureToggles.EnableSubqueries {
@@ -829,8 +832,12 @@ func (q *Query) Statement() parser.Statement {
}

func (q *Query) Stats() *stats.Statistics {
-// Not yet supported.
-return nil
+return &stats.Statistics{
+Timers: stats.NewQueryTimers(),
+Samples: &stats.QuerySamples{
+TotalSamples: q.stats.TotalSamples,
+},
+}
}
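
The wiring in this file follows a simple pattern: the query owns one stats value, hands the same pointer to every selector it builds, and reads the total back once execution finishes. A minimal sketch of that pattern, using hypothetical stand-in types (`queryStats`, `selector`, `runAll`) rather than the real MQE operators:

```go
package main

import "fmt"

// queryStats mirrors the shape of the QueryStats struct in this change:
// a single counter, not safe for concurrent use.
type queryStats struct {
	TotalSamples int64
}

// selector is a hypothetical stand-in for a selector operator.
// Each selector adds the samples it processes to the shared stats.
type selector struct {
	stats   *queryStats
	samples int64 // number of samples this selector will process
}

func (s *selector) run() {
	s.stats.TotalSamples += s.samples
}

// runAll builds one selector per entry in counts, all sharing one stats
// pointer, runs them sequentially and returns the combined total.
func runAll(counts ...int64) int64 {
	shared := &queryStats{}
	for _, c := range counts {
		sel := &selector{stats: shared, samples: c}
		sel.run()
	}
	return shared.TotalSamples
}

func main() {
	// Two selectors, as in `dense_series{} + end_series{}`.
	fmt.Println(runAll(11, 6)) // 17
}
```

Because the counter is a plain int64 behind a shared pointer, this only works while selectors run on a single goroutine; concurrent evaluation would need an atomic counter or per-operator totals that are merged afterwards.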

func (q *Query) Cancel() {
18 changes: 18 additions & 0 deletions pkg/streamingpromql/types/stats.go
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package types

// QueryStats tracks statistics about the execution of a single query.
//
// It is not safe to use this type from multiple goroutines simultaneously.
type QueryStats struct {
Contributor

Why create our own struct instead of using stats.QuerySamples and passing that to the appropriate operators?

It would also make it easier to implement TotalSamplesPerStep if we want to support that too (which I think we do).

Contributor Author

> Why create our own struct instead of using stats.QuerySamples and passing that to the appropriate operators?

Given we don't support anything other than TotalSamples in MQE, I wanted to make this clear in the code by using a struct that only had a field for TotalSamples.

> It would also make it easier to implement TotalSamplesPerStep if we want to support that too (which I think we do)

I don't think we want to do this unless there's a specific need for it: per-step stats are considered experimental and disabled by default in Prometheus, and cannot be enabled in Mimir, as far as I can see. The docs for this also state that the value for each step should be the same as if the query were run as an instant query, so anyone who wanted this information could run the query as an instant query for the step(s) they're interested in.

// The total number of samples processed during the query.
//
// In the case of range vector selectors, each sample is counted once for each time step it appears in.
// For example, if a query is running with a step of 30s with a range vector selector with range 45s,
// then samples in the overlapping 15s are counted twice.
TotalSamples int64
}
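
To make the example in the TotalSamples comment concrete, here is a small sketch that counts how many steps include a given sample. `timesCounted` is a hypothetical helper, and the left-open window `(T-range, T]` is an assumption rather than something stated in this diff.

```go
package main

import "fmt"

// timesCounted returns how many evaluation steps include the sample at ts.
// All values are in seconds. Assumption: the window at step T is (T-rng, T].
func timesCounted(ts, start, end, step, rng int64) int {
	n := 0
	for t := start; t <= end; t += step {
		if ts > t-rng && ts <= t {
			n++
		}
	}
	return n
}

func main() {
	// Step of 30s with a 45s range: consecutive windows overlap by 15s.
	const start, end, step, rng = 0, 120, 30, 45

	// A sample at 20s sits in the overlap of the windows at T=30s and T=60s.
	fmt.Println(timesCounted(20, start, end, step, rng)) // 2

	// A sample at 10s only falls in the window at T=30s.
	fmt.Println(timesCounted(10, start, end, step, rng)) // 1
}
```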