diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 0cce9cc6071..4f9caf2d9b4 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -2635,14 +2635,14 @@ func runMixedMetricsTests(t *testing.T, expressions []string, pointsPerSeries in q, err := prometheusEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval) require.NoError(t, err) defer q.Close() - expectedResults := q.Exec(context.Background()) + prometheusResults := q.Exec(context.Background()) q, err = mimirEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval) require.NoError(t, err) defer q.Close() mimirResults := q.Exec(context.Background()) - testutils.RequireEqualResults(t, expr, expectedResults, mimirResults, skipAnnotationComparison) + testutils.RequireEqualResults(t, expr, prometheusResults, mimirResults, skipAnnotationComparison) }) } } @@ -2807,3 +2807,139 @@ func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) { runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false) } + +func TestQueryStats(t *testing.T) { + opts := NewTestEngineOpts() + mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger()) + require.NoError(t, err) + + prometheusEngine := promql.NewEngine(opts.CommonOpts) + + start := timestamp.Time(0) + end := start.Add(10 * time.Minute) + + storage := promqltest.LoadedStorage(t, ` + load 1m + dense_series 0 1 2 3 4 5 6 7 8 9 10 + start_series 0 1 _ _ _ _ _ _ _ _ _ + end_series _ _ _ _ _ 5 6 7 8 9 10 + sparse_series 0 _ _ _ _ _ _ 7 _ _ _ + stale_series 0 1 2 3 4 5 stale 7 8 9 10 + nan_series NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN + native_histogram_series {{schema:0 sum:2 count:4 buckets:[1 2 1]}} {{sum:2 count:4 buckets:[1 2 1]}} + `) + + runQueryAndGetTotalSamples := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) int64 { + var q 
promql.Query + var err error + + if isInstantQuery { + q, err = engine.NewInstantQuery(context.Background(), storage, nil, expr, end) + } else { + q, err = engine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, time.Minute) + } + + require.NoError(t, err) + + defer q.Close() + + res := q.Exec(context.Background()) + require.NoError(t, res.Err) + + return q.Stats().Samples.TotalSamples + } + + testCases := map[string]struct { + expr string + isInstantQuery bool + expectedTotalSamples int64 + }{ + "instant vector selector with point at every time step": { + expr: `dense_series{}`, + expectedTotalSamples: 11, + }, + "instant vector selector with points only in start of time range": { + expr: `start_series{}`, + expectedTotalSamples: 2 + 4, // 2 for original points, plus 4 for lookback to last point. + }, + "instant vector selector with points only at end of time range": { + expr: `end_series{}`, + expectedTotalSamples: 6, + }, + "instant vector selector with sparse points": { + expr: `sparse_series{}`, + expectedTotalSamples: 5 + 4, // 5 for first point at T=0, and 4 for second point at T=7 + }, + "instant vector selector with stale marker": { + expr: `stale_series{}`, + expectedTotalSamples: 10, // Instant vector selectors ignore stale markers. 
+ }, + + "raw range vector selector with single point": { + expr: `dense_series[45s]`, + isInstantQuery: true, + expectedTotalSamples: 1, + }, + "raw range vector selector with multiple points": { + expr: `dense_series[3m45s]`, + isInstantQuery: true, + expectedTotalSamples: 4, + }, + + "range vector selector with point at every time step": { + expr: `sum_over_time(dense_series{}[30s])`, + expectedTotalSamples: 11, + }, + "range vector selector with points only in start of time range": { + expr: `sum_over_time(start_series{}[30s])`, + expectedTotalSamples: 2, + }, + "range vector selector with points only at end of time range": { + expr: `sum_over_time(end_series{}[30s])`, + expectedTotalSamples: 6, + }, + "range vector selector with sparse points": { + expr: `sum_over_time(sparse_series{}[30s])`, + expectedTotalSamples: 2, + }, + "range vector selector where range overlaps previous step's range": { + expr: `sum_over_time(dense_series{}[1m30s])`, + expectedTotalSamples: 21, // Each step except the first selects two points. + }, + "range vector selector with stale marker": { + expr: `count_over_time(stale_series{}[1m30s])`, + expectedTotalSamples: 19, // Each step except the first selects two points. Range vector selectors ignore stale markers. 
+ }, + + "expression with multiple selectors": { + expr: `dense_series{} + end_series{}`, + expectedTotalSamples: 11 + 6, + }, + "instant vector selector with NaNs": { + expr: `nan_series{}`, + expectedTotalSamples: 11, + }, + "range vector selector with NaNs": { + expr: `sum_over_time(nan_series{}[1m])`, + expectedTotalSamples: 11, + }, + "instant vector selector with native histograms": { + expr: `native_histogram_series{}`, + expectedTotalSamples: 78, + }, + "range vector selector with native histograms": { + expr: `sum_over_time(native_histogram_series{}[1m])`, + expectedTotalSamples: 26, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + prometheusCount := runQueryAndGetTotalSamples(t, prometheusEngine, testCase.expr, testCase.isInstantQuery) + require.Equal(t, testCase.expectedTotalSamples, prometheusCount, "invalid test case: expected samples does not match value from Prometheus' engine") + + mimirCount := runQueryAndGetTotalSamples(t, mimirEngine, testCase.expr, testCase.isInstantQuery) + require.Equal(t, testCase.expectedTotalSamples, mimirCount) + }) + } +} diff --git a/pkg/streamingpromql/operators/selectors/instant_vector_selector.go b/pkg/streamingpromql/operators/selectors/instant_vector_selector.go index ac43ce695b4..11475bb92a7 100644 --- a/pkg/streamingpromql/operators/selectors/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/selectors/instant_vector_selector.go @@ -24,6 +24,7 @@ import ( type InstantVectorSelector struct { Selector *Selector MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + Stats *types.QueryStats chunkIterator chunkenc.Iterator memoizedIterator *storage.MemoizedSeriesIterator @@ -143,6 +144,9 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h}) lastHistogramT = t lastHistogram = h + + // For consistency with Prometheus' engine, we convert each histogram point to 
an equivalent number of float points. + v.Stats.TotalSamples += types.EquivalentFloatSampleCount(h) } else { // Only create the slice once we know the series is a histogram or not. if len(data.Floats) == 0 { @@ -161,6 +165,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe return types.InstantVectorSeriesData{}, v.memoizedIterator.Err() } + v.Stats.TotalSamples += int64(len(data.Floats)) + return data, nil } diff --git a/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go b/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go index 4cc29dc498d..42a2a7e740f 100644 --- a/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/selectors/instant_vector_selector_test.go @@ -196,6 +196,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { LookbackDelta: 5 * time.Minute, }, MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil), + Stats: &types.QueryStats{}, } ctx := context.Background() @@ -239,6 +240,7 @@ func TestInstantVectorSelector_SliceSizing(t *testing.T) { LookbackDelta: 5 * time.Minute, }, MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil), + Stats: &types.QueryStats{}, } ctx := context.Background() diff --git a/pkg/streamingpromql/operators/selectors/range_vector_selector.go b/pkg/streamingpromql/operators/selectors/range_vector_selector.go index 6bd1457e2db..d55cb9c59cb 100644 --- a/pkg/streamingpromql/operators/selectors/range_vector_selector.go +++ b/pkg/streamingpromql/operators/selectors/range_vector_selector.go @@ -21,6 +21,7 @@ import ( type RangeVectorSelector struct { Selector *Selector + Stats *types.QueryStats rangeMilliseconds int64 chunkIterator chunkenc.Iterator @@ -32,9 +33,10 @@ type RangeVectorSelector struct { var _ types.RangeVectorOperator = &RangeVectorSelector{} -func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker 
*limiting.MemoryConsumptionTracker) *RangeVectorSelector { +func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, stats *types.QueryStats) *RangeVectorSelector { return &RangeVectorSelector{ Selector: selector, + Stats: stats, floats: types.NewFPointRingBuffer(memoryConsumptionTracker), histograms: types.NewHPointRingBuffer(memoryConsumptionTracker), stepData: &types.RangeVectorStepData{}, @@ -102,6 +104,8 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err m.stepData.RangeStart = rangeStart m.stepData.RangeEnd = rangeEnd + m.Stats.TotalSamples += int64(m.stepData.Floats.Count()) + m.stepData.Histograms.EquivalentFloatSampleCount() + return m.stepData, nil } diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 2f840f2aff6..1a2bf8ce1ef 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -47,6 +47,7 @@ type Query struct { cancel context.CancelCauseFunc memoryConsumptionTracker *limiting.MemoryConsumptionTracker annotations *annotations.Annotations + stats *types.QueryStats // Time range of the top-level query. // Subqueries may use a different range. 
@@ -79,6 +80,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer qs: qs, memoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionPerQuery, engine.queriesRejectedDueToPeakMemoryConsumption), annotations: annotations.New(), + stats: &types.QueryStats{}, statement: &parser.EvalStmt{ Expr: expr, @@ -164,6 +166,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types ExpressionPosition: e.PositionRange(), }, + Stats: q.stats, }, nil case *parser.AggregateExpr: if !q.engine.featureToggles.EnableAggregationOperations { @@ -343,7 +346,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.Q ExpressionPosition: e.PositionRange(), } - return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil + return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker, q.stats), nil case *parser.SubqueryExpr: if !q.engine.featureToggles.EnableSubqueries { @@ -829,8 +832,12 @@ func (q *Query) Statement() parser.Statement { } func (q *Query) Stats() *stats.Statistics { - // Not yet supported. - return nil + return &stats.Statistics{ + Timers: stats.NewQueryTimers(), + Samples: &stats.QuerySamples{ + TotalSamples: q.stats.TotalSamples, + }, + } } func (q *Query) Cancel() { diff --git a/pkg/streamingpromql/types/hpoint_ring_buffer.go b/pkg/streamingpromql/types/hpoint_ring_buffer.go index e7c7293ff29..a34805d399b 100644 --- a/pkg/streamingpromql/types/hpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/hpoint_ring_buffer.go @@ -301,6 +301,22 @@ func (v HPointRingBufferView) Count() int { return v.size } +// EquivalentFloatSampleCount returns the equivalent number of float samples in this ring buffer view. 
+func (v HPointRingBufferView) EquivalentFloatSampleCount() int64 {
+	count := int64(0)
+	head, tail := v.UnsafePoints() // The view's points come back as two slices; both are walked below.
+
+	for _, p := range head {
+		count += EquivalentFloatSampleCount(p.H)
+	}
+
+	for _, p := range tail {
+		count += EquivalentFloatSampleCount(p.H)
+	}
+
+	return count
+}
+
 // Any returns true if this ring buffer view contains any points.
 func (v HPointRingBufferView) Any() bool {
 	return v.size != 0
diff --git a/pkg/streamingpromql/types/stats.go b/pkg/streamingpromql/types/stats.go
new file mode 100644
index 00000000000..f4fc92d3d59
--- /dev/null
+++ b/pkg/streamingpromql/types/stats.go
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go
+// Provenance-includes-license: Apache-2.0
+// Provenance-includes-copyright: The Prometheus Authors
+
+package types
+
+import (
+	"unsafe"
+
+	"github.com/prometheus/prometheus/model/histogram"
+)
+
+// QueryStats tracks statistics about the execution of a single query.
+//
+// It is not safe to use this type from multiple goroutines simultaneously.
+type QueryStats struct {
+	// The total number of samples processed during the query.
+	//
+	// In the case of range vector selectors, each sample is counted once for each time step it appears in.
+	// For example, if a query is running with a step of 30s with a range vector selector with range 45s,
+	// then samples in the overlapping 15s are counted twice.
+	TotalSamples int64
+}
+
+// timestampFieldSize is the size in bytes of an int64, as reported by unsafe.Sizeof.
+// NOTE(review): presumably this accounts for the int64 timestamp stored alongside each histogram sample - confirm.
+const timestampFieldSize = int64(unsafe.Sizeof(int64(0)))
+
+// EquivalentFloatSampleCount returns the number of float samples that h is counted as in query
+// statistics: the sum of h.Size() and timestampFieldSize, integer-divided by FPointSize
+// (FPointSize is not declared in this file).
+//
+// NOTE(review): h.Size() is called unconditionally; confirm that callers never pass a nil h.
+func EquivalentFloatSampleCount(h *histogram.FloatHistogram) int64 {
+	return (int64(h.Size()) + timestampFieldSize) / int64(FPointSize)
+}