diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 0cce9cc6071..99ee5664c25 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -2763,7 +2763,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) { for _, labels := range labelCombinations { labelRegex := strings.Join(labels, "|") - for _, function := range []string{"rate", "increase", "changes", "resets", "deriv"} { + for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta"} { expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex)) expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex)) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index e4229de9175..99415ccce71 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -374,7 +374,9 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)), "histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), + "idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta), "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), + "irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate), "label_replace": LabelReplaceFunctionOperatorFactory(), "last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime), "ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln), diff --git a/pkg/streamingpromql/operators/functions/range_vectors.go b/pkg/streamingpromql/operators/functions/range_vectors.go index dea48555c67..4593a6694a9 100644 --- a/pkg/streamingpromql/operators/functions/range_vectors.go +++ b/pkg/streamingpromql/operators/functions/range_vectors.go @@ -533,3 +533,65 @@ func linearRegression(head, tail []promql.FPoint, interceptTime int64) (slope, i intercept = sumY/n - slope*sumX/n return slope, intercept } + +var Irate = FunctionOverRangeVectorDefinition{ + SeriesMetadataFunction: DropSeriesName, + StepFunc: irateIdelta(true), +} + +var Idelta = FunctionOverRangeVectorDefinition{ + SeriesMetadataFunction: DropSeriesName, + StepFunc: irateIdelta(false), +} + +func irateIdelta(isRate bool) RangeVectorStepFunction { + return func(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + // Histograms are ignored + fHead, fTail := step.Floats.UnsafePoints() + + lenTail := len(fTail) + lenHead := len(fHead) + + // We need at least two samples to calculate irate or idelta + if lenHead+lenTail < 2 { + return 0, false, nil, nil + } + + var lastSample promql.FPoint + var previousSample promql.FPoint + + // If tail has more than two samples, we should use the last two samples from tail. + // If tail has only one sample, the last sample is from the tail and the previous sample is last point in the head. + // Otherwise, last two samples are all in the head. + if lenTail >= 2 { + lastSample = fTail[lenTail-1] + previousSample = fTail[lenTail-2] + } else if lenTail == 1 { + lastSample = fTail[0] + previousSample = fHead[lenHead-1] + } else { + lastSample = fHead[lenHead-1] + previousSample = fHead[lenHead-2] + } + + var resultValue float64 + if isRate && lastSample.F < previousSample.F { + // Counter reset. + resultValue = lastSample.F + } else { + resultValue = lastSample.F - previousSample.F + } + + sampledInterval := lastSample.T - previousSample.T + if sampledInterval == 0 { + // Avoid dividing by 0. + return 0, false, nil, nil + } + + if isRate { + // Convert to per-second. + resultValue /= float64(sampledInterval) / 1000 + } + return resultValue, true, nil, nil + } +} diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index a1b669f221c..e7cb57c3562 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -557,6 +557,71 @@ eval range from 0 to 20m step 1m deriv(metric[3m1s]) clear +# Testing irate and idelta +# nh stands for native histogram +# nhcb stands for native histogram custom bucket +load 1m + metric{case="1 float"} 9 + metric{case="2 floats"} 1 5 + metric{case="all floats with reset"} 1 7 1 7 1 7 1 7 + metric{case="2 floats with missing middle sample"} 1 _ 5 + metric{case="2 floats with missing 2 middle samples"} 1 _ _ 5 + metric{case="2 floats with missing last sample"} 1 5 _ + metric{case="2 floats with NaN middle sample"} 1 NaN 5 + metric{case="2 floats with NaN 2 middle samples"} 1 NaN NaN 5 + metric{case="2 floats with NaN last sample"} 1 5 NaN + metric{case="2 floats with Inf middle sample"} 1 Inf 5 + metric{case="2 floats with Inf 2 middle samples"} 1 Inf Inf 5 + metric{case="2 floats with Inf last sample"} 1 5 Inf + metric{case="all NaN"} NaN NaN NaN NaN + metric{case="all Inf"} Inf Inf Inf Inf + metric{case="2 floats and nh middle sample"} 1 {{schema:3 sum:0 count:0 buckets:[1 2 1]}} 5 + metric{case="2 floats and nh last sample"} 1 5 {{schema:3 sum:0 count:0 buckets:[1 2 1]}} + metric{case="2 floats and nhcb middle sample"} 1 {{schema:-53 sum:0 count:0 custom_values:[5 10] buckets:[1 4]}} 5 + metric{case="2 floats and nhcb last sample"} 1 5 {{schema:-53 sum:0 count:0 custom_values:[5 10] buckets:[1 4]}} + metric{case="all nh"} {{schema:3 sum:0 count:0 buckets:[1 2 1]}} {{schema:3 sum:0 count:0 buckets:[1 2 1]}} + metric{case="all nhcb"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}} + +eval range from 0 to 8m step 1m idelta(metric[3m1s]) + {case="2 floats"} _ 4 4 4 + {case="2 floats and nh last sample"} _ 4 4 4 + {case="2 floats and nh middle sample"} _ _ 4 4 + {case="2 floats and nhcb last sample"} _ 4 4 4 + {case="2 floats and nhcb middle sample"} _ _ 4 4 + {case="2 floats with Inf 2 middle samples"} _ Inf NaN -Inf -Inf -Inf + {case="2 floats with Inf last sample"} _ 4 Inf Inf Inf + {case="2 floats with Inf middle sample"} _ Inf -Inf -Inf -Inf + {case="2 floats with NaN 2 middle samples"} _ NaN NaN NaN NaN NaN + {case="2 floats with NaN last sample"} _ 4 NaN NaN NaN + {case="2 floats with NaN middle sample"} _ NaN NaN NaN NaN + {case="2 floats with missing last sample"} _ 4 4 4 + {case="2 floats with missing middle sample"} _ _ 4 4 + {case="2 floats with missing 2 middle samples"} _ _ _ 4 + {case="all Inf"} _ NaN NaN NaN NaN NaN + {case="all NaN"} _ NaN NaN NaN NaN NaN + {case="all floats with reset"} _ 6 -6 6 -6 6 -6 6 6 + +eval range from 0 to 8m step 1m irate(metric[3m1s]) + {case="2 floats"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667 + {case="2 floats and nh last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667 + {case="2 floats and nh middle sample"} _ _ 0.03333333333333333 0.03333333333333333 + {case="2 floats and nhcb last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667 + {case="2 floats and nhcb middle sample"} _ _ 0.03333333333333333 0.03333333333333333 + {case="2 floats with Inf 2 middle samples"} _ Inf NaN 0.08333333333333333 0.08333333333333333 0.08333333333333333 + {case="2 floats with Inf last sample"} _ 0.06666666666666667 Inf Inf Inf + {case="2 floats with Inf middle sample"} _ Inf 0.08333333333333333 0.08333333333333333 0.08333333333333333 + {case="2 floats with NaN 2 middle samples"} _ NaN NaN NaN NaN NaN + {case="2 floats with NaN last sample"} _ 0.06666666666666667 NaN NaN NaN + {case="2 floats with NaN middle sample"} _ NaN NaN NaN NaN + {case="2 floats with missing last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667 + {case="2 floats with missing middle sample"} _ _ 0.03333333333333333 0.03333333333333333 + {case="2 floats with missing 2 middle samples"} _ _ _ 0.022222222222222223 + {case="all Inf"} _ NaN NaN NaN NaN NaN + {case="all NaN"} _ NaN NaN NaN NaN NaN + {case="all floats with reset"} _ 0.1 0.016666666666666666 0.1 0.016666666666666666 0.1 0.016666666666666666 0.1 0.1 + +clear + load 1m some_metric_count{env="prod", cluster="eu"} _ _ _ 0+1x4 some_metric_count{env="prod", cluster="us"} _ _ _ 0+2x4 diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 6d2784c4870..9e8edb1bdf0 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -224,16 +224,14 @@ load 5m http_requests_total{path="/foo"} 0+10x10 http_requests_total{path="/bar"} 0+10x5 0+10x5 -# Unsupported by streaming engine. -# eval instant at 50m irate(http_requests_total[50m]) -# {path="/foo"} .03333333333333333333 -# {path="/bar"} .03333333333333333333 +eval instant at 50m irate(http_requests_total[50m]) + {path="/foo"} .03333333333333333333 + {path="/bar"} .03333333333333333333 # Counter reset. -# Unsupported by streaming engine. -# eval instant at 30m irate(http_requests_total[50m]) -# {path="/foo"} .03333333333333333333 -# {path="/bar"} 0 +eval instant at 30m irate(http_requests_total[50m]) + {path="/foo"} .03333333333333333333 + {path="/bar"} 0 clear @@ -254,10 +252,9 @@ load 5m http_requests{path="/foo"} 0 50 100 150 http_requests{path="/bar"} 0 50 100 50 -# Unsupported by streaming engine. -# eval instant at 20m idelta(http_requests[20m]) -# {path="/foo"} 50 -# {path="/bar"} -50 +eval instant at 20m idelta(http_requests[20m]) + {path="/foo"} 50 + {path="/bar"} -50 clear