-
Notifications
You must be signed in to change notification settings - Fork 544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract delta from rate function #10353
base: lamida/mqe-delta-function
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -16,21 +16,21 @@ import ( | |||||||||||
) | ||||||||||||
|
||||||||||||
var Rate = FunctionOverRangeVectorDefinition{ | ||||||||||||
StepFunc: rate(true, true), | ||||||||||||
StepFunc: rate(true), | ||||||||||||
SeriesValidationFuncFactory: rateSeriesValidator, | ||||||||||||
SeriesMetadataFunction: DropSeriesName, | ||||||||||||
NeedsSeriesNamesForAnnotations: true, | ||||||||||||
} | ||||||||||||
|
||||||||||||
var Increase = FunctionOverRangeVectorDefinition{ | ||||||||||||
StepFunc: rate(true, false), | ||||||||||||
StepFunc: rate(false), | ||||||||||||
SeriesValidationFuncFactory: rateSeriesValidator, | ||||||||||||
SeriesMetadataFunction: DropSeriesName, | ||||||||||||
NeedsSeriesNamesForAnnotations: true, | ||||||||||||
} | ||||||||||||
|
||||||||||||
var Delta = FunctionOverRangeVectorDefinition{ | ||||||||||||
StepFunc: rate(false, false), | ||||||||||||
StepFunc: delta(), | ||||||||||||
SeriesMetadataFunction: DropSeriesName, | ||||||||||||
NeedsSeriesNamesForAnnotations: true, | ||||||||||||
} | ||||||||||||
|
@@ -39,8 +39,7 @@ var Delta = FunctionOverRangeVectorDefinition{ | |||||||||||
// It calculates the rate (allowing for counter resets if isCounter is true for rate and increase), | ||||||||||||
// extrapolates if the first/last sample is close to the boundary, and returns | ||||||||||||
// the result as either per-second (if isRate is true) or overall (this is increase function). | ||||||||||||
// If isCounter and isRate are both false, this function will calculate the delta. | ||||||||||||
func rate(isCounter, isRate bool) RangeVectorStepFunction { | ||||||||||||
func rate(isRate bool) RangeVectorStepFunction { | ||||||||||||
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { | ||||||||||||
fHead, fTail := step.Floats.UnsafePoints() | ||||||||||||
fCount := len(fHead) + len(fTail) | ||||||||||||
|
@@ -57,12 +56,12 @@ func rate(isCounter, isRate bool) RangeVectorStepFunction { | |||||||||||
|
||||||||||||
if fCount >= 2 { | ||||||||||||
// TODO: just pass step here? (and below) | ||||||||||||
val := floatRate(isCounter, isRate, fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds) | ||||||||||||
val := floatRate(isRate, fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds) | ||||||||||||
return val, true, nil, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
if hCount >= 2 { | ||||||||||||
val, err := histogramRate(isCounter, isRate, hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation) | ||||||||||||
val, err := histogramRate(isRate, hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation) | ||||||||||||
if err != nil { | ||||||||||||
err = NativeHistogramErrorToAnnotation(err, emitAnnotation) | ||||||||||||
return 0, false, nil, err | ||||||||||||
|
@@ -74,7 +73,7 @@ func rate(isCounter, isRate bool) RangeVectorStepFunction { | |||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) { | ||||||||||||
func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) { | ||||||||||||
firstPoint := hHead[0] | ||||||||||||
hHead = hHead[1:] | ||||||||||||
|
||||||||||||
|
@@ -85,7 +84,7 @@ func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hT | |||||||||||
lastPoint = hHead[len(hHead)-1] | ||||||||||||
} | ||||||||||||
|
||||||||||||
if isCounter && (firstPoint.H.CounterResetHint == histogram.GaugeType || lastPoint.H.CounterResetHint == histogram.GaugeType) { | ||||||||||||
if firstPoint.H.CounterResetHint == histogram.GaugeType || lastPoint.H.CounterResetHint == histogram.GaugeType { | ||||||||||||
emitAnnotation(annotations.NewNativeHistogramNotCounterWarning) | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
@@ -132,20 +131,14 @@ func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hT | |||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
if isCounter { | ||||||||||||
err = accumulate(hHead) | ||||||||||||
if err != nil { | ||||||||||||
return nil, err | ||||||||||||
err = accumulate(hHead) | ||||||||||||
if err != nil { | ||||||||||||
return nil, err | ||||||||||||
|
||||||||||||
} | ||||||||||||
err = accumulate(hTail) | ||||||||||||
if err != nil { | ||||||||||||
return nil, err | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
if !isCounter && (firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType) { | ||||||||||||
emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning) | ||||||||||||
err = accumulate(hTail) | ||||||||||||
if err != nil { | ||||||||||||
return nil, err | ||||||||||||
} | ||||||||||||
|
||||||||||||
if delta.Schema != desiredSchema { | ||||||||||||
|
@@ -156,7 +149,7 @@ func histogramRate(isCounter, isRate bool, hCount int, hHead []promql.HPoint, hT | |||||||||||
return val, err | ||||||||||||
} | ||||||||||||
|
||||||||||||
func floatRate(isCounter, isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 { | ||||||||||||
func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 { | ||||||||||||
firstPoint := fHead[0] | ||||||||||||
fHead = fHead[1:] | ||||||||||||
|
||||||||||||
|
@@ -168,25 +161,23 @@ func floatRate(isCounter, isRate bool, fCount int, fHead []promql.FPoint, fTail | |||||||||||
} | ||||||||||||
|
||||||||||||
delta := lastPoint.F - firstPoint.F | ||||||||||||
if isCounter { | ||||||||||||
previousValue := firstPoint.F | ||||||||||||
|
||||||||||||
accumulate := func(points []promql.FPoint) { | ||||||||||||
for _, p := range points { | ||||||||||||
if p.F < previousValue { | ||||||||||||
// Counter reset. | ||||||||||||
delta += previousValue | ||||||||||||
} | ||||||||||||
previousValue := firstPoint.F | ||||||||||||
|
||||||||||||
previousValue = p.F | ||||||||||||
accumulate := func(points []promql.FPoint) { | ||||||||||||
for _, p := range points { | ||||||||||||
if p.F < previousValue { | ||||||||||||
// Counter reset. | ||||||||||||
delta += previousValue | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
accumulate(fHead) | ||||||||||||
accumulate(fTail) | ||||||||||||
previousValue = p.F | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
val := calculateFloatRate(isCounter, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount) | ||||||||||||
accumulate(fHead) | ||||||||||||
accumulate(fTail) | ||||||||||||
|
||||||||||||
val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount) | ||||||||||||
return val | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
@@ -289,3 +280,94 @@ func rateSeriesValidator() RangeVectorSeriesValidationFunction { | |||||||||||
lastCheckedMetricName = metricName | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
func delta() RangeVectorStepFunction { | ||||||||||||
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { | ||||||||||||
fHead, fTail := step.Floats.UnsafePoints() | ||||||||||||
fCount := len(fHead) + len(fTail) | ||||||||||||
|
||||||||||||
hHead, hTail := step.Histograms.UnsafePoints() | ||||||||||||
hCount := len(hHead) + len(hTail) | ||||||||||||
|
||||||||||||
if fCount > 0 && hCount > 0 { | ||||||||||||
// We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a rate. | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
// Otherwise, emit a warning and drop this sample. | ||||||||||||
emitAnnotation(annotations.NewMixedFloatsHistogramsWarning) | ||||||||||||
return 0, false, nil, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
if fCount >= 2 { | ||||||||||||
// TODO: just pass step here? (and below) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this TODO still relevant? |
||||||||||||
val := floatDelta(fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds) | ||||||||||||
return val, true, nil, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
if hCount >= 2 { | ||||||||||||
val, err := histogramDelta(hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation) | ||||||||||||
if err != nil { | ||||||||||||
err = NativeHistogramErrorToAnnotation(err, emitAnnotation) | ||||||||||||
return 0, false, nil, err | ||||||||||||
} | ||||||||||||
return 0, false, val, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
return 0, false, nil, nil | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
func floatDelta(fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 { | ||||||||||||
firstPoint := fHead[0] | ||||||||||||
fHead = fHead[1:] | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] We don't need to do this for |
||||||||||||
|
||||||||||||
var lastPoint promql.FPoint | ||||||||||||
if len(fTail) > 0 { | ||||||||||||
lastPoint = fTail[len(fTail)-1] | ||||||||||||
} else { | ||||||||||||
lastPoint = fHead[len(fHead)-1] | ||||||||||||
} | ||||||||||||
|
||||||||||||
delta := lastPoint.F - firstPoint.F | ||||||||||||
return calculateFloatRate(false, false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func histogramDelta(hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) { | ||||||||||||
firstPoint := hHead[0] | ||||||||||||
hHead = hHead[1:] | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] We don't need to do this for |
||||||||||||
|
||||||||||||
var lastPoint promql.HPoint | ||||||||||||
if len(hTail) > 0 { | ||||||||||||
lastPoint = hTail[len(hTail)-1] | ||||||||||||
} else { | ||||||||||||
lastPoint = hHead[len(hHead)-1] | ||||||||||||
} | ||||||||||||
|
||||||||||||
if firstPoint.H.CounterResetHint == histogram.GaugeType || lastPoint.H.CounterResetHint == histogram.GaugeType { | ||||||||||||
emitAnnotation(annotations.NewNativeHistogramNotCounterWarning) | ||||||||||||
} | ||||||||||||
Comment on lines
+344
to
+346
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this correct? We emit a "not a gauge" warning further down if either point is not a gauge. (we might need to add more test cases to |
||||||||||||
|
||||||||||||
desiredSchema := firstPoint.H.Schema | ||||||||||||
if lastPoint.H.Schema < desiredSchema { | ||||||||||||
desiredSchema = lastPoint.H.Schema | ||||||||||||
} | ||||||||||||
Comment on lines
+348
to
+351
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] This could be simplified to:
Suggested change
|
||||||||||||
|
||||||||||||
usingCustomBuckets := firstPoint.H.UsesCustomBuckets() | ||||||||||||
if lastPoint.H.UsesCustomBuckets() != usingCustomBuckets { | ||||||||||||
Comment on lines
+353
to
+354
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] This could be simplified to:
Suggested change
|
||||||||||||
return nil, histogram.ErrHistogramsIncompatibleSchema | ||||||||||||
} | ||||||||||||
|
||||||||||||
delta := lastPoint.H.CopyToSchema(desiredSchema) | ||||||||||||
_, err := delta.Sub(firstPoint.H) | ||||||||||||
Comment on lines
+358
to
+359
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't we do this?
Suggested change
If so, then we don't need |
||||||||||||
if err != nil { | ||||||||||||
return nil, err | ||||||||||||
} | ||||||||||||
if firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType { | ||||||||||||
emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning) | ||||||||||||
} | ||||||||||||
|
||||||||||||
if delta.Schema != desiredSchema { | ||||||||||||
delta = delta.CopyToSchema(desiredSchema) | ||||||||||||
} | ||||||||||||
Comment on lines
+367
to
+369
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is necessary - the |
||||||||||||
|
||||||||||||
val := calculateHistogramRate(false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount) | ||||||||||||
return val, nil | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be simplified to this - there's no need for the wrapper function given we don't need different functionality like we do for
rate
andincrease
: