Skip to content

Commit

Permalink
Add benchmarks (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Jun 19, 2024
1 parent 530b4b6 commit 79cc3b9
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 98 deletions.
181 changes: 181 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package rill

import (
"sync"
"testing"
"time"

"github.com/destel/rill/internal/th"
)

const benchmarkInputSize = 100000
const benchmarkWorkDuration = 10 * time.Microsecond

func runBenchmark[B any](b *testing.B, name string, body func(in <-chan Try[int]) <-chan B) {
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

in := make(chan Try[int])
done := make(chan struct{})

go func() {
defer close(done)
out := body(in)

if out != nil {
Drain(out)
}
}()

// Give body a some time to spawn goroutines
time.Sleep(100 * time.Millisecond)

b.StartTimer()

// write to input
for k := 0; k < benchmarkInputSize; k++ {
in <- Try[int]{Value: k}
}
close(in)

// wait for body to finish
<-done
b.StopTimer()
}
})
}

func busySleep(d time.Duration) {
if d == 0 {
return
}

start := time.Now()
for time.Since(start) < d {
}
}

func BenchmarkBasicForLoop(b *testing.B) {
for i := 0; i < b.N; i++ {
for k := 0; k < benchmarkInputSize; k++ {
busySleep(benchmarkWorkDuration)
}
}
}

func BenchmarkBasicForLoopWithSleep(b *testing.B) {
for i := 0; i < b.N; i++ {
for k := 0; k < benchmarkInputSize; k++ {
time.Sleep(benchmarkWorkDuration)
}
}
}

func BenchmarkWaitGroup(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] {
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for range in {
busySleep(benchmarkWorkDuration)
}
}()
}
wg.Wait()
return nil
})
}
}

func BenchmarkWaitGroupWithSleep(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] {
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for range in {
time.Sleep(benchmarkWorkDuration)
}
}()
}
wg.Wait()
return nil
})
}
}

func BenchmarkForEach(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] {
ForEach(in, n, func(x int) error {
busySleep(benchmarkWorkDuration)
return nil
})
return nil
})
}
}

func BenchmarkForEachWithSleep(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] {
ForEach(in, n, func(x int) error {
time.Sleep(benchmarkWorkDuration)
return nil
})
return nil
})
}
}

func BenchmarkMap(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] {
return Map(in, n, func(x int) (int, error) {
busySleep(benchmarkWorkDuration)
return x, nil
})
})
}
}

func BenchmarkMapWithSleep(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan Try[int] {
return Map(in, n, func(x int) (int, error) {
time.Sleep(benchmarkWorkDuration)
return x, nil
})
})
}
}

func BenchmarkReduce(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan int {
Reduce(in, n, func(x, y int) (int, error) {
busySleep(benchmarkWorkDuration)
return x, nil
})
return nil
})
}
}

func BenchmarkReduceWithSleep(b *testing.B) {
for _, n := range []int{1, 2, 4, 8} {
runBenchmark(b, th.Name(n), func(in <-chan Try[int]) <-chan int {
Reduce(in, n, func(x, y int) (int, error) {
time.Sleep(benchmarkWorkDuration)
return x, nil
})
return nil
})
}
}
2 changes: 1 addition & 1 deletion consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestForEach(t *testing.T) {
if cnt < 100 {
t.Errorf("expected at least 100 iterations to complete")
}
if cnt > 150 {
if cnt == 1000 {
t.Errorf("early exit did not happen")
}

Expand Down
97 changes: 0 additions & 97 deletions internal/core/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,100 +136,3 @@ func TestMapReduce(t *testing.T) {
}
}
}

func busySleep(d time.Duration) {
if d == 0 {
return
}

start := time.Now()
for time.Since(start) < d {
}
}

func BenchmarkReduce(b *testing.B) {
const chanSize = 100000
const opDuration = 5 * time.Microsecond

for _, n := range []int{1, 2, 3, 4, 5, 6, 7, 8} {
b.Run(th.Name(n), func(b *testing.B) {
b.StopTimer()

for i := 0; i < b.N; i++ {
in := make(chan int, chanSize)
out := make(chan int, 1)

go func() {
tmp, _ := Reduce(in, n, func(x, y int) int {
busySleep(opDuration)
return x + y
})
out <- tmp
}()

// Give reduce some time to setup the pipeline
time.Sleep(10 * time.Millisecond)

b.StartTimer()

for k := 0; k < chanSize; k++ {
in <- 0
}
close(in)

<-out

b.StopTimer()
}
})
}
}

func BenchmarkMapReduce(b *testing.B) {
const chanSize = 100000
const mapOpDuration = 1 * time.Microsecond
const constReduceOpDuration = 5 * time.Microsecond

for _, nm := range []int{1, 4, 8} {
for _, nr := range []int{1, 2, 3, 4, 5, 6, 7, 8} {
b.Run(th.Name(nm, nr), func(b *testing.B) {
b.StopTimer()

for i := 0; i < b.N; i++ {
in := make(chan string, chanSize)
out := make(chan map[string]int, 1)

go func() {
tmp := MapReduce(in,
nm, func(x string) (string, int) {
busySleep(mapOpDuration)
return x, 1
},
nr, func(x, y int) int {
busySleep(constReduceOpDuration)
return x + y
},
)
out <- tmp
}()

// Give reduce some time to setup the pipeline
time.Sleep(10 * time.Millisecond)

b.StartTimer()

for k := 0; k < chanSize; k += 4 {
th.Send(in, "foo", "bar", "baz", "foo")
}
close(in)

<-out

b.StopTimer()
}
})
}
fmt.Println("")
}

}

0 comments on commit 79cc3b9

Please sign in to comment.