From e531c1b1a9905c73a62d9eec0121147137fa568d Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Tue, 14 Nov 2023 10:17:56 +0000 Subject: [PATCH 1/2] Benchmark for the queue --- pkg/flow/internal/worker/worker_pool.go | 5 +- pkg/flow/internal/worker/worker_pool_test.go | 68 ++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/pkg/flow/internal/worker/worker_pool.go b/pkg/flow/internal/worker/worker_pool.go index 25a8eeebe536..bcd684475876 100644 --- a/pkg/flow/internal/worker/worker_pool.go +++ b/pkg/flow/internal/worker/worker_pool.go @@ -166,7 +166,10 @@ func (w *workQueue) emitNextTask() { return } - // Remove the task from waiting and add it to running set + // Remove the task from waiting and add it to running set. + // NOTE: Even though we remove an element from the middle of a collection, we use a slice instead of a linked list. + // This code is NOT identified as a performance hot spot and given that in large agents we observe max number of + // tasks queued to be ~10, the slice is actually faster because it does not allocate memory. See BenchmarkQueue. w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...) task = w.waiting[key] delete(w.waiting, key) diff --git a/pkg/flow/internal/worker/worker_pool_test.go b/pkg/flow/internal/worker/worker_pool_test.go index 3a594cba2622..8dcceca8910e 100644 --- a/pkg/flow/internal/worker/worker_pool_test.go +++ b/pkg/flow/internal/worker/worker_pool_test.go @@ -1,6 +1,7 @@ package worker import ( + "container/list" "fmt" "testing" "time" @@ -279,3 +280,70 @@ func TestWorkerPool(t *testing.T) { }) }) } + +func BenchmarkQueue(b *testing.B) { + /* The slice-based implementation is faster when queue size is less than 300 elements, as it doesn't allocate: + + BenchmarkQueue/slice_10_elements-8 266251536 4.427 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_10_elements-8 38036170 31.20 ns/op 56 B/op 1 allocs/op + BenchmarkQueue/slice_100_elements-8 85725766 14.29 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_100_elements-8 37889650 31.17 ns/op 56 B/op 1 allocs/op + BenchmarkQueue/slice_300_elements-8 40504732 29.55 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_300_elements-8 38032604 31.20 ns/op 56 B/op 1 allocs/op + BenchmarkQueue/slice_1000_elements-8 12571960 95.50 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_1000_elements-8 37922080 31.07 ns/op 56 B/op 2 allocs/op + BenchmarkQueue/slice_10000_elements-8 1000000 1026 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_10000_elements-8 34379028 34.24 ns/op 56 B/op 1 allocs/op + */ + + queueSizes := []int{10, 100, 300, 1000, 10000} + for _, queueSize := range queueSizes { + elementsStr := fmt.Sprintf("%d elements", queueSize) + b.Run("slice "+elementsStr, func(b *testing.B) { + var slice []int + for i := 0; i < queueSize; i++ { + slice = append(slice, i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // simulate what the `workQueue` does with its `waitingOrder` field. + + // add to queue + slice = append(slice, i) + + // iterate to an arbitrary element + ind := 0 + for ; ind < 5; ind++ { + _ = slice[ind] + } + + // remove it from the queue + slice = append(slice[:ind], slice[ind+1:]...) + } + }) + + b.Run("list "+elementsStr, func(b *testing.B) { + l := list.New() + for i := 0; i < queueSize; i++ { + l.PushBack(i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // simulate what the `workQueue` does with its `waitingOrder` field. + + // add to queue + l.PushBack(i) + + // iterate to an arbitrary element + toRemove := l.Front() + for j := 0; j < 5; j++ { + toRemove = toRemove.Next() + } + // remove it from the queue using copy + l.Remove(toRemove) + } + }) + } +} From 0ffae90c68e16d6a6bb4ee862f671ddfcdd39674 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Tue, 14 Nov 2023 11:00:04 +0000 Subject: [PATCH 2/2] update test --- pkg/flow/internal/worker/worker_pool_test.go | 36 ++++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/flow/internal/worker/worker_pool_test.go b/pkg/flow/internal/worker/worker_pool_test.go index 8dcceca8910e..7b9b1b6a1e99 100644 --- a/pkg/flow/internal/worker/worker_pool_test.go +++ b/pkg/flow/internal/worker/worker_pool_test.go @@ -282,27 +282,27 @@ func TestWorkerPool(t *testing.T) { } func BenchmarkQueue(b *testing.B) { - /* The slice-based implementation is faster when queue size is less than 300 elements, as it doesn't allocate: - - BenchmarkQueue/slice_10_elements-8 266251536 4.427 ns/op 0 B/op 0 allocs/op - BenchmarkQueue/list_10_elements-8 38036170 31.20 ns/op 56 B/op 1 allocs/op - BenchmarkQueue/slice_100_elements-8 85725766 14.29 ns/op 0 B/op 0 allocs/op - BenchmarkQueue/list_100_elements-8 37889650 31.17 ns/op 56 B/op 1 allocs/op - BenchmarkQueue/slice_300_elements-8 40504732 29.55 ns/op 0 B/op 0 allocs/op - BenchmarkQueue/list_300_elements-8 38032604 31.20 ns/op 56 B/op 1 allocs/op - BenchmarkQueue/slice_1000_elements-8 12571960 95.50 ns/op 0 B/op 0 allocs/op - BenchmarkQueue/list_1000_elements-8 37922080 31.07 ns/op 56 B/op 2 allocs/op - BenchmarkQueue/slice_10000_elements-8 1000000 1026 ns/op 0 B/op 0 allocs/op - BenchmarkQueue/list_10000_elements-8 34379028 34.24 ns/op 56 B/op 1 allocs/op + /* The slice-based implementation is faster when queue size is less than 100 elements, as it doesn't allocate: + + BenchmarkQueue/slice_10_elements-8 179793877 7.119 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_10_elements-8 48129548 24.75 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_50_elements-8 100000000 11.52 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_50_elements-8 52150766 22.81 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_100_elements-8 57941505 20.71 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_100_elements-8 51232423 22.95 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_500_elements-8 12610141 95.32 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_500_elements-8 51342938 22.84 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_1000_elements-8 6416760 187.6 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_1000_elements-8 51942148 22.92 ns/op 48 B/op 1 allocs/op */ - queueSizes := []int{10, 100, 300, 1000, 10000} + queueSizes := []int{10, 50, 100, 500, 1000} for _, queueSize := range queueSizes { elementsStr := fmt.Sprintf("%d elements", queueSize) b.Run("slice "+elementsStr, func(b *testing.B) { - var slice []int + var slice []string for i := 0; i < queueSize; i++ { - slice = append(slice, i) + slice = append(slice, fmt.Sprintf("some.component.name.%d", i)) } b.ResetTimer() @@ -310,7 +310,7 @@ func BenchmarkQueue(b *testing.B) { // simulate what the `workQueue` does with its `waitingOrder` field. // add to queue - slice = append(slice, i) + slice = append(slice, "some.component.name") // iterate to an arbitrary element ind := 0 @@ -326,7 +326,7 @@ func BenchmarkQueue(b *testing.B) { b.Run("list "+elementsStr, func(b *testing.B) { l := list.New() for i := 0; i < queueSize; i++ { - l.PushBack(i) + l.PushBack(fmt.Sprintf("some.component.name.%d", i)) } b.ResetTimer() @@ -334,7 +334,7 @@ func BenchmarkQueue(b *testing.B) { // simulate what the `workQueue` does with its `waitingOrder` field. // add to queue - l.PushBack(i) + l.PushBack("some.component.name") // iterate to an arbitrary element toRemove := l.Front()