Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

histograms: Add timer to reset ASAP after bucket limiting has happened #1367

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions prometheus/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,9 @@ type HistogramOpts struct {

// now is for testing purposes, by default it's time.Now.
now func() time.Time

// afterFunc is for testing purposes, by default it's time.AfterFunc.
afterFunc func(time.Duration, func()) *time.Timer
}

// HistogramVecOpts bundles the options to create a HistogramVec metric.
Expand Down Expand Up @@ -526,7 +529,9 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
if opts.now == nil {
opts.now = time.Now
}

if opts.afterFunc == nil {
opts.afterFunc = time.AfterFunc
}
h := &histogram{
desc: desc,
upperBounds: opts.Buckets,
Expand All @@ -536,6 +541,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
nativeHistogramMinResetDuration: opts.NativeHistogramMinResetDuration,
lastResetTime: opts.now(),
now: opts.now,
afterFunc: opts.afterFunc,
}
if len(h.upperBounds) == 0 && opts.NativeHistogramBucketFactor <= 1 {
h.upperBounds = DefBuckets
Expand Down Expand Up @@ -716,9 +722,16 @@ type histogram struct {
nativeHistogramMinResetDuration time.Duration
// lastResetTime is protected by mtx. It is also used as created timestamp.
lastResetTime time.Time
// resetScheduled is protected by mtx. It is true if a reset is
// scheduled for a later time (when nativeHistogramMinResetDuration has
// passed).
resetScheduled bool

// now is for testing purposes, by default it's time.Now.
now func() time.Time

// afterFunc is for testing purposes, by default it's time.AfterFunc.
afterFunc func(time.Duration, func()) *time.Timer
}

func (h *histogram) Desc() *Desc {
Expand Down Expand Up @@ -874,21 +887,31 @@ func (h *histogram) limitBuckets(counts *histogramCounts, value float64, bucket
if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) {
return
}
// One of the other strategies will happen. To undo what they will do as
// soon as enough time has passed to satisfy
// h.nativeHistogramMinResetDuration, schedule a reset at the right time
// if we haven't done so already.
if h.nativeHistogramMinResetDuration > 0 && !h.resetScheduled {
h.resetScheduled = true
h.afterFunc(h.nativeHistogramMinResetDuration-h.now().Sub(h.lastResetTime), h.reset)
}

if h.maybeWidenZeroBucket(hotCounts, coldCounts) {
return
}
h.doubleBucketWidth(hotCounts, coldCounts)
}

// maybeReset resets the whole histogram if at least h.nativeHistogramMinResetDuration
// has been passed. It returns true if the histogram has been reset. The caller
// must have locked h.mtx.
// maybeReset resets the whole histogram if at least
// h.nativeHistogramMinResetDuration has been passed. It returns true if the
// histogram has been reset. The caller must have locked h.mtx.
func (h *histogram) maybeReset(
hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int,
) bool {
// We are using the possibly mocked h.now() rather than
// time.Since(h.lastResetTime) to enable testing.
if h.nativeHistogramMinResetDuration == 0 ||
if h.nativeHistogramMinResetDuration == 0 || // No reset configured.
h.resetScheduled || // Do not interefere if a reset is already scheduled.
h.now().Sub(h.lastResetTime) < h.nativeHistogramMinResetDuration {
return false
}
Expand All @@ -906,6 +929,29 @@ func (h *histogram) maybeReset(
return true
}

// reset resets the whole histogram. It locks h.mtx itself, i.e. it has to be
// called without having locked h.mtx.
func (h *histogram) reset() {
h.mtx.Lock()
defer h.mtx.Unlock()

n := atomic.LoadUint64(&h.countAndHotIdx)
hotIdx := n >> 63
coldIdx := (^n) >> 63
hot := h.counts[hotIdx]
cold := h.counts[coldIdx]
// Completely reset coldCounts.
h.resetCounts(cold)
// Make coldCounts the new hot counts while resetting countAndHotIdx.
n = atomic.SwapUint64(&h.countAndHotIdx, coldIdx<<63)
count := n & ((1 << 63) - 1)
waitForCooldown(count, hot)
// Finally, reset the formerly hot counts, too.
h.resetCounts(hot)
h.lastResetTime = h.now()
h.resetScheduled = false
}

// maybeWidenZeroBucket widens the zero bucket until it includes the existing
// buckets closest to the zero bucket (which could be two, if an equidistant
// negative and a positive bucket exists, but usually it's only one bucket to be
Expand Down
36 changes: 25 additions & 11 deletions prometheus/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,16 +925,16 @@ func TestNativeHistogram(t *testing.T) {
maxBuckets: 4,
minResetDuration: 9 * time.Minute,
want: &dto.Histogram{
SampleCount: proto.Uint64(2),
SampleSum: proto.Float64(7),
SampleCount: proto.Uint64(3),
SampleSum: proto.Float64(12.1),
Schema: proto.Int32(2),
ZeroThreshold: proto.Float64(2.938735877055719e-39),
ZeroCount: proto.Uint64(0),
PositiveSpan: []*dto.BucketSpan{
{Offset: proto.Int32(7), Length: proto.Uint32(2)},
{Offset: proto.Int32(7), Length: proto.Uint32(4)},
},
PositiveDelta: []int64{1, 0},
CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes.
PositiveDelta: []int64{1, 0, -1, 1},
CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes.
},
},
{
Expand All @@ -945,23 +945,27 @@ func TestNativeHistogram(t *testing.T) {
maxZeroThreshold: 1.2,
minResetDuration: 9 * time.Minute,
want: &dto.Histogram{
SampleCount: proto.Uint64(2),
SampleSum: proto.Float64(7),
SampleCount: proto.Uint64(3),
SampleSum: proto.Float64(12.1),
Schema: proto.Int32(2),
ZeroThreshold: proto.Float64(2.938735877055719e-39),
ZeroCount: proto.Uint64(0),
PositiveSpan: []*dto.BucketSpan{
{Offset: proto.Int32(7), Length: proto.Uint32(2)},
{Offset: proto.Int32(7), Length: proto.Uint32(4)},
},
PositiveDelta: []int64{1, 0},
CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes.
PositiveDelta: []int64{1, 0, -1, 1},
CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes.
},
},
}

for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
ts := now
var (
ts = now
funcToCall func()
whenToCall time.Duration
)

his := NewHistogram(HistogramOpts{
Name: "name",
Expand All @@ -972,12 +976,22 @@ func TestNativeHistogram(t *testing.T) {
NativeHistogramMinResetDuration: s.minResetDuration,
NativeHistogramMaxZeroThreshold: s.maxZeroThreshold,
now: func() time.Time { return ts },
afterFunc: func(d time.Duration, f func()) *time.Timer {
funcToCall = f
whenToCall = d
return nil
},
})

ts = ts.Add(time.Minute)
for _, o := range s.observations {
his.Observe(o)
ts = ts.Add(time.Minute)
whenToCall -= time.Minute
if funcToCall != nil && whenToCall <= 0 {
funcToCall()
funcToCall = nil
}
}
m := &dto.Metric{}
if err := his.Write(m); err != nil {
Expand Down
Loading