Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract delta from rate function #10353

Open
wants to merge 1 commit into
base: lamida/mqe-delta-function
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 118 additions & 36 deletions pkg/streamingpromql/operators/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ import (
)

var Rate = FunctionOverRangeVectorDefinition{
StepFunc: rate(true, true),
StepFunc: rate(true),
SeriesValidationFuncFactory: rateSeriesValidator,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
}

var Increase = FunctionOverRangeVectorDefinition{
StepFunc: rate(true, false),
StepFunc: rate(false),
SeriesValidationFuncFactory: rateSeriesValidator,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
}

var Delta = FunctionOverRangeVectorDefinition{
StepFunc: rate(false, false),
StepFunc: delta(),
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
}
Expand All @@ -39,8 +39,7 @@ var Delta = FunctionOverRangeVectorDefinition{
// It calculates the rate (allowing for counter resets if isCounter is true for rate and increase),
// extrapolates if the first/last sample is close to the boundary, and returns
// the result as either per-second (if isRate is true) or overall (this is increase function).
// If isCounter and isRate are both false, this function will calculate the delta.
func rate(isCounter, isRate bool) RangeVectorStepFunction {
func rate(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)
Expand All @@ -57,12 +56,12 @@ func rate(isCounter, isRate bool) RangeVectorStepFunction {

if fCount >= 2 {
// TODO: just pass step here? (and below)
val := floatRate(isCounter, isRate, fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds)
val := floatRate(isRate, fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds)
return val, true, nil, nil
}

if hCount >= 2 {
val, err := histogramRate(isCounter, isRate, hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation)
val, err := histogramRate(isRate, hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation)
if err != nil {
err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
return 0, false, nil, err
Expand All @@ -74,7 +73,7 @@ func rate(isCounter, isRate bool) RangeVectorStepFunction {
}
}

func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
firstPoint := hHead[0]
hHead = hHead[1:]

Expand All @@ -85,7 +84,7 @@ func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hT
lastPoint = hHead[len(hHead)-1]
}

if isCounter && (firstPoint.H.CounterResetHint == histogram.GaugeType || lastPoint.H.CounterResetHint == histogram.GaugeType) {
if firstPoint.H.CounterResetHint == histogram.GaugeType || lastPoint.H.CounterResetHint == histogram.GaugeType {
emitAnnotation(annotations.NewNativeHistogramNotCounterWarning)
}

Expand Down Expand Up @@ -132,20 +131,14 @@ func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hT
return nil
}

if isCounter {
err = accumulate(hHead)
if err != nil {
return nil, err
err = accumulate(hHead)
if err != nil {
return nil, err

}
err = accumulate(hTail)
if err != nil {
return nil, err
}
}

if !isCounter && (firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType) {
emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning)
err = accumulate(hTail)
if err != nil {
return nil, err
}

if delta.Schema != desiredSchema {
Expand All @@ -156,7 +149,7 @@ func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hT
return val, err
}

func floatRate(isCounter, isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
firstPoint := fHead[0]
fHead = fHead[1:]

Expand All @@ -168,25 +161,23 @@ func floatRate(isCounter, isRate bool, fCount int, fHead []promql.FPoint, fTail
}

delta := lastPoint.F - firstPoint.F
if isCounter {
previousValue := firstPoint.F

accumulate := func(points []promql.FPoint) {
for _, p := range points {
if p.F < previousValue {
// Counter reset.
delta += previousValue
}
previousValue := firstPoint.F

previousValue = p.F
accumulate := func(points []promql.FPoint) {
for _, p := range points {
if p.F < previousValue {
// Counter reset.
delta += previousValue
}
}

accumulate(fHead)
accumulate(fTail)
previousValue = p.F
}
}

val := calculateFloatRate(isCounter, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
accumulate(fHead)
accumulate(fTail)

val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
return val
}

Expand Down Expand Up @@ -289,3 +280,94 @@ func rateSeriesValidator() RangeVectorSeriesValidationFunction {
lastCheckedMetricName = metricName
}
}

func delta() RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Comment on lines +284 to +285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be simplified to this - there's no need for the wrapper function given we don't need different functionality like we do for rate and increase:

Suggested change
func delta() RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func delta(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {

fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)

hHead, hTail := step.Histograms.UnsafePoints()
hCount := len(hHead) + len(hTail)

if fCount > 0 && hCount > 0 {
// We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a rate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a rate.
// We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a delta.

// Otherwise, emit a warning and drop this sample.
emitAnnotation(annotations.NewMixedFloatsHistogramsWarning)
return 0, false, nil, nil
}

if fCount >= 2 {
// TODO: just pass step here? (and below)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO still relevant?

val := floatDelta(fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds)
return val, true, nil, nil
}

if hCount >= 2 {
val, err := histogramDelta(hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation)
if err != nil {
err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
return 0, false, nil, err
}
return 0, false, val, nil
}

return 0, false, nil, nil
}
}

func floatDelta(fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
firstPoint := fHead[0]
fHead = fHead[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] We don't need to do this for delta.


var lastPoint promql.FPoint
if len(fTail) > 0 {
lastPoint = fTail[len(fTail)-1]
} else {
lastPoint = fHead[len(fHead)-1]
}

delta := lastPoint.F - firstPoint.F
return calculateFloatRate(false, false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
}

func histogramDelta(hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
firstPoint := hHead[0]
hHead = hHead[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] We don't need to do this for delta.


var lastPoint promql.HPoint
if len(hTail) > 0 {
lastPoint = hTail[len(hTail)-1]
} else {
lastPoint = hHead[len(hHead)-1]
}

if firstPoint.H.CounterResetHint == histogram.GaugeType || lastPoint.H.CounterResetHint == histogram.GaugeType {
emitAnnotation(annotations.NewNativeHistogramNotCounterWarning)
}
Comment on lines +344 to +346
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? We emit a "not a gauge" warning further down if either point is not a gauge.

(we might need to add more test cases to TestAnnotations in engine_test.go)


desiredSchema := firstPoint.H.Schema
if lastPoint.H.Schema < desiredSchema {
desiredSchema = lastPoint.H.Schema
}
Comment on lines +348 to +351
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] This could be simplified to:

Suggested change
desiredSchema := firstPoint.H.Schema
if lastPoint.H.Schema < desiredSchema {
desiredSchema = lastPoint.H.Schema
}
desiredSchema := min(firstPoint.H.Schema, lastPoint.H.Schema)


usingCustomBuckets := firstPoint.H.UsesCustomBuckets()
if lastPoint.H.UsesCustomBuckets() != usingCustomBuckets {
Comment on lines +353 to +354
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] This could be simplified to:

Suggested change
usingCustomBuckets := firstPoint.H.UsesCustomBuckets()
if lastPoint.H.UsesCustomBuckets() != usingCustomBuckets {
if firstPoint.H.UsesCustomBuckets() != lastPoint.H.UsesCustomBuckets() {

return nil, histogram.ErrHistogramsIncompatibleSchema
}

delta := lastPoint.H.CopyToSchema(desiredSchema)
_, err := delta.Sub(firstPoint.H)
Comment on lines +358 to +359
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we do this? Sub will automatically adjust the schema if necessary.

Suggested change
delta := lastPoint.H.CopyToSchema(desiredSchema)
_, err := delta.Sub(firstPoint.H)
delta, err := lastPoint.H.Copy().Sub(firstPoint.H)

If so, then we don't need desiredSchema at all.

if err != nil {
return nil, err
}
if firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType {
emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning)
}

if delta.Schema != desiredSchema {
delta = delta.CopyToSchema(desiredSchema)
}
Comment on lines +367 to +369
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary - the Sub and CopyToSchema calls above should cover this.


val := calculateHistogramRate(false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount)
return val, nil
}
Loading