Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for irate and idelta function #10326

Merged
merged 11 commits into from
Jan 3, 2025
2 changes: 1 addition & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2763,7 +2763,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)),
"histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta),
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate),
"label_replace": LabelReplaceFunctionOperatorFactory(),
"last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime),
"ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln),
Expand Down
62 changes: 62 additions & 0 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,65 @@ func linearRegression(head, tail []promql.FPoint, interceptTime int64) (slope, i
intercept = sumY/n - slope*sumX/n
return slope, intercept
}

var Irate = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: irateIdelta(true),
}

var Idelta = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: irateIdelta(false),
}

func irateIdelta(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
lamida marked this conversation as resolved.
Show resolved Hide resolved
// Histograms are ignored
fHead, fTail := step.Floats.UnsafePoints()

lenTail := len(fTail)
lenHead := len(fHead)

// We need at least two samples to calculate irate or idelta
if lenHead+lenTail < 2 {
return 0, false, nil, nil
}

var lastSample promql.FPoint
var previousSample promql.FPoint

// If tail has more than two samples, we should use the last two samples from tail.
// If tail has only one sample, the last sample is from the tail and the previous sample is last point in the head.
// Otherwise, last two samples are all in the head.
if lenTail >= 2 {
lastSample = fTail[lenTail-1]
previousSample = fTail[lenTail-2]
} else if lenTail == 1 {
lastSample = fTail[0]
previousSample = fHead[lenHead-1]
} else {
lastSample = fHead[lenHead-1]
previousSample = fHead[lenHead-2]
}

var resultValue float64
if isRate && lastSample.F < previousSample.F {
// Counter reset.
resultValue = lastSample.F
} else {
resultValue = lastSample.F - previousSample.F
}

sampledInterval := lastSample.T - previousSample.T
if sampledInterval == 0 {
// Avoid dividing by 0.
return 0, false, nil, nil
}

if isRate {
// Convert to per-second.
resultValue /= float64(sampledInterval) / 1000
}
return resultValue, true, nil, nil
}
}
65 changes: 65 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,71 @@ eval range from 0 to 20m step 1m deriv(metric[3m1s])

clear

# Testing irate and idelta
# nh stands for native histogram
# nhcb stands for native histogram custom bucket
load 1m
metric{case="1 float"} 9
metric{case="2 floats"} 1 5
metric{case="all floats with reset"} 1 7 1 7 1 7 1 7
metric{case="2 floats with missing middle sample"} 1 _ 5
metric{case="2 floats with missing 2 middle samples"} 1 _ _ 5
metric{case="2 floats with missing last sample"} 1 5 _
metric{case="2 floats with NaN middle sample"} 1 NaN 5
metric{case="2 floats with NaN 2 middle samples"} 1 NaN NaN 5
metric{case="2 floats with NaN last sample"} 1 5 NaN
metric{case="2 floats with Inf middle sample"} 1 Inf 5
metric{case="2 floats with Inf 2 middle samples"} 1 Inf Inf 5
metric{case="2 floats with Inf last sample"} 1 5 Inf
metric{case="all NaN"} NaN NaN NaN NaN
metric{case="all Inf"} Inf Inf Inf Inf
metric{case="2 floats and nh middle sample"} 1 {{schema:3 sum:0 count:0 buckets:[1 2 1]}} 5
metric{case="2 floats and nh last sample"} 1 5 {{schema:3 sum:0 count:0 buckets:[1 2 1]}}
metric{case="2 floats and nhcb middle sample"} 1 {{schema:-53 sum:0 count:0 custom_values:[5 10] buckets:[1 4]}} 5
metric{case="2 floats and nhcb last sample"} 1 5 {{schema:-53 sum:0 count:0 custom_values:[5 10] buckets:[1 4]}}
metric{case="all nh"} {{schema:3 sum:0 count:0 buckets:[1 2 1]}} {{schema:3 sum:0 count:0 buckets:[1 2 1]}}
metric{case="all nhcb"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}}

eval range from 0 to 8m step 1m idelta(metric[3m1s])
{case="2 floats"} _ 4 4 4
{case="2 floats and nh last sample"} _ 4 4 4
{case="2 floats and nh middle sample"} _ _ 4 4
{case="2 floats and nhcb last sample"} _ 4 4 4
{case="2 floats and nhcb middle sample"} _ _ 4 4
{case="2 floats with Inf 2 middle samples"} _ Inf NaN -Inf -Inf -Inf
{case="2 floats with Inf last sample"} _ 4 Inf Inf Inf
{case="2 floats with Inf middle sample"} _ Inf -Inf -Inf -Inf
{case="2 floats with NaN 2 middle samples"} _ NaN NaN NaN NaN NaN
{case="2 floats with NaN last sample"} _ 4 NaN NaN NaN
{case="2 floats with NaN middle sample"} _ NaN NaN NaN NaN
{case="2 floats with missing last sample"} _ 4 4 4
{case="2 floats with missing middle sample"} _ _ 4 4
{case="2 floats with missing 2 middle samples"} _ _ _ 4
{case="all Inf"} _ NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN
{case="all floats with reset"} _ 6 -6 6 -6 6 -6 6 6

eval range from 0 to 8m step 1m irate(metric[3m1s])
{case="2 floats"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats and nh last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats and nh middle sample"} _ _ 0.03333333333333333 0.03333333333333333
{case="2 floats and nhcb last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats and nhcb middle sample"} _ _ 0.03333333333333333 0.03333333333333333
{case="2 floats with Inf 2 middle samples"} _ Inf NaN 0.08333333333333333 0.08333333333333333 0.08333333333333333
{case="2 floats with Inf last sample"} _ 0.06666666666666667 Inf Inf Inf
{case="2 floats with Inf middle sample"} _ Inf 0.08333333333333333 0.08333333333333333 0.08333333333333333
{case="2 floats with NaN 2 middle samples"} _ NaN NaN NaN NaN NaN
{case="2 floats with NaN last sample"} _ 0.06666666666666667 NaN NaN NaN
{case="2 floats with NaN middle sample"} _ NaN NaN NaN NaN
{case="2 floats with missing last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats with missing middle sample"} _ _ 0.03333333333333333 0.03333333333333333
{case="2 floats with missing 2 middle samples"} _ _ _ 0.022222222222222223
{case="all Inf"} _ NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN
{case="all floats with reset"} _ 0.1 0.016666666666666666 0.1 0.016666666666666666 0.1 0.016666666666666666 0.1 0.1

clear

load 1m
some_metric_count{env="prod", cluster="eu"} _ _ _ 0+1x4
some_metric_count{env="prod", cluster="us"} _ _ _ 0+2x4
Expand Down
21 changes: 9 additions & 12 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,14 @@ load 5m
http_requests_total{path="/foo"} 0+10x10
http_requests_total{path="/bar"} 0+10x5 0+10x5

# Unsupported by streaming engine.
# eval instant at 50m irate(http_requests_total[50m])
# {path="/foo"} .03333333333333333333
# {path="/bar"} .03333333333333333333
eval instant at 50m irate(http_requests_total[50m])
{path="/foo"} .03333333333333333333
{path="/bar"} .03333333333333333333

# Counter reset.
# Unsupported by streaming engine.
# eval instant at 30m irate(http_requests_total[50m])
# {path="/foo"} .03333333333333333333
# {path="/bar"} 0
eval instant at 30m irate(http_requests_total[50m])
{path="/foo"} .03333333333333333333
{path="/bar"} 0

clear

Expand All @@ -254,10 +252,9 @@ load 5m
http_requests{path="/foo"} 0 50 100 150
http_requests{path="/bar"} 0 50 100 50

# Unsupported by streaming engine.
# eval instant at 20m idelta(http_requests[20m])
# {path="/foo"} 50
# {path="/bar"} -50
eval instant at 20m idelta(http_requests[20m])
{path="/foo"} 50
{path="/bar"} -50

clear

Expand Down
Loading