Skip to content

Commit

Permalink
Add native histograms
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanth <[email protected]>

Apply suggestions from code review

Co-authored-by: Arthur Silva Sens <[email protected]>
Signed-off-by: Shivanth <[email protected]>

Fix references and existing tests

Signed-off-by: Shivanth <[email protected]>

Review comments

Signed-off-by: Shivanth <[email protected]>

fmt

Signed-off-by: Shivanth <[email protected]>

validation

Signed-off-by: Shivanth <[email protected]>

Count validation

Signed-off-by: Shivanth <[email protected]>

Validation

Signed-off-by: Shivanth <[email protected]>

Review comments

Signed-off-by: Shivanth <[email protected]>

Space formatting

Signed-off-by: Shivanth <[email protected]>

Review comments

Signed-off-by: Shivanth <[email protected]>

Rename SyncMaptoMap -> SyncMapToMap

Signed-off-by: Shivanth <[email protected]>

Remove exemplars from parameters for constNativeHistogram function

Signed-off-by: Shivanth <[email protected]>

Update prometheus/histogram.go

Co-authored-by: George Krajcsovits <[email protected]>
Signed-off-by: Shivanth MP <[email protected]>

Update prometheus/histogram.go

Co-authored-by: George Krajcsovits <[email protected]>
Signed-off-by: Shivanth MP <[email protected]>

Update prometheus/histogram.go

Co-authored-by: George Krajcsovits <[email protected]>
Signed-off-by: Shivanth MP <[email protected]>

Update prometheus/histogram.go

Co-authored-by: Björn Rabenstein <[email protected]>
Signed-off-by: Shivanth MP <[email protected]>

Review comments

Signed-off-by: Shivanth <[email protected]>

Lint fix

Signed-off-by: Shivanth <[email protected]>
  • Loading branch information
shivanthzen committed Nov 13, 2024
1 parent 13851e9 commit ae84979
Show file tree
Hide file tree
Showing 2 changed files with 744 additions and 4 deletions.
203 changes: 201 additions & 2 deletions prometheus/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package prometheus

import (
"errors"
"fmt"
"math"
"runtime"
Expand All @@ -28,6 +29,11 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
nativeHistogramSchemaMaximum = 8
nativeHistogramSchemaMinimum = -4
)

// nativeHistogramBounds for the frac of observed values. Only relevant for
// schema > 0. The position in the slice is the schema. (0 is never used, just
// here for convenience of using the schema directly as the index.)
Expand Down Expand Up @@ -1460,9 +1466,9 @@ func pickSchema(bucketFactor float64) int32 {
floor := math.Floor(math.Log2(math.Log2(bucketFactor)))
switch {
case floor <= -8:
return 8
return nativeHistogramSchemaMaximum
case floor >= 4:
return -4
return nativeHistogramSchemaMinimum
default:
return -int32(floor)
}
Expand Down Expand Up @@ -1851,3 +1857,196 @@ func (n *nativeExemplars) addExemplar(e *dto.Exemplar) {
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, append(n.exemplars[nIdx:rIdx], n.exemplars[rIdx+1:]...)...)...)
}
}

type constNativeHistogram struct {
desc *Desc
dto.Histogram
labelPairs []*dto.LabelPair
}

func validateCount(sum float64, count uint64, negativeBuckets, positiveBuckets map[int]int64, zeroBucket uint64) error {
var bucketPopulationSum int64
for _, v := range positiveBuckets {
bucketPopulationSum += v
}
for _, v := range negativeBuckets {
bucketPopulationSum += v
}
bucketPopulationSum += int64(zeroBucket)

// If the sum of observations is NaN, the number of observations must be greater or equal to the sum of all bucket counts.
// Otherwise, the number of observations must be equal to the sum of all bucket counts .

if math.IsNaN(sum) && bucketPopulationSum > int64(count) ||
!math.IsNaN(sum) && bucketPopulationSum != int64(count) {
return errors.New("the sum of all bucket populations exceeds the count of observations")
}
return nil
}

// NewConstNativeHistogram returns a metric representing a Prometheus native histogram with
// fixed values for the count, sum, and positive/negative/zero bucket counts. As those parameters
// cannot be changed, the returned value does not implement the Histogram
// interface (but only the Metric interface). Users of this package will not
// have much use for it in regular operations. However, when implementing custom
// OpenTelemetry Collectors, it is useful as a throw-away metric that is generated on the fly
// to send it to Prometheus in the Collect method.
//
// zeroBucket counts all (positive and negative)
// observations in the zero bucket (with an absolute value less or equal
// the current threshold).
// positiveBuckets and negativeBuckets are separate maps for negative and positive
// observations. The map's value is an int64, counting observations in
// that bucket. The map's key is the
// index of the bucket according to the used
// Schema. Index 0 is for an upper bound of 1 in positive buckets and for a lower bound of -1 in negative buckets.
// NewConstNativeHistogram returns an error if
// - the length of labelValues is not consistent with the variable labels in Desc or if Desc is invalid.
// - the schema passed is not between 8 and -4
// - the sum of counts in all buckets including the zero bucket does not equal the count if sum is not NaN (or exceeds the count if sum is NaN)
//
// See https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#exponential-histograms for more details about the conversion from OTel to Prometheus.
func NewConstNativeHistogram(
desc *Desc,
count uint64,
sum float64,
positiveBuckets, negativeBuckets map[int]int64,
zeroBucket uint64,
schema int32,
zeroThreshold float64,
createdTimestamp time.Time,
labelValues ...string,
) (Metric, error) {
if desc.err != nil {
return nil, desc.err
}
if err := validateLabelValues(labelValues, len(desc.variableLabels.names)); err != nil {
return nil, err
}
if schema > nativeHistogramSchemaMaximum || schema < nativeHistogramSchemaMinimum {
return nil, errors.New("invalid native histogram schema")
}
if err := validateCount(sum, count, negativeBuckets, positiveBuckets, zeroBucket); err != nil {
return nil, err
}

NegativeSpan, NegativeDelta := makeBucketsFromMap(negativeBuckets)
PositiveSpan, PositiveDelta := makeBucketsFromMap(positiveBuckets)
ret := &constNativeHistogram{
desc: desc,
Histogram: dto.Histogram{
CreatedTimestamp: timestamppb.New(createdTimestamp),
Schema: &schema,
ZeroThreshold: &zeroThreshold,
SampleCount: &count,
SampleSum: &sum,

NegativeSpan: NegativeSpan,
NegativeDelta: NegativeDelta,

PositiveSpan: PositiveSpan,
PositiveDelta: PositiveDelta,

ZeroCount: proto.Uint64(zeroBucket),
},
labelPairs: MakeLabelPairs(desc, labelValues),
}
if *ret.ZeroThreshold == 0 && *ret.ZeroCount == 0 && len(ret.PositiveSpan) == 0 && len(ret.NegativeSpan) == 0 {
ret.PositiveSpan = []*dto.BucketSpan{{
Offset: proto.Int32(0),
Length: proto.Uint32(0),
}}
}
return ret, nil
}

// MustNewConstNativeHistogram is a version of NewConstNativeHistogram that panics where
// NewConstNativeHistogram would have returned an error.
func MustNewConstNativeHistogram(
desc *Desc,
count uint64,
sum float64,
positiveBuckets, negativeBuckets map[int]int64,
zeroBucket uint64,
nativeHistogramSchema int32,
nativeHistogramZeroThreshold float64,
createdTimestamp time.Time,
labelValues ...string,
) Metric {
nativehistogram, err := NewConstNativeHistogram(desc,
count,
sum,
positiveBuckets,
negativeBuckets,
zeroBucket,
nativeHistogramSchema,
nativeHistogramZeroThreshold,
createdTimestamp,
labelValues...)
if err != nil {
panic(err)
}
return nativehistogram
}

func (h *constNativeHistogram) Desc() *Desc {
return h.desc
}

func (h *constNativeHistogram) Write(out *dto.Metric) error {
out.Histogram = &h.Histogram
out.Label = h.labelPairs
return nil
}

func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) {
if len(buckets) == 0 {
return nil, nil
}
var ii []int
for k := range buckets {
ii = append(ii, k)
}
sort.Ints(ii)

var (
spans []*dto.BucketSpan
deltas []int64
prevCount int64
nextI int
)

appendDelta := func(count int64) {
*spans[len(spans)-1].Length++
deltas = append(deltas, count-prevCount)
prevCount = count
}

for n, i := range ii {
count := buckets[i]
// Multiple spans with only small gaps in between are probably
// encoded more efficiently as one larger span with a few empty
// buckets. Needs some research to find the sweet spot. For now,
// we assume that gaps of one or two buckets should not create
// a new span.
iDelta := int32(i - nextI)
if n == 0 || iDelta > 2 {
// We have to create a new span, either because we are
// at the very beginning, or because we have found a gap
// of more than two buckets.
spans = append(spans, &dto.BucketSpan{
Offset: proto.Int32(iDelta),
Length: proto.Uint32(0),
})
} else {
// We have found a small gap (or no gap at all).
// Insert empty buckets as needed.
for j := int32(0); j < iDelta; j++ {
appendDelta(0)
}
}
appendDelta(count)
nextI = i + 1
}
return spans, deltas
}
Loading

0 comments on commit ae84979

Please sign in to comment.