Skip to content

Commit

Permalink
MQE: Fix panic in loading too many samples (#10261)
Browse files Browse the repository at this point in the history
* MQE: Fix panic in loading too many samples

BucketedPool would create pools in powers of two up to a set
`maxSize`. That is, if `maxSize` isn't a power of two itself,
then the maximum bucket would be less than the maxSize.

However, when requesting a slice from the pool we were only checking
against the maxSize, and not whether a bucket existed for that size.
Instead calculate the bucketIndex and check if that exists
before using it.

* Use a power of two size max bucket

* MQE: Ensure BucketedPool always returns slices of power two

This is to guarantee they work with the ring buffers which expect slices
to always be power of two.

The limiting pool still protects us from requesting an unreasonable
amount of points with the MemoryConsumptionTracker.

* Fix tests

* Correctly return slices to their respective buckets or discard them

* Extra tests

* Address review feedback

* Remove unncessary slice length check
  • Loading branch information
jhesketh authored Dec 18, 2024
1 parent 3dc5104 commit 34a24b1
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 7 deletions.
13 changes: 13 additions & 0 deletions pkg/streamingpromql/testdata/ours/selectors.test
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,16 @@ eval range from 0 to 7m step 1m some_metric
some_metric{env="prod", cluster="eu"} _ _ _ 0 1 2 3 4
some_metric{env="prod", cluster="us"} _ _ _ 0 2 4 6 8
some_metric{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}}

clear

load 1s
metric_total 0+2x86400

# Test our bucket pool can get more than the max points
eval instant at 24h rate(metric_total[24h])
{} 2

# Make sure the ring buffer Use and Append work with power of two pools
eval instant at 24h rate(metric_total[1d:1s])
{} 2
2 changes: 1 addition & 1 deletion pkg/streamingpromql/types/limiting_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

const (
MaxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days.
MaxExpectedPointsPerSeries = 131_072 // There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days. Then we use the next power of two.

// Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit.
// Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats.
Expand Down
95 changes: 95 additions & 0 deletions pkg/streamingpromql/types/limiting_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,101 @@ func TestLimitingPool_Mangling(t *testing.T) {
require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled")
}

func TestLimitingBucketedPool_PowerOfTwoCapacities(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)

pool := NewLimitingBucketedPool(
pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }),
1,
false,
nil,
)

cases := []struct {
requestedSize int
expectedCap int
}{
{3, 4},
{5, 8},
{10, 16},
{65_000, 65_536},
{100_001, 131_072}, // Exceeds max, expect next power of two
}

for _, c := range cases {
slice, err := pool.Get(c.requestedSize, memoryConsumptionTracker)
require.NoError(t, err, "Unexpected error when requesting size %d", c.requestedSize)
require.Equal(t, c.expectedCap, cap(slice),
"LimitingBucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap)
pool.Put(slice, memoryConsumptionTracker)
}
}

func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) {
const maxMemoryLimit = 1_000_000 * FPointSize

reg, metric := createRejectedMetric()
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(uint64(maxMemoryLimit), metric)

pool := NewLimitingBucketedPool(
pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }),
1,
false,
nil,
)

// Request a reasonable size
slice, err := pool.Get(500_000, memoryConsumptionTracker)
require.NoError(t, err, "Expected to succeed for reasonable size request")
require.Equal(t, 524_288, cap(slice), "Capacity should be next power of two")
assertRejectedQueryCount(t, reg, 0)

pool.Put(slice, memoryConsumptionTracker)

// Request an unreasonable size
_, err = pool.Get(10_000_000, memoryConsumptionTracker)
require.Error(t, err, "Expected an error for unreasonably large size request")
require.Contains(t, err.Error(), "exceeded", "Error message should indicate memory consumption limit exceeded")
assertRejectedQueryCount(t, reg, 1)

require.Equal(t, uint64(0), memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes,
"Current memory consumption should remain at 0 after rejected request")
}

func TestLimitingBucketedPool_MaxExpectedPointsPerSeriesConstantIsPowerOfTwo(t *testing.T) {
// Although not strictly required (as the code should handle MaxExpectedPointsPerSeries not being a power of two correctly),
// it is best that we keep it as one for now.
require.True(t, isPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two")
}

func TestIsPowerOfTwo(t *testing.T) {
cases := []struct {
input int
expected bool
}{
{-2, false},
{1, true},
{2, true},
{3, false},
{4, true},
{5, false},
{6, false},
{7, false},
{8, true},
{16, true},
{32, true},
{1023, false},
{1024, true},
{1<<12 - 1, false},
{1 << 12, true},
}

for _, c := range cases {
result := isPowerOfTwo(c.input)
require.Equalf(t, c.expected, result, "isPowerOfTwo(%d) should return %v", c.input, c.expected)
}
}

func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
expected := fmt.Sprintf(`
# TYPE %s counter
Expand Down
24 changes: 24 additions & 0 deletions pkg/streamingpromql/types/ring_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,3 +502,27 @@ func setupRingBufferTestingPools(t *testing.T) {
putHPointSliceForRingBuffer = originalPutHPointSlice
})
}

func TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
buf := NewFPointRingBuffer(memoryConsumptionTracker)

nonPowerOfTwoSlice := make([]promql.FPoint, 0, 15)

err := buf.Use(nonPowerOfTwoSlice)
require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}

func TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
buf := NewHPointRingBuffer(memoryConsumptionTracker)

nonPowerOfTwoSlice := make([]promql.HPoint, 0, 15)

err := buf.Use(nonPowerOfTwoSlice)
require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}
20 changes: 16 additions & 4 deletions pkg/util/pool/bucketed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *Buckete
}

// Get returns a new slice with capacity greater than or equal to size.
// If no bucket large enough exists, a slice larger than the requested size
// of the next power of two is returned.
// Get guarantees the resulting slice always has a capacity in power of twos.
func (p *BucketedPool[T, E]) Get(size int) T {
if size < 0 {
panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size))
Expand All @@ -51,11 +54,14 @@ func (p *BucketedPool[T, E]) Get(size int) T {
return nil
}

if uint(size) > p.maxSize {
return p.make(size)
bucketIndex := bits.Len(uint(size - 1))

// If bucketIndex exceeds the number of available buckets, return a slice of the next power of two.
if bucketIndex >= len(p.buckets) {
nextPowerOfTwo := 1 << bucketIndex
return p.make(nextPowerOfTwo)
}

bucketIndex := bits.Len(uint(size - 1))
s := p.buckets[bucketIndex].Get()

if s == nil {
Expand All @@ -76,8 +82,14 @@ func (p *BucketedPool[T, E]) Put(s T) {
}

bucketIndex := bits.Len(size - 1)
if bucketIndex >= len(p.buckets) {
return // Ignore slices larger than the largest bucket
}

// Ignore slices that do not align to the current power of 2
// (this will only happen where a slice did not originally come from the pool).
if size != (1 << bucketIndex) {
bucketIndex--
return
}

p.buckets[bucketIndex].Put(s[0:0])
Expand Down
59 changes: 57 additions & 2 deletions pkg/util/pool/bucketed_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestBucketedPool_HappyPath(t *testing.T) {
},
{
size: 20,
expectedCap: 20, // Max size is 19, so we expect to get a slice with the size requested (20), not 32 (the next power of two).
expectedCap: 32, // Although max size is 19, we expect to get a slice with the next power of two back. This slice would not have come from a bucket.
},
}

Expand Down Expand Up @@ -120,5 +120,60 @@ func TestBucketedPool_PutSliceLargerThanMaximum(t *testing.T) {
pool.Put(s1)
s2 := pool.Get(101)[:101]
require.NotSame(t, &s1[0], &s2[0])
require.Equal(t, 101, cap(s2))
require.Equal(t, 128, cap(s2))
}

func TestBucketedPool_GetSizeCloseToMax(t *testing.T) {
maxSize := 100000
pool := NewBucketedPool(uint(maxSize), makeFunc)

// Request a size that triggers the last bucket boundary.
s := pool.Get(86401)

// Check that we still get a slice with the correct size.
require.Equal(t, 131072, cap(s))
require.Len(t, s, 0)
}

func TestBucketedPool_AlwaysReturnsPowerOfTwoCapacities(t *testing.T) {
pool := NewBucketedPool(100_000, makeFunc)

cases := []struct {
requestedSize int
expectedCap int
}{
{3, 4},
{5, 8},
{10, 16},
{20, 32},
{65_000, 65_536},
{100_001, 131_072}, // Exceeds max bucket: next power of two is 131,072
}

for _, c := range cases {
slice := pool.Get(c.requestedSize)

require.Equal(t, c.expectedCap, cap(slice),
"BucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap)

pool.Put(slice)
}
}

func TestBucketedPool_PutSizeCloseToMax(t *testing.T) {
maxSize := 100000
pool := NewBucketedPool(uint(maxSize), makeFunc)

// Create a slice with capacity that triggers the upper edge case
s := make([]int, 0, 65_000) // 86401 is close to maxSize but not aligned to power of 2

// Ensure Put does not panic when adding this slice
require.NotPanics(t, func() {
pool.Put(s)
}, "Put should not panic for sizes close to maxSize")

// Validate that a subsequent Get for a smaller size works fine
ret := pool.Get(1)
require.Equal(t, 1, cap(ret))
require.Len(t, ret, 0)
}

0 comments on commit 34a24b1

Please sign in to comment.