[r321] MQE: Fix panic in loading too many samples #10271

Merged 1 commit on Dec 18, 2024
13 changes: 13 additions & 0 deletions pkg/streamingpromql/testdata/ours/selectors.test
@@ -73,3 +73,16 @@ eval range from 0 to 7m step 1m some_metric
some_metric{env="prod", cluster="eu"} _ _ _ 0 1 2 3 4
some_metric{env="prod", cluster="us"} _ _ _ 0 2 4 6 8
some_metric{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}}

clear

load 1s
metric_total 0+2x86400

# Test that our bucket pool can provide more than the maximum expected points
eval instant at 24h rate(metric_total[24h])
{} 2

# Make sure the ring buffer's Use and Append work with power-of-two pools
eval instant at 24h rate(metric_total[1d:1s])
{} 2
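For context on the numbers above: a 24h range at a 1s interval contains 86,401 points (both window endpoints included). That is below the old 100,000-point default, but once capacities are rounded up to powers of two it lands in the pool's largest bucket, past 100,000. A small illustrative sketch of that rounding (hypothetical demo code, using the same `bits.Len` trick as `BucketedPool.Get` below):

```go
package main

import (
	"fmt"
	"math/bits"
)

func main() {
	// 24h of samples at a 1s interval, inclusive of both window endpoints.
	points := 86400 + 1

	// Round up to the next power of two, as the pool now does.
	nextPow2 := 1 << bits.Len(uint(points-1))

	fmt.Println(points, nextPow2) // 86401 131072 — past the old 100_000 cap
}
```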
2 changes: 1 addition & 1 deletion pkg/streamingpromql/types/limiting_pool.go
@@ -13,7 +13,7 @@ import (
)

const (
- MaxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days.
+ MaxExpectedPointsPerSeries = 131_072 // There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days. Then we use the next power of two.

// Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit.
// Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats.
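For the arithmetic behind the new value: 2^16 = 65,536 < 100,000 ≤ 2^17 = 131,072, so 131,072 is the smallest power of two covering the old default, and the constant now lands exactly on a pool bucket boundary.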
95 changes: 95 additions & 0 deletions pkg/streamingpromql/types/limiting_pool_test.go
@@ -228,6 +228,101 @@ func TestLimitingPool_Mangling(t *testing.T) {
require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled")
}

func TestLimitingBucketedPool_PowerOfTwoCapacities(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)

pool := NewLimitingBucketedPool(
pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }),
1,
false,
nil,
)

cases := []struct {
requestedSize int
expectedCap int
}{
{3, 4},
{5, 8},
{10, 16},
{65_000, 65_536},
{100_001, 131_072}, // Exceeds max, expect next power of two
}

for _, c := range cases {
slice, err := pool.Get(c.requestedSize, memoryConsumptionTracker)
require.NoError(t, err, "Unexpected error when requesting size %d", c.requestedSize)
require.Equal(t, c.expectedCap, cap(slice),
"LimitingBucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap)
pool.Put(slice, memoryConsumptionTracker)
}
}

func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) {
const maxMemoryLimit = 1_000_000 * FPointSize

reg, metric := createRejectedMetric()
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(uint64(maxMemoryLimit), metric)

pool := NewLimitingBucketedPool(
pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }),
1,
false,
nil,
)

// Request a reasonable size
slice, err := pool.Get(500_000, memoryConsumptionTracker)
require.NoError(t, err, "Expected to succeed for reasonable size request")
require.Equal(t, 524_288, cap(slice), "Capacity should be next power of two")
assertRejectedQueryCount(t, reg, 0)

pool.Put(slice, memoryConsumptionTracker)

// Request an unreasonable size
_, err = pool.Get(10_000_000, memoryConsumptionTracker)
require.Error(t, err, "Expected an error for unreasonably large size request")
require.Contains(t, err.Error(), "exceeded", "Error message should indicate memory consumption limit exceeded")
assertRejectedQueryCount(t, reg, 1)

require.Equal(t, uint64(0), memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes,
"Current memory consumption should remain at 0 after rejected request")
}
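A quick worked check of this test's numbers (assuming FPointSize is 16 bytes, i.e. a timestamp plus a float): the limit is 16,000,000 tracked bytes. The 500,000-element request rounds up to 2^19 = 524,288 and, at element size 1, fits easily; the 10,000,000-element request rounds up to 2^24 = 16,777,216, just over the limit, so Get is rejected, the rejection counter increments, and the tracker unwinds back to zero.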

func TestLimitingBucketedPool_MaxExpectedPointsPerSeriesConstantIsPowerOfTwo(t *testing.T) {
// Although not strictly required (as the code should handle MaxExpectedPointsPerSeries not being a power of two correctly),
// it is best that we keep it as one for now.
require.True(t, isPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two")
}

func TestIsPowerOfTwo(t *testing.T) {
cases := []struct {
input int
expected bool
}{
{-2, false},
{1, true},
{2, true},
{3, false},
{4, true},
{5, false},
{6, false},
{7, false},
{8, true},
{16, true},
{32, true},
{1023, false},
{1024, true},
{1<<12 - 1, false},
{1 << 12, true},
}

for _, c := range cases {
result := isPowerOfTwo(c.input)
require.Equalf(t, c.expected, result, "isPowerOfTwo(%d) should return %v", c.input, c.expected)
}
}
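The `isPowerOfTwo` helper exercised here is outside this diff; an implementation consistent with the table above is the standard bit trick (a sketch, assuming this is what the repository uses):

```go
// isPowerOfTwo reports whether n is a positive power of two.
// A power of two has exactly one set bit, so clearing the lowest
// set bit via n & (n-1) must leave zero.
func isPowerOfTwo(n int) bool {
	return n > 0 && n&(n-1) == 0
}
```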

func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
expected := fmt.Sprintf(`
# TYPE %s counter
24 changes: 24 additions & 0 deletions pkg/streamingpromql/types/ring_buffer_test.go
@@ -502,3 +502,27 @@ func setupRingBufferTestingPools(t *testing.T) {
putHPointSliceForRingBuffer = originalPutHPointSlice
})
}

func TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
buf := NewFPointRingBuffer(memoryConsumptionTracker)

nonPowerOfTwoSlice := make([]promql.FPoint, 0, 15)

err := buf.Use(nonPowerOfTwoSlice)
require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}

func TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
buf := NewHPointRingBuffer(memoryConsumptionTracker)

nonPowerOfTwoSlice := make([]promql.HPoint, 0, 15)

err := buf.Use(nonPowerOfTwoSlice)
require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}
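The `Use` implementation is also outside this diff; given the error text asserted above, the validation presumably resembles this hypothetical sketch (the real method lives with the ring buffer types):

```go
package types

import "fmt"

// checkPowerOfTwoCapacity sketches the validation implied by the tests above:
// ring-buffer index wrapping relies on the backing slice's capacity being a
// power of two. (A zero capacity also passes this particular check.)
func checkPowerOfTwoCapacity[T any](s []T) error {
	if c := cap(s); c&(c-1) != 0 {
		return fmt.Errorf("slice capacity must be a power of two, but is %d", c)
	}
	return nil
}
```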
20 changes: 16 additions & 4 deletions pkg/util/pool/bucketed_pool.go
@@ -42,6 +42,9 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *BucketedPool[T, E] {
}

// Get returns a new slice with capacity greater than or equal to size.
// If no bucket is large enough, a slice with capacity equal to the next
// power of two above the requested size is returned.
// Get guarantees the returned slice's capacity is always a power of two.
func (p *BucketedPool[T, E]) Get(size int) T {
if size < 0 {
panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size))
@@ -51,11 +54,14 @@ func (p *BucketedPool[T, E]) Get(size int) T {
return nil
}

- if uint(size) > p.maxSize {
- return p.make(size)
+ bucketIndex := bits.Len(uint(size - 1))
+
+ // If bucketIndex exceeds the number of available buckets, return a slice of the next power of two.
+ if bucketIndex >= len(p.buckets) {
+ nextPowerOfTwo := 1 << bucketIndex
+ return p.make(nextPowerOfTwo)
}

- bucketIndex := bits.Len(uint(size - 1))
s := p.buckets[bucketIndex].Get()

if s == nil {
@@ -76,8 +82,14 @@ func (p *BucketedPool[T, E]) Put(s T) {
}

bucketIndex := bits.Len(size - 1)
+ if bucketIndex >= len(p.buckets) {
+ return // Ignore slices larger than the largest bucket
+ }

// Ignore slices that do not align to the current power of 2
// (this will only happen where a slice did not originally come from the pool).
if size != (1 << bucketIndex) {
- bucketIndex--
+ return
}

p.buckets[bucketIndex].Put(s[0:0])
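Get and Put now share a single invariant: every slice passing through the pool has a power-of-two capacity, with `bits.Len(n-1)` as the common index computation. A self-contained demo of that rounding (illustrative only, not code from this PR):

```go
package main

import (
	"fmt"
	"math/bits"
)

func main() {
	// bits.Len(uint(n-1)) is the exponent of the smallest power of two >= n,
	// which is exactly the bucket index computed by Get and Put.
	for _, n := range []int{1, 3, 20, 86401, 100001} {
		idx := bits.Len(uint(n - 1))
		fmt.Printf("size %6d -> bucket %2d (cap %d)\n", n, idx, 1<<idx)
	}

	// Put drops slices whose capacity is not exactly 1<<idx: they cannot have
	// come from the pool, and storing them (the old code rounded the index
	// down instead) would hand out slices that break the power-of-two
	// invariant relied on by consumers such as the ring buffers.
	fmt.Println(cap(make([]int, 0, 20)) == 1<<bits.Len(uint(20-1))) // false
}
```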
59 changes: 57 additions & 2 deletions pkg/util/pool/bucketed_pool_test.go
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ func TestBucketedPool_HappyPath(t *testing.T) {
},
{
size: 20,
- expectedCap: 20, // Max size is 19, so we expect to get a slice with the size requested (20), not 32 (the next power of two).
+ expectedCap: 32, // Although max size is 19, we expect to get a slice with the next power of two back. This slice would not have come from a bucket.
},
}

@@ -120,5 +120,60 @@ func TestBucketedPool_PutSliceLargerThanMaximum(t *testing.T) {
pool.Put(s1)
s2 := pool.Get(101)[:101]
require.NotSame(t, &s1[0], &s2[0])
- require.Equal(t, 101, cap(s2))
+ require.Equal(t, 128, cap(s2))
}

func TestBucketedPool_GetSizeCloseToMax(t *testing.T) {
maxSize := 100000
pool := NewBucketedPool(uint(maxSize), makeFunc)

// Request a size that triggers the last bucket boundary.
s := pool.Get(86401)

// Check that we still get a slice with the expected capacity.
require.Equal(t, 131072, cap(s))
require.Len(t, s, 0)
}

func TestBucketedPool_AlwaysReturnsPowerOfTwoCapacities(t *testing.T) {
pool := NewBucketedPool(100_000, makeFunc)

cases := []struct {
requestedSize int
expectedCap int
}{
{3, 4},
{5, 8},
{10, 16},
{20, 32},
{65_000, 65_536},
{100_001, 131_072}, // Exceeds max bucket: next power of two is 131,072
}

for _, c := range cases {
slice := pool.Get(c.requestedSize)

require.Equal(t, c.expectedCap, cap(slice),
"BucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap)

pool.Put(slice)
}
}

func TestBucketedPool_PutSizeCloseToMax(t *testing.T) {
maxSize := 100000
pool := NewBucketedPool(uint(maxSize), makeFunc)

// Create a slice whose capacity is close to maxSize but not a power of two.
s := make([]int, 0, 65_000) // 65,000 is below maxSize but not aligned to a power of two

// Ensure Put does not panic when adding this slice
require.NotPanics(t, func() {
pool.Put(s)
}, "Put should not panic for sizes close to maxSize")

// Validate that a subsequent Get for a smaller size works fine
ret := pool.Get(1)
require.Equal(t, 1, cap(ret))
require.Len(t, ret, 0)
}
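One closing note on why the ring buffers insist on power-of-two capacities in the first place: with such a capacity, an index wraps with a single AND against `capacity-1` rather than a modulo, and that mask is only valid for powers of two. A tiny sketch of the idea (an assumption about the motivation, not code from this PR):

```go
package main

import "fmt"

func main() {
	const capacity = 8 // the mask below is only correct for powers of two
	mask := capacity - 1

	for i := 0; i < 12; i++ {
		fmt.Print(i&mask, " ") // prints 0 1 2 3 4 5 6 7 0 1 2 3
	}
	fmt.Println()
}
```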