diff --git a/pkg/tempopb/pool/config.go b/pkg/tempopb/pool/config.go
deleted file mode 100644
index 73fbceb8ed4..00000000000
--- a/pkg/tempopb/pool/config.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package pool
-
-type Config struct {
-	MaxWorkers int `yaml:"max_workers"`
-	QueueDepth int `yaml:"queue_depth"`
-}
-
-// default is concurrency disabled
-func defaultConfig() *Config {
-	return &Config{
-		MaxWorkers: 30,
-		QueueDepth: 10000,
-	}
-}
diff --git a/pkg/tempopb/pool/pool.go b/pkg/tempopb/pool/pool.go
index 529759fe1db..a3e73f3f52b 100644
--- a/pkg/tempopb/pool/pool.go
+++ b/pkg/tempopb/pool/pool.go
@@ -1,196 +1,110 @@
-package pool
-
-import (
-	"context"
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/uber-go/atomic"
-)
-
-const (
-	queueLengthReportDuration = 15 * time.Second
-)
-
-var (
-	metricQueryQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
-		Namespace: "tempodb",
-		Name:      "work_queue_length",
-		Help:      "Current length of the work queue.",
-	})
-
-	metricQueryQueueMax = promauto.NewGauge(prometheus.GaugeOpts{
-		Namespace: "tempodb",
-		Name:      "work_queue_max",
-		Help:      "Maximum number of items in the work queue.",
-	})
-)
-
-type JobFunc func(ctx context.Context, payload interface{}) (interface{}, error)
-
-type result struct {
-	data interface{}
-	err  error
-}
-
-type job struct {
-	ctx     context.Context
-	payload interface{}
-	fn      JobFunc
-
-	wg        *sync.WaitGroup
-	resultsCh chan result
-	stop      *atomic.Bool
-}
-
-type Pool struct {
-	cfg  *Config
-	size *atomic.Int32
-
-	workQueue  chan *job
-	shutdownCh chan struct{}
-}
-
-func NewPool(cfg *Config) *Pool {
-	if cfg == nil {
-		cfg = defaultConfig()
-	}
-
-	q := make(chan *job, cfg.QueueDepth)
-	p := &Pool{
-		cfg:        cfg,
-		workQueue:  q,
-		size:       atomic.NewInt32(0),
-		shutdownCh: make(chan struct{}),
-	}
-
-	for i := 0; i < cfg.MaxWorkers; i++ {
-		go p.worker(q)
-	}
-	p.reportQueueLength()
-
-	metricQueryQueueMax.Set(float64(cfg.QueueDepth))
-
-	return p
-}
-
-func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]interface{}, []error, error) {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	totalJobs := len(payloads)
-
-	// sanity check before we even attempt to start adding jobs
-	if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {
-		return nil, nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
-	}
-
-	resultsCh := make(chan result, totalJobs) // way for jobs to send back results
-	stop := atomic.NewBool(false)             // way to signal to the jobs to quit
-	wg := &sync.WaitGroup{}                   // way to wait for all jobs to complete
-
-	// add each job one at a time. even though we checked length above these might still fail
-	for _, payload := range payloads {
-		wg.Add(1)
-		j := &job{
-			ctx:       ctx,
-			fn:        fn,
-			payload:   payload,
-			wg:        wg,
-			resultsCh: resultsCh,
-			stop:      stop,
-		}
-
-		select {
-		case p.workQueue <- j:
-			p.size.Inc()
-		default:
-			wg.Done()
-			stop.Store(true)
-			return nil, nil, fmt.Errorf("failed to add a job to work queue")
-		}
-	}
-
-	// wait for all jobs to finish
-	wg.Wait()
-
-	// close resultsCh
-	close(resultsCh)
-
-	// read all from results channel
-	var data []interface{}
-	var funcErrs []error
-	for result := range resultsCh {
-		if result.err != nil {
-			funcErrs = append(funcErrs, result.err)
-		} else {
-			data = append(data, result.data)
-		}
-	}
-
-	return data, funcErrs, nil
-}
-
-func (p *Pool) Shutdown() {
-	close(p.workQueue)
-	close(p.shutdownCh)
-}
-
-func (p *Pool) worker(j <-chan *job) {
-	for {
-		select {
-		case <-p.shutdownCh:
-			return
-		case j, ok := <-j:
-			if !ok {
-				return
-			}
-			runJob(j)
-			p.size.Dec()
-		}
-	}
-}
-
-func (p *Pool) reportQueueLength() {
-	ticker := time.NewTicker(queueLengthReportDuration)
-	go func() {
-		defer ticker.Stop()
-		for {
-			select {
-			case <-ticker.C:
-				metricQueryQueueLength.Set(float64(p.size.Load()))
-			case <-p.shutdownCh:
-				return
-			}
-		}
-	}()
-}
-
-func runJob(job *job) {
-	defer job.wg.Done()
-
-	// bail in case we have been asked to stop
-	if job.ctx.Err() != nil {
-		return
-	}
-
-	// bail in case not all jobs could be enqueued
-	if job.stop.Load() {
-		return
-	}
-
-	data, err := job.fn(job.ctx, job.payload)
-	if data != nil || err != nil {
-		select {
-		case job.resultsCh <- result{
-			data: data,
-			err:  err,
-		}:
-		default: // if we hit default it means that something else already returned a good result. /shrug
-		}
-	}
-}
+// Forked with love from: https://github.com/prometheus/prometheus/tree/c954cd9d1d4e3530be2939d39d8633c38b70913f/util/pool
+// This package was forked to provide better protection against putting byte slices back into the pool that
+// did not originate from it.
+
+package pool
+
+import (
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	metricMissOver  prometheus.Counter
+	metricMissUnder prometheus.Counter
+)
+
+func init() {
+	metricAllocOutPool := promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "tempo",
+		Name:      "ingester_prealloc_miss_bytes_total",
+		Help:      "The total number of alloc'ed bytes that missed the sync pools.",
+	}, []string{"direction"})
+
+	metricMissOver = metricAllocOutPool.WithLabelValues("over")
+	metricMissUnder = metricAllocOutPool.WithLabelValues("under")
+}
+
+// Pool is a linearly bucketed pool for variably sized byte slices.
+type Pool struct {
+	buckets   []sync.Pool
+	bktSize   int
+	minBucket int
+}
+
+// New returns a new Pool with numBuckets buckets of width bktSize, covering sizes from minBucket (exclusive) up to minBucket+numBuckets*bktSize.
+func New(minBucket, numBuckets, bktSize int) *Pool {
+	if minBucket < 0 {
+		panic("invalid min bucket size")
+	}
+	if bktSize < 1 {
+		panic("invalid bucket size")
+	}
+	if numBuckets < 1 {
+		panic("invalid num buckets")
+	}
+
+	return &Pool{
+		buckets:   make([]sync.Pool, numBuckets),
+		bktSize:   bktSize,
+		minBucket: minBucket,
+	}
+}
+
+// Get returns a new byte slice that fits the given size.
+func (p *Pool) Get(sz int) []byte {
+	if sz < 0 {
+		panic("requested negative size")
+	}
+
+	// Find the right bucket.
+	bkt := p.bucketFor(sz)
+
+	if bkt < 0 {
+		metricMissUnder.Add(float64(sz))
+		return make([]byte, 0, sz)
+	}
+	if bkt >= len(p.buckets) {
+		metricMissOver.Add(float64(sz))
+		return make([]byte, 0, sz)
+	}
+
+	b := p.buckets[bkt].Get()
+	if b == nil {
+		alignedSz := (bkt+1)*p.bktSize + p.minBucket
+		b = make([]byte, 0, alignedSz)
+	}
+	return b.([]byte)
+}
+
+// Put adds a slice to the right bucket in the pool.
+func (p *Pool) Put(s []byte) int {
+	c := cap(s)
+
+	// valid slice?
+	if (c-p.minBucket)%p.bktSize != 0 {
+		return -1
+	}
+
+	bkt := p.bucketFor(c) // the -1 in bucketFor puts a boundary-sized slice in the bucket below. it will be larger than all requested slices for this bucket
+	if bkt < 0 {
+		return -1
+	}
+	if bkt >= len(p.buckets) {
+		return -1
+	}
+
+	p.buckets[bkt].Put(s) // nolint: staticcheck
+
+	return bkt // for testing
+}
+
+func (p *Pool) bucketFor(sz int) int {
+	if sz <= p.minBucket {
+		return -1
+	}
+
+	return (sz - p.minBucket - 1) / p.bktSize
+}
diff --git a/pkg/tempopb/pool/pool_test.go b/pkg/tempopb/pool/pool_test.go
index 9b3e05b34da..8077b70f289 100644
--- a/pkg/tempopb/pool/pool_test.go
+++ b/pkg/tempopb/pool/pool_test.go
@@ -1,381 +1,114 @@
-package pool
-
-import (
-	"context"
-	"fmt"
-	"math/rand"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"go.uber.org/goleak"
-	"go.uber.org/multierr"
-)
-
-func TestResults(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 10,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	ret := []byte{0x01, 0x02}
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		i := payload.(int)
-
-		if i == 3 {
-			return ret, nil
-		}
-		return nil, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.NoError(t, err)
-	assert.Nil(t, funcErrs)
-	require.Len(t, msg, 1)
-	assert.Equal(t, ret, msg[0])
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestNoResults(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 10,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		return nil, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.Nil(t, msg)
-	assert.Nil(t, err)
-	assert.Nil(t, funcErrs)
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestMultipleHits(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 10,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	ret := []byte{0x01, 0x02}
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		return ret, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	require.Len(t, msg, 5)
-	for i := range payloads {
-		assert.Equal(t, ret, msg[i])
-	}
-	assert.Nil(t, err)
-	assert.Nil(t, funcErrs)
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestError(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 1,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	ret := fmt.Errorf("blerg")
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		i := payload.(int)
-
-		if i == 3 {
-			return nil, ret
-		}
-		return nil, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.Nil(t, msg)
-	assert.Nil(t, err)
-	assert.Equal(t, ret, multierr.Combine(funcErrs...))
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestMultipleErrors(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 10,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	ret := fmt.Errorf("blerg")
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		return nil, ret
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	var expErr []error
-	for range payloads {
-		expErr = append(expErr, ret)
-	}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.Nil(t, msg)
-	assert.NoError(t, err)
-	assert.Equal(t, expErr, funcErrs)
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestTooManyJobs(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 10,
-		QueueDepth: 3,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		return nil, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.Nil(t, msg)
-	assert.Nil(t, funcErrs)
-	assert.Error(t, err)
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestOneWorker(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 1,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	ret := []byte{0x01, 0x02, 0x03}
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		i := payload.(int)
-
-		if i == 3 {
-			return ret, nil
-		}
-		return nil, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.NoError(t, err)
-	assert.Nil(t, funcErrs)
-	require.Len(t, msg, 1)
-	assert.Equal(t, ret, msg[0])
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestGoingHam(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 1000,
-		QueueDepth: 10000,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	wg := &sync.WaitGroup{}
-
-	for i := 0; i < 1000; i++ {
-		wg.Add(1)
-		go func() {
-			ret := []byte{0x01, 0x03, 0x04}
-			fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-				i := payload.(int)
-
-				time.Sleep(time.Duration(rand.Uint32()%100) * time.Millisecond)
-				if i == 5 {
-					return ret, nil
-				}
-				return nil, nil
-			}
-			payloads := []interface{}{1, 2, 3, 4, 5}
-
-			msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-			assert.NoError(t, err)
-			assert.Nil(t, funcErrs)
-			require.Len(t, msg, 1)
-			assert.Equal(t, ret, msg[0])
-			wg.Done()
-		}()
-	}
-
-	wg.Wait()
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestOverloadingASmallPool(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 1,
-		QueueDepth: 11,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	wg := &sync.WaitGroup{}
-
-	for i := 0; i < 50; i++ {
-		wg.Add(1)
-		go func() {
-			fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-				time.Sleep(time.Duration(rand.Uint32()%100) * time.Millisecond)
-				return nil, nil
-			}
-			payloads := []interface{}{1, 2}
-			_, _, _ = p.RunJobs(context.Background(), payloads, fn)
-
-			wg.Done()
-		}()
-	}
-
-	wg.Wait()
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestShutdown(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-	p := NewPool(&Config{
-		MaxWorkers: 1,
-		QueueDepth: 10,
-	})
-
-	ret := []byte{0x01, 0x03, 0x04}
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		i := payload.(int)
-
-		if i == 3 {
-			return ret, nil
-		}
-		return nil, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}
-	_, _, _ = p.RunJobs(context.Background(), payloads, fn)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-
-	opts := goleak.IgnoreCurrent()
-	msg, _, err := p.RunJobs(context.Background(), payloads, fn)
-	assert.Nil(t, msg)
-	assert.Error(t, err)
-	goleak.VerifyNone(t, opts)
-}
-
-func TestDataEncodings(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-
-	p := NewPool(&Config{
-		MaxWorkers: 10,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	ret := []byte{0x01, 0x02}
-	fn := func(ctx context.Context, payload interface{}) (interface{}, error) {
-		return ret, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5}
-
-	msg, funcErrs, err := p.RunJobs(context.Background(), payloads, fn)
-	require.Len(t, msg, 5)
-	for i := range payloads {
-		assert.Equal(t, ret, msg[i])
-	}
-
-	assert.Nil(t, err)
-	assert.Nil(t, funcErrs)
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
-
-func TestCancellation(t *testing.T) {
-	prePoolOpts := goleak.IgnoreCurrent()
-	p := NewPool(&Config{
-		MaxWorkers: 1,
-		QueueDepth: 10,
-	})
-	opts := goleak.IgnoreCurrent()
-
-	callCount := 0
-	cancelAfter := 2
-
-	ctx, cancel := context.WithCancel(context.Background())
-
-	ret := []byte{0x01, 0x02}
-	fn := func(_ context.Context, _ interface{}) (interface{}, error) {
-		callCount++
-		if callCount >= cancelAfter {
-			cancel()
-		}
-		return ret, nil
-	}
-	payloads := []interface{}{1, 2, 3, 4, 5, 6, 7}
-
-	results, funcErrs, err := p.RunJobs(ctx, payloads, fn)
-	require.Len(t, results, 2)
-	for i := range results {
-		assert.Equal(t, ret, results[i])
-	}
-
-	assert.Nil(t, err)
-	assert.Nil(t, funcErrs)
-	goleak.VerifyNone(t, opts)
-
-	p.Shutdown()
-	goleak.VerifyNone(t, prePoolOpts)
-}
+// Forked with love from: https://github.com/prometheus/prometheus/tree/c954cd9d1d4e3530be2939d39d8633c38b70913f/util/pool
+
+package pool
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestPoolGet(t *testing.T) {
+	testPool := New(5, 2, 7)
+	cases := []struct {
+		size        int
+		expectedCap int
+		tooLarge    bool
+	}{
+		{ // under the smallest pool size, should return an unaligned slice
+			size:        3,
+			expectedCap: 3,
+		},
+		{ // minBucket is exclusive. 5 is technically an unaligned slice
+			size:        5,
+			expectedCap: 5,
+		},
+		{
+			size:        6,
+			expectedCap: 12,
+		},
+		{
+			size:        12,
+			expectedCap: 12,
+		},
+		{
+			size:        15,
+			expectedCap: 19,
+		},
+		{ // over the largest pool size, should return an unaligned slice
+			size:        20,
+			expectedCap: 20,
+			tooLarge:    true,
+		},
+	}
+
+	for _, c := range cases {
+		for i := 0; i < 10; i++ {
+			ret := testPool.Get(c.size)
+			require.Equal(t, c.expectedCap, cap(ret))
+			putBucket := testPool.Put(ret)
+
+			if !c.tooLarge {
+				require.Equal(t, testPool.bucketFor(cap(ret)), putBucket)
+			}
+		}
+	}
+}
+
+func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
+	testPool := New(100, 200, 5)
+
+	for i := 0; i < 10000; i++ {
+		size := rand.Intn(1000)
+		externalSlice := make([]byte, 0, size)
+		testPool.Put(externalSlice)
+
+		size = rand.Intn(1000)
+		ret := testPool.Get(size)
+
+		require.True(t, cap(ret) >= size, "cap: %d, size: %d", cap(ret), size)
+
+		testPool.Put(ret)
+	}
+}
+
+func TestBucketFor(t *testing.T) {
+	testPool := New(5, 10, 5)
+	cases := []struct {
+		size     int
+		expected int
+	}{
+		{
+			size:     0,
+			expected: -1,
+		},
+		{
+			size:     5,
+			expected: -1,
+		},
+		{
+			size:     6,
+			expected: 0,
+		},
+		{
+			size:     10,
+			expected: 0,
+		},
+		{
+			size:     11,
+			expected: 1,
+		},
+		{
+			size:     15,
+			expected: 1,
+		},
+		{
+			size:     16,
+			expected: 2,
+		},
+	}
+
+	for _, c := range cases {
+		ret := testPool.bucketFor(c.size)
+		require.Equal(t, c.expected, ret, "size: %d", c.size)
+	}
+}
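
For context, a minimal usage sketch of the new pool. This is not code from the PR: the import path assumes the usual github.com/grafana/tempo module layout, and the constructor values are illustrative rather than the ones the ingester actually passes.

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/tempopb/pool"
)

func main() {
	// Hypothetical sizing: buckets are 200 bytes wide, starting just above
	// 400 bytes and topping out at 400 + 200*200 = 40400 bytes. Requests
	// outside that range skip the pool and are allocated exactly.
	bytePool := pool.New(400, 200, 200)

	// Get returns a zero-length slice whose capacity is rounded up to the
	// top of its bucket (400 + 3*200 = 1000 here).
	buf := bytePool.Get(850)
	fmt.Println(len(buf), cap(buf)) // 0 1000

	buf = append(buf, make([]byte, 850)...)

	// Reset the length before returning the slice. Put only accepts
	// capacities that sit exactly on a bucket boundary, so slices that did
	// not originate from this pool are rejected and Put returns -1.
	bytePool.Put(buf[:0])
}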
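
The expectedCap values in TestPoolGet follow from bucketFor plus the aligned size computed in Get. Below is a small standalone sketch of that arithmetic for New(5, 2, 7); it simply restates the two formulas and is not part of the package.

package main

import "fmt"

func main() {
	const (
		minBucket  = 5 // sizes <= 5 bypass the pool ("under")
		numBuckets = 2 // bucket 0 covers sizes 6-12, bucket 1 covers 13-19
		bktSize    = 7
	)

	for _, sz := range []int{3, 5, 6, 12, 15, 20} {
		// same formula as Pool.bucketFor
		bkt := -1
		if sz > minBucket {
			bkt = (sz - minBucket - 1) / bktSize
		}

		if bkt < 0 || bkt >= numBuckets {
			// miss: Get hands back an exact-sized allocation
			fmt.Printf("size %2d -> miss, cap %d\n", sz, sz)
			continue
		}
		// hit: Get rounds the capacity up to the top of the bucket
		fmt.Printf("size %2d -> bucket %d, cap %d\n", sz, bkt, (bkt+1)*bktSize+minBucket)
	}
	// Prints caps 3, 5, 12, 12, 19, 20, matching the expectedCap column in TestPoolGet.
}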