From e531c1b1a9905c73a62d9eec0121147137fa568d Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Tue, 14 Nov 2023 10:17:56 +0000 Subject: [PATCH] Benchmark for the queue --- pkg/flow/internal/worker/worker_pool.go | 5 +- pkg/flow/internal/worker/worker_pool_test.go | 68 ++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/pkg/flow/internal/worker/worker_pool.go b/pkg/flow/internal/worker/worker_pool.go index 25a8eeebe536..bcd684475876 100644 --- a/pkg/flow/internal/worker/worker_pool.go +++ b/pkg/flow/internal/worker/worker_pool.go @@ -166,7 +166,10 @@ func (w *workQueue) emitNextTask() { return } - // Remove the task from waiting and add it to running set + // Remove the task from waiting and add it to running set. + // NOTE: Even though we remove an element from the middle of a collection, we use a slice instead of a linked list. + // This code is NOT identified as a performance hot spot and given that in large agents we observe max number of + // tasks queued to be ~10, the slice is actually faster because it does not allocate memory. See BenchmarkQueue. w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...) task = w.waiting[key] delete(w.waiting, key) diff --git a/pkg/flow/internal/worker/worker_pool_test.go b/pkg/flow/internal/worker/worker_pool_test.go index 3a594cba2622..8dcceca8910e 100644 --- a/pkg/flow/internal/worker/worker_pool_test.go +++ b/pkg/flow/internal/worker/worker_pool_test.go @@ -1,6 +1,7 @@ package worker import ( + "container/list" "fmt" "testing" "time" @@ -279,3 +280,70 @@ func TestWorkerPool(t *testing.T) { }) }) } + +func BenchmarkQueue(b *testing.B) { + /* The slice-based implementation is faster when queue size is less than 300 elements, as it doesn't allocate: + + BenchmarkQueue/slice_10_elements-8 266251536 4.427 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_10_elements-8 38036170 31.20 ns/op 56 B/op 1 allocs/op + BenchmarkQueue/slice_100_elements-8 85725766 14.29 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_100_elements-8 37889650 31.17 ns/op 56 B/op 1 allocs/op + BenchmarkQueue/slice_300_elements-8 40504732 29.55 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_300_elements-8 38032604 31.20 ns/op 56 B/op 1 allocs/op + BenchmarkQueue/slice_1000_elements-8 12571960 95.50 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_1000_elements-8 37922080 31.07 ns/op 56 B/op 2 allocs/op + BenchmarkQueue/slice_10000_elements-8 1000000 1026 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_10000_elements-8 34379028 34.24 ns/op 56 B/op 1 allocs/op + */ + + queueSizes := []int{10, 100, 300, 1000, 10000} + for _, queueSize := range queueSizes { + elementsStr := fmt.Sprintf("%d elements", queueSize) + b.Run("slice "+elementsStr, func(b *testing.B) { + var slice []int + for i := 0; i < queueSize; i++ { + slice = append(slice, i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // simulate what the `workQueue` does with its `waitingOrder` field. + + // add to queue + slice = append(slice, i) + + // iterate to an arbitrary element + ind := 0 + for ; ind < 5; ind++ { + _ = slice[ind] + } + + // remove it from the queue + slice = append(slice[:ind], slice[ind+1:]...) + } + }) + + b.Run("list "+elementsStr, func(b *testing.B) { + l := list.New() + for i := 0; i < queueSize; i++ { + l.PushBack(i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // simulate what the `workQueue` does with its `waitingOrder` field. + + // add to queue + l.PushBack(i) + + // iterate to an arbitrary element + toRemove := l.Front() + for j := 0; j < 5; j++ { + toRemove = toRemove.Next() + } + // remove it from the queue using copy + l.Remove(toRemove) + } + }) + } +}