Skip to content

Commit

Permalink
MQE: Ensure BucketedPool always returns slices of power two
Browse files Browse the repository at this point in the history
This is to guarantee they work with the ring buffers which expect slices
to always be power of two.

The limiting pool still protects us from requesting an unreasonable
amount of points with the MemoryConsumptionTracker.
  • Loading branch information
jhesketh committed Dec 17, 2024
1 parent 8484c31 commit 4c1aa8e
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/streamingpromql/testdata/ours/selectors.test
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ load 1s

# Test our bucket pool can get more than the max points
eval instant at 24h rate(metric_total[24h])
{} 2

# Make sure the ring buffer Use and Append work with power of two pools
eval instant at 24h rate(metric_total[1d:1s])
{} 2
61 changes: 61 additions & 0 deletions pkg/streamingpromql/types/limiting_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,67 @@ func TestLimitingPool_Mangling(t *testing.T) {
require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled")
}

func TestLimitingBucketedPool_PowerOfTwoCapacities(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)

pool := NewLimitingBucketedPool(
pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }),
1,
false,
nil,
)

cases := []struct {
requestedSize int
expectedCap int
}{
{3, 4},
{5, 8},
{10, 16},
{65_000, 65_536},
{100_001, 131_072}, // Exceeds max, expect next power of two
}

for _, c := range cases {
slice, err := pool.Get(c.requestedSize, memoryConsumptionTracker)
require.NoError(t, err, "Unexpected error when requesting size %d", c.requestedSize)
require.Equal(t, c.expectedCap, cap(slice),
"LimitingBucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap)
pool.Put(slice, memoryConsumptionTracker)
}
}

func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) {
const maxMemoryLimit = 1_000_000 * FPointSize

reg, metric := createRejectedMetric()
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(uint64(maxMemoryLimit), metric)

pool := NewLimitingBucketedPool(
pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }),
1,
false,
nil,
)

// Request a reasonable size
slice, err := pool.Get(500_000, memoryConsumptionTracker)
require.NoError(t, err, "Expected to succeed for reasonable size request")
require.Equal(t, 524_288, cap(slice), "Capacity should be next power of two")
assertRejectedQueryCount(t, reg, 0)

pool.Put(slice, memoryConsumptionTracker)

// Request an unreasonable size
_, err = pool.Get(10_000_000, memoryConsumptionTracker)
require.Error(t, err, "Expected an error for unreasonably large size request")
require.Contains(t, err.Error(), "exceeded", "Error message should indicate memory consumption limit exceeded")
assertRejectedQueryCount(t, reg, 1)

require.Equal(t, uint64(0), memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes,
"Current memory consumption should remain at 0 after rejected request")
}

func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
expected := fmt.Sprintf(`
# TYPE %s counter
Expand Down
24 changes: 24 additions & 0 deletions pkg/streamingpromql/types/ring_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,3 +502,27 @@ func setupRingBufferTestingPools(t *testing.T) {
putHPointSliceForRingBuffer = originalPutHPointSlice
})
}

func TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
buf := NewFPointRingBuffer(memoryConsumptionTracker)

nonPowerOfTwoSlice := make([]promql.FPoint, 0, 15)

err := buf.Use(nonPowerOfTwoSlice)
require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}

func TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
buf := NewHPointRingBuffer(memoryConsumptionTracker)

nonPowerOfTwoSlice := make([]promql.HPoint, 0, 15)

err := buf.Use(nonPowerOfTwoSlice)
require.Error(t, err, "Use() should return an error for a non-power-of-two slice")
require.EqualError(t, err, "slice capacity must be a power of two, but is 15",
"Error message should indicate the invalid capacity")
}
5 changes: 4 additions & 1 deletion pkg/util/pool/bucketed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *Buckete
}

// Get returns a new slice with capacity greater than or equal to size.
// If no bucket large enough exists, a slice larger than the requested size
// of the next power of two is returned.
func (p *BucketedPool[T, E]) Get(size int) T {
if size < 0 {
panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size))
Expand All @@ -53,7 +55,8 @@ func (p *BucketedPool[T, E]) Get(size int) T {

bucketIndex := bits.Len(uint(size - 1))
if bucketIndex >= len(p.buckets) {
return p.make(size)
nextPowerOfTwo := 1 << bucketIndex
return p.make(nextPowerOfTwo)
}

s := p.buckets[bucketIndex].Get()
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/pool/bucketed_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,28 @@ func TestBucketedPool_GetSizeCloseToMax(t *testing.T) {
require.Equal(t, 86401, cap(s))
require.Len(t, s, 0)
}

func TestBucketedPool_AlwaysReturnsPowerOfTwoCapacities(t *testing.T) {
pool := NewBucketedPool(100_000, makeFunc)

cases := []struct {
requestedSize int
expectedCap int
}{
{3, 4},
{5, 8},
{10, 16},
{20, 32},
{65_000, 65_536},
{100_001, 131_072}, // Exceeds max bucket: next power of two is 131,072
}

for _, c := range cases {
slice := pool.Get(c.requestedSize)

require.Equal(t, c.expectedCap, cap(slice),
"BucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap)

pool.Put(slice)
}
}

0 comments on commit 4c1aa8e

Please sign in to comment.