diff --git a/pkg/flow/internal/worker/worker_pool.go b/pkg/flow/internal/worker/worker_pool.go index 25a8eeebe536..bcd684475876 100644 --- a/pkg/flow/internal/worker/worker_pool.go +++ b/pkg/flow/internal/worker/worker_pool.go @@ -166,7 +166,10 @@ func (w *workQueue) emitNextTask() { return } - // Remove the task from waiting and add it to running set + // Remove the task from waiting and add it to running set. + // NOTE: Even though we remove an element from the middle of a collection, we use a slice instead of a linked list. + // This code is NOT identified as a performance hot spot, and given that in large agents we observe the max number of + // queued tasks to be ~10, the slice is actually faster because it does not allocate memory. See BenchmarkQueue. w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...) task = w.waiting[key] delete(w.waiting, key) diff --git a/pkg/flow/internal/worker/worker_pool_test.go b/pkg/flow/internal/worker/worker_pool_test.go index 3a594cba2622..7b9b1b6a1e99 100644 --- a/pkg/flow/internal/worker/worker_pool_test.go +++ b/pkg/flow/internal/worker/worker_pool_test.go @@ -1,6 +1,7 @@ package worker import ( + "container/list" "fmt" "testing" "time" @@ -279,3 +280,70 @@ func TestWorkerPool(t *testing.T) { }) }) } + +func BenchmarkQueue(b *testing.B) { + /* The slice-based implementation is faster when the queue size is up to ~100 elements, as it doesn't allocate: + + BenchmarkQueue/slice_10_elements-8 179793877 7.119 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_10_elements-8 48129548 24.75 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_50_elements-8 100000000 11.52 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_50_elements-8 52150766 22.81 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_100_elements-8 57941505 20.71 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_100_elements-8 51232423 22.95 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_500_elements-8 12610141 95.32 ns/op 0 B/op 0 allocs/op + 
BenchmarkQueue/list_500_elements-8 51342938 22.84 ns/op 48 B/op 1 allocs/op + BenchmarkQueue/slice_1000_elements-8 6416760 187.6 ns/op 0 B/op 0 allocs/op + BenchmarkQueue/list_1000_elements-8 51942148 22.92 ns/op 48 B/op 1 allocs/op + */ + + queueSizes := []int{10, 50, 100, 500, 1000} + for _, queueSize := range queueSizes { + elementsStr := fmt.Sprintf("%d elements", queueSize) + b.Run("slice "+elementsStr, func(b *testing.B) { + var slice []string + for i := 0; i < queueSize; i++ { + slice = append(slice, fmt.Sprintf("some.component.name.%d", i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // simulate what the `workQueue` does with its `waitingOrder` field. + + // add to queue + slice = append(slice, "some.component.name") + + // iterate to an arbitrary element + ind := 0 + for ; ind < 5; ind++ { + _ = slice[ind] + } + + // remove it from the queue + slice = append(slice[:ind], slice[ind+1:]...) + } + }) + + b.Run("list "+elementsStr, func(b *testing.B) { + l := list.New() + for i := 0; i < queueSize; i++ { + l.PushBack(fmt.Sprintf("some.component.name.%d", i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // simulate what the `workQueue` does with its `waitingOrder` field. + + // add to queue + l.PushBack("some.component.name") + + // iterate to an arbitrary element + toRemove := l.Front() + for j := 0; j < 5; j++ { + toRemove = toRemove.Next() + } + // remove it from the queue + l.Remove(toRemove) + } + }) + } +}