diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 0cce9cc6071..b1c87f495fc 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -2807,3 +2807,112 @@ func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) { runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false) } + +func TestQueryStats(t *testing.T) { + opts := NewTestEngineOpts() + mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger()) + require.NoError(t, err) + + prometheusEngine := promql.NewEngine(opts.CommonOpts) + + start := timestamp.Time(0) + end := start.Add(10 * time.Minute) + + storage := promqltest.LoadedStorage(t, ` + load 1m + dense_series 0 1 2 3 4 5 6 7 8 9 10 + start_series 0 1 _ _ _ _ _ _ _ _ _ + end_series _ _ _ _ _ 5 6 7 8 9 10 + sparse_series 0 _ _ _ _ _ _ 7 _ _ _ + `) + + runQueryAndGetTotalSamples := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) int64 { + var q promql.Query + var err error + + if isInstantQuery { + q, err = engine.NewInstantQuery(context.Background(), storage, nil, expr, end) + } else { + q, err = engine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, time.Minute) + } + + require.NoError(t, err) + + defer q.Close() + + res := q.Exec(context.Background()) + require.NoError(t, res.Err) + + return q.Stats().Samples.TotalSamples + } + + testCases := map[string]struct { + expr string + isInstantQuery bool + expectedTotalSamples int64 + }{ + "instant vector selector with point at every time step": { + expr: `dense_series{}`, + expectedTotalSamples: 11, + }, + "instant vector selector with points only in start of time range": { + expr: `start_series{}`, + expectedTotalSamples: 2 + 4, // 2 for original points, plus 4 for lookback to last point. + }, + "instant vector selector with points only at end of time range": { + expr: `end_series{}`, + expectedTotalSamples: 6, + }, + "instant vector selector with sparse points": { + expr: `sparse_series{}`, + expectedTotalSamples: 5 + 4, // 5 for first point at T=0, and 4 for second point at T=7 + }, + + "raw range vector selector with single point": { + expr: `dense_series[45s]`, + isInstantQuery: true, + expectedTotalSamples: 1, + }, + "raw range vector selector with multiple points": { + expr: `dense_series[3m45s]`, + isInstantQuery: true, + expectedTotalSamples: 4, + }, + + "range vector selector with point at every time step": { + expr: `sum_over_time(dense_series{}[30s])`, + expectedTotalSamples: 11, + }, + "range vector selector with points only in start of time range": { + expr: `sum_over_time(start_series{}[30s])`, + expectedTotalSamples: 2, + }, + "range vector selector with points only at end of time range": { + expr: `sum_over_time(end_series{}[30s])`, + expectedTotalSamples: 6, + }, + "range vector selector with sparse points": { + expr: `sum_over_time(sparse_series{}[30s])`, + expectedTotalSamples: 2, + }, + "range vector selector where range overlaps previous step's range": { + expr: `sum_over_time(dense_series{}[1m30s])`, + expectedTotalSamples: 21, // Each step except the first selects two points. + }, + + "expression with multiple selectors": { + expr: `dense_series{} + end_series{}`, + expectedTotalSamples: 11 + 6, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + prometheusCount := runQueryAndGetTotalSamples(t, prometheusEngine, testCase.expr, testCase.isInstantQuery) + require.Equal(t, testCase.expectedTotalSamples, prometheusCount, "invalid test case: expected samples does not match value from Prometheus' engine") + + mimirCount := runQueryAndGetTotalSamples(t, mimirEngine, testCase.expr, testCase.isInstantQuery) + require.Equal(t, testCase.expectedTotalSamples, mimirCount) + }) + } +} diff --git a/pkg/streamingpromql/operators/selectors/instant_vector_selector.go b/pkg/streamingpromql/operators/selectors/instant_vector_selector.go index ac43ce695b4..ca7b10f5b17 100644 --- a/pkg/streamingpromql/operators/selectors/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/selectors/instant_vector_selector.go @@ -24,6 +24,7 @@ import ( type InstantVectorSelector struct { Selector *Selector MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + Stats *types.QueryStats chunkIterator chunkenc.Iterator memoizedIterator *storage.MemoizedSeriesIterator @@ -119,6 +120,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe continue } + v.Stats.TotalSamples++ + // if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because // the previous value had a histogram. // PeekPrev will set the histogram to nil, or the value to 0 if the other type exists. diff --git a/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go b/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go index 4cc29dc498d..42a2a7e740f 100644 --- a/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go @@ -196,6 +196,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { LookbackDelta: 5 * time.Minute, }, MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil), + Stats: &types.QueryStats{}, } ctx := context.Background() @@ -239,6 +240,7 @@ func TestInstantVectorSelector_SliceSizing(t *testing.T) { LookbackDelta: 5 * time.Minute, }, MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil), + Stats: &types.QueryStats{}, } ctx := context.Background() diff --git a/pkg/streamingpromql/operators/selectors/range_vector_selector.go b/pkg/streamingpromql/operators/selectors/range_vector_selector.go index 6bd1457e2db..9f410f2fd46 100644 --- a/pkg/streamingpromql/operators/selectors/range_vector_selector.go +++ b/pkg/streamingpromql/operators/selectors/range_vector_selector.go @@ -21,6 +21,7 @@ import ( type RangeVectorSelector struct { Selector *Selector + Stats *types.QueryStats rangeMilliseconds int64 chunkIterator chunkenc.Iterator @@ -32,9 +33,10 @@ type RangeVectorSelector struct { var _ types.RangeVectorOperator = &RangeVectorSelector{} -func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *RangeVectorSelector { +func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, stats *types.QueryStats) *RangeVectorSelector { return &RangeVectorSelector{ Selector: selector, + Stats: stats, floats: types.NewFPointRingBuffer(memoryConsumptionTracker), histograms: types.NewHPointRingBuffer(memoryConsumptionTracker), stepData: &types.RangeVectorStepData{}, @@ -102,6 +104,8 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err m.stepData.RangeStart = rangeStart m.stepData.RangeEnd = rangeEnd + m.Stats.TotalSamples += int64(m.stepData.Floats.Count() + m.stepData.Histograms.Count()) + return m.stepData, nil } diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 2f840f2aff6..1a2bf8ce1ef 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -47,6 +47,7 @@ type Query struct { cancel context.CancelCauseFunc memoryConsumptionTracker *limiting.MemoryConsumptionTracker annotations *annotations.Annotations + stats *types.QueryStats // Time range of the top-level query. // Subqueries may use a different range. @@ -79,6 +80,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer qs: qs, memoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionPerQuery, engine.queriesRejectedDueToPeakMemoryConsumption), annotations: annotations.New(), + stats: &types.QueryStats{}, statement: &parser.EvalStmt{ Expr: expr, @@ -164,6 +166,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types ExpressionPosition: e.PositionRange(), }, + Stats: q.stats, }, nil case *parser.AggregateExpr: if !q.engine.featureToggles.EnableAggregationOperations { @@ -343,7 +346,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.Q ExpressionPosition: e.PositionRange(), } - return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil + return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker, q.stats), nil case *parser.SubqueryExpr: if !q.engine.featureToggles.EnableSubqueries { @@ -829,8 +832,12 @@ func (q *Query) Statement() parser.Statement { } func (q *Query) Stats() *stats.Statistics { - // Not yet supported. - return nil + return &stats.Statistics{ + Timers: stats.NewQueryTimers(), + Samples: &stats.QuerySamples{ + TotalSamples: q.stats.TotalSamples, + }, + } } func (q *Query) Cancel() { diff --git a/pkg/streamingpromql/types/stats.go b/pkg/streamingpromql/types/stats.go new file mode 100644 index 00000000000..a074f844679 --- /dev/null +++ b/pkg/streamingpromql/types/stats.go @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package types + +// QueryStats tracks statistics about the execution of a single query. +// +// It is not safe to use this type from multiple goroutines simultaneously. +type QueryStats struct { + // The total number of samples processed during the query. + // + // In the case of range vector selectors, each sample is counted once for each time step it appears in. + // For example, if a query is running with a step of 30s with a range vector selector with range 45s, + // then samples in the overlapping 15s are counted twice. + TotalSamples int64 +}