Skip to content

Commit

Permalink
MQE: Add support for irate and idelta function (#10326)
Browse files Browse the repository at this point in the history
* Implement irate and idelta

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix irate and idelta implementation and move it inside range_vectors.go

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove unneeded arguments

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix lint

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix promql test whitespace

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Update pkg/streamingpromql/operators/functions/range_vectors.go

Co-authored-by: Joshua Hesketh <[email protected]>

* Update pkg/streamingpromql/operators/functions/range_vectors.go

Co-authored-by: Joshua Hesketh <[email protected]>

* Put len in var

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add more comprehensive tests

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove extra line

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Move len var further up

Signed-off-by: Jon Kartago Lamida <[email protected]>

---------

Signed-off-by: Jon Kartago Lamida <[email protected]>
Co-authored-by: Joshua Hesketh <[email protected]>
  • Loading branch information
lamida and jhesketh authored Jan 3, 2025
1 parent b99f117 commit c281692
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2763,7 +2763,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDevStdVar(true)),
"histogram_stdvar": InstantVectorTransformationFunctionOperatorFactory("histogram_stdvar", functions.HistogramStdDevStdVar(false)),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"idelta": FunctionOverRangeVectorOperatorFactory("idelta", functions.Idelta),
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"irate": FunctionOverRangeVectorOperatorFactory("irate", functions.Irate),
"label_replace": LabelReplaceFunctionOperatorFactory(),
"last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime),
"ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln),
Expand Down
62 changes: 62 additions & 0 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,65 @@ func linearRegression(head, tail []promql.FPoint, interceptTime int64) (slope, i
intercept = sumY/n - slope*sumX/n
return slope, intercept
}

var Irate = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: irateIdelta(true),
}

var Idelta = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: irateIdelta(false),
}

func irateIdelta(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
// Histograms are ignored
fHead, fTail := step.Floats.UnsafePoints()

lenTail := len(fTail)
lenHead := len(fHead)

// We need at least two samples to calculate irate or idelta
if lenHead+lenTail < 2 {
return 0, false, nil, nil
}

var lastSample promql.FPoint
var previousSample promql.FPoint

// If tail has more than two samples, we should use the last two samples from tail.
// If tail has only one sample, the last sample is from the tail and the previous sample is last point in the head.
// Otherwise, last two samples are all in the head.
if lenTail >= 2 {
lastSample = fTail[lenTail-1]
previousSample = fTail[lenTail-2]
} else if lenTail == 1 {
lastSample = fTail[0]
previousSample = fHead[lenHead-1]
} else {
lastSample = fHead[lenHead-1]
previousSample = fHead[lenHead-2]
}

var resultValue float64
if isRate && lastSample.F < previousSample.F {
// Counter reset.
resultValue = lastSample.F
} else {
resultValue = lastSample.F - previousSample.F
}

sampledInterval := lastSample.T - previousSample.T
if sampledInterval == 0 {
// Avoid dividing by 0.
return 0, false, nil, nil
}

if isRate {
// Convert to per-second.
resultValue /= float64(sampledInterval) / 1000
}
return resultValue, true, nil, nil
}
}
65 changes: 65 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,71 @@ eval range from 0 to 20m step 1m deriv(metric[3m1s])

clear

# Testing irate and idelta
# nh stands for native histogram
# nhcb stands for native histogram custom bucket
load 1m
metric{case="1 float"} 9
metric{case="2 floats"} 1 5
metric{case="all floats with reset"} 1 7 1 7 1 7 1 7
metric{case="2 floats with missing middle sample"} 1 _ 5
metric{case="2 floats with missing 2 middle samples"} 1 _ _ 5
metric{case="2 floats with missing last sample"} 1 5 _
metric{case="2 floats with NaN middle sample"} 1 NaN 5
metric{case="2 floats with NaN 2 middle samples"} 1 NaN NaN 5
metric{case="2 floats with NaN last sample"} 1 5 NaN
metric{case="2 floats with Inf middle sample"} 1 Inf 5
metric{case="2 floats with Inf 2 middle samples"} 1 Inf Inf 5
metric{case="2 floats with Inf last sample"} 1 5 Inf
metric{case="all NaN"} NaN NaN NaN NaN
metric{case="all Inf"} Inf Inf Inf Inf
metric{case="2 floats and nh middle sample"} 1 {{schema:3 sum:0 count:0 buckets:[1 2 1]}} 5
metric{case="2 floats and nh last sample"} 1 5 {{schema:3 sum:0 count:0 buckets:[1 2 1]}}
metric{case="2 floats and nhcb middle sample"} 1 {{schema:-53 sum:0 count:0 custom_values:[5 10] buckets:[1 4]}} 5
metric{case="2 floats and nhcb last sample"} 1 5 {{schema:-53 sum:0 count:0 custom_values:[5 10] buckets:[1 4]}}
metric{case="all nh"} {{schema:3 sum:0 count:0 buckets:[1 2 1]}} {{schema:3 sum:0 count:0 buckets:[1 2 1]}}
metric{case="all nhcb"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}}

eval range from 0 to 8m step 1m idelta(metric[3m1s])
{case="2 floats"} _ 4 4 4
{case="2 floats and nh last sample"} _ 4 4 4
{case="2 floats and nh middle sample"} _ _ 4 4
{case="2 floats and nhcb last sample"} _ 4 4 4
{case="2 floats and nhcb middle sample"} _ _ 4 4
{case="2 floats with Inf 2 middle samples"} _ Inf NaN -Inf -Inf -Inf
{case="2 floats with Inf last sample"} _ 4 Inf Inf Inf
{case="2 floats with Inf middle sample"} _ Inf -Inf -Inf -Inf
{case="2 floats with NaN 2 middle samples"} _ NaN NaN NaN NaN NaN
{case="2 floats with NaN last sample"} _ 4 NaN NaN NaN
{case="2 floats with NaN middle sample"} _ NaN NaN NaN NaN
{case="2 floats with missing last sample"} _ 4 4 4
{case="2 floats with missing middle sample"} _ _ 4 4
{case="2 floats with missing 2 middle samples"} _ _ _ 4
{case="all Inf"} _ NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN
{case="all floats with reset"} _ 6 -6 6 -6 6 -6 6 6

eval range from 0 to 8m step 1m irate(metric[3m1s])
{case="2 floats"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats and nh last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats and nh middle sample"} _ _ 0.03333333333333333 0.03333333333333333
{case="2 floats and nhcb last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats and nhcb middle sample"} _ _ 0.03333333333333333 0.03333333333333333
{case="2 floats with Inf 2 middle samples"} _ Inf NaN 0.08333333333333333 0.08333333333333333 0.08333333333333333
{case="2 floats with Inf last sample"} _ 0.06666666666666667 Inf Inf Inf
{case="2 floats with Inf middle sample"} _ Inf 0.08333333333333333 0.08333333333333333 0.08333333333333333
{case="2 floats with NaN 2 middle samples"} _ NaN NaN NaN NaN NaN
{case="2 floats with NaN last sample"} _ 0.06666666666666667 NaN NaN NaN
{case="2 floats with NaN middle sample"} _ NaN NaN NaN NaN
{case="2 floats with missing last sample"} _ 0.06666666666666667 0.06666666666666667 0.06666666666666667
{case="2 floats with missing middle sample"} _ _ 0.03333333333333333 0.03333333333333333
{case="2 floats with missing 2 middle samples"} _ _ _ 0.022222222222222223
{case="all Inf"} _ NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN
{case="all floats with reset"} _ 0.1 0.016666666666666666 0.1 0.016666666666666666 0.1 0.016666666666666666 0.1 0.1

clear

load 1m
some_metric_count{env="prod", cluster="eu"} _ _ _ 0+1x4
some_metric_count{env="prod", cluster="us"} _ _ _ 0+2x4
Expand Down
21 changes: 9 additions & 12 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,14 @@ load 5m
http_requests_total{path="/foo"} 0+10x10
http_requests_total{path="/bar"} 0+10x5 0+10x5

# Unsupported by streaming engine.
# eval instant at 50m irate(http_requests_total[50m])
# {path="/foo"} .03333333333333333333
# {path="/bar"} .03333333333333333333
eval instant at 50m irate(http_requests_total[50m])
{path="/foo"} .03333333333333333333
{path="/bar"} .03333333333333333333

# Counter reset.
# Unsupported by streaming engine.
# eval instant at 30m irate(http_requests_total[50m])
# {path="/foo"} .03333333333333333333
# {path="/bar"} 0
eval instant at 30m irate(http_requests_total[50m])
{path="/foo"} .03333333333333333333
{path="/bar"} 0

clear

Expand All @@ -254,10 +252,9 @@ load 5m
http_requests{path="/foo"} 0 50 100 150
http_requests{path="/bar"} 0 50 100 50

# Unsupported by streaming engine.
# eval instant at 20m idelta(http_requests[20m])
# {path="/foo"} 50
# {path="/bar"} -50
eval instant at 20m idelta(http_requests[20m])
{path="/foo"} 50
{path="/bar"} -50

clear

Expand Down

0 comments on commit c281692

Please sign in to comment.