Skip to content

Commit 3f31384

Browse files
authored
Benchmark for the queue (#5765)
* Benchmark for the queue * update test
1 parent 97b6b8f commit 3f31384

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

pkg/flow/internal/worker/worker_pool.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,10 @@ func (w *workQueue) emitNextTask() {
166166
return
167167
}
168168

169-
// Remove the task from waiting and add it to running set
169+
// Remove the task from waiting and add it to running set.
170+
// NOTE: Even though we remove an element from the middle of a collection, we use a slice instead of a linked list.
171+
// This code is NOT identified as a performance hot spot and given that in large agents we observe max number of
172+
// tasks queued to be ~10, the slice is actually faster because it does not allocate memory. See BenchmarkQueue.
170173
w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...)
171174
task = w.waiting[key]
172175
delete(w.waiting, key)

pkg/flow/internal/worker/worker_pool_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package worker
22

33
import (
4+
"container/list"
45
"fmt"
56
"testing"
67
"time"
@@ -279,3 +280,70 @@ func TestWorkerPool(t *testing.T) {
279280
})
280281
})
281282
}
283+
284+
func BenchmarkQueue(b *testing.B) {
285+
/* The slice-based implementation is faster when queue size is less than 100 elements, as it doesn't allocate:
286+
287+
BenchmarkQueue/slice_10_elements-8 179793877 7.119 ns/op 0 B/op 0 allocs/op
288+
BenchmarkQueue/list_10_elements-8 48129548 24.75 ns/op 48 B/op 1 allocs/op
289+
BenchmarkQueue/slice_50_elements-8 100000000 11.52 ns/op 0 B/op 0 allocs/op
290+
BenchmarkQueue/list_50_elements-8 52150766 22.81 ns/op 48 B/op 1 allocs/op
291+
BenchmarkQueue/slice_100_elements-8 57941505 20.71 ns/op 0 B/op 0 allocs/op
292+
BenchmarkQueue/list_100_elements-8 51232423 22.95 ns/op 48 B/op 1 allocs/op
293+
BenchmarkQueue/slice_500_elements-8 12610141 95.32 ns/op 0 B/op 0 allocs/op
294+
BenchmarkQueue/list_500_elements-8 51342938 22.84 ns/op 48 B/op 1 allocs/op
295+
BenchmarkQueue/slice_1000_elements-8 6416760 187.6 ns/op 0 B/op 0 allocs/op
296+
BenchmarkQueue/list_1000_elements-8 51942148 22.92 ns/op 48 B/op 1 allocs/op
297+
*/
298+
299+
queueSizes := []int{10, 50, 100, 500, 1000}
300+
for _, queueSize := range queueSizes {
301+
elementsStr := fmt.Sprintf("%d elements", queueSize)
302+
b.Run("slice "+elementsStr, func(b *testing.B) {
303+
var slice []string
304+
for i := 0; i < queueSize; i++ {
305+
slice = append(slice, fmt.Sprintf("some.component.name.%d", i))
306+
}
307+
308+
b.ResetTimer()
309+
for i := 0; i < b.N; i++ {
310+
// simulate what the `workQueue` does with its `waitingOrder` field.
311+
312+
// add to queue
313+
slice = append(slice, "some.component.name")
314+
315+
// iterate to an arbitrary element
316+
ind := 0
317+
for ; ind < 5; ind++ {
318+
_ = slice[ind]
319+
}
320+
321+
// remove it from the queue
322+
slice = append(slice[:ind], slice[ind+1:]...)
323+
}
324+
})
325+
326+
b.Run("list "+elementsStr, func(b *testing.B) {
327+
l := list.New()
328+
for i := 0; i < queueSize; i++ {
329+
l.PushBack(fmt.Sprintf("some.component.name.%d", i))
330+
}
331+
332+
b.ResetTimer()
333+
for i := 0; i < b.N; i++ {
334+
// simulate what the `workQueue` does with its `waitingOrder` field.
335+
336+
// add to queue
337+
l.PushBack("some.component.name")
338+
339+
// iterate to an arbitrary element
340+
toRemove := l.Front()
341+
for j := 0; j < 5; j++ {
342+
toRemove = toRemove.Next()
343+
}
344+
// remove it from the queue using copy
345+
l.Remove(toRemove)
346+
}
347+
})
348+
}
349+
}

0 commit comments

Comments
 (0)