Skip to content

Commit

Permalink
Merge pull request #338 from bonitoo-io/fix/test_retry_timeout
Browse files Browse the repository at this point in the history
fix(test): use higher timeouts on Windows
  • Loading branch information
vlastahajek committed Jun 24, 2022
2 parents af34012 + e070de9 commit 105bf8c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 15 deletions.
10 changes: 8 additions & 2 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package api
import (
"fmt"
"math"
"runtime"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -169,7 +170,12 @@ func TestWriteErrorCallback(t *testing.T) {
Code: "write",
Message: "error",
})
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(1))
// sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
retryInterval := uint(1)
if runtime.GOOS == "windows" {
retryInterval = 20
}
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(retryInterval))
writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
return retryAttempts < 2
})
Expand All @@ -178,7 +184,7 @@ func TestWriteErrorCallback(t *testing.T) {
for i, j := 0, 0; i < 6; i++ {
writeAPI.WritePoint(points[i])
writeAPI.waitForFlushing()
w := int(math.Pow(5, float64(j)))
w := int(math.Pow(5, float64(j)) * float64(retryInterval))
fmt.Printf("Waiting %dms\n", w)
<-time.After(time.Duration(w) * time.Millisecond)
j++
Expand Down
4 changes: 2 additions & 2 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
writeOptions: options,
retryQueue: newQueue(int(retryBufferLimit)),
retryExponentialBase: 2,
retryDelay: 0,
retryDelay: options.RetryInterval(),
retryAttempts: 0,
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}

w.retryDelay = w.writeOptions.RetryInterval()
w.retryDelay = 0
w.retryAttempts = 0
if retrying && !batchToWrite.Evicted {
w.retryQueue.pop()
}
Expand Down
30 changes: 19 additions & 11 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,35 +136,41 @@ func TestBufferOverwrite(t *testing.T) {
log.Log.SetLogLevel(log.DebugLevel)
ilog.SetFlags(ilog.Ldate | ilog.Lmicroseconds)
hs := test.NewTestService(t, "http://localhost:8086")
// sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
baseRetryInterval := uint(1)
if runtime.GOOS == "windows" {
baseRetryInterval = 20
}
// Buffer limit 15000, bach is 5000 => buffer for 3 batches
opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000)
opts := write.DefaultOptions().SetRetryInterval(baseRetryInterval).SetRetryBufferLimit(15000)
ctx := context.Background()
srv := NewService("my-org", "my-bucket", hs, opts)
// Set permanent reply error to force writes fail and retry
hs.SetReplyError(&http.Error{
StatusCode: 429,
})
// This batch will fail and it be added to retry queue
// This batch will fail and it will be added to retry queue
b1 := NewBatch("1\n", opts.MaxRetryTime())
err := srv.HandleWrite(ctx, b1)
assert.NotNil(t, err)
assert.Equal(t, uint(1), srv.retryDelay)
//assert.Equal(t, uint(baseRetryInterval), srv.retryDelay)
assertBetween(t, srv.retryDelay, baseRetryInterval, baseRetryInterval*2)
assert.Equal(t, 1, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b2 := NewBatch("2\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b2)
assert.NotNil(t, err)
assertBetween(t, srv.retryDelay, 2, 4)
assertBetween(t, srv.retryDelay, baseRetryInterval*2, baseRetryInterval*4)
assert.Equal(t, 2, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
b3 := NewBatch("3\n", opts.MaxRetryTime())
// First batch will be tried to write again and this one will added to retry queue
err = srv.HandleWrite(ctx, b3)
assert.NotNil(t, err)
assertBetween(t, srv.retryDelay, 4, 8)
assertBetween(t, srv.retryDelay, baseRetryInterval*4, baseRetryInterval*8)
assert.Equal(t, 3, srv.retryQueue.list.Len())

// Write early and overwrite
Expand All @@ -179,15 +185,13 @@ func TestBufferOverwrite(t *testing.T) {
assert.Equal(t, 3, srv.retryQueue.list.Len())

// Overwrite
// TODO check time.Duration(srv.RetryDelay))
<-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2)
b5 := NewBatch("5\n", opts.MaxRetryTime())
// Second batch will be tried to write again
// However, write will fail and as new batch is added to retry queue
// the second batch will be discarded
err = srv.HandleWrite(ctx, b5)
assert.Nil(t, err) // No error should be returned, because no write was attempted (still waiting for retryDelay to expire)
//TODO assertBetween(t, srv.RetryDelay, 2, 4)
assert.Equal(t, 3, srv.retryQueue.list.Len())

<-time.After(time.Millisecond * time.Duration(srv.retryDelay))
Expand Down Expand Up @@ -511,10 +515,14 @@ func TestRetryIntervalAccumulation(t *testing.T) {

// Setup test service with scenario's configuration
hs := test.NewTestService(t, "http://localhost:8086")
baseRetryInterval := uint(20)
if runtime.GOOS == "windows" {
baseRetryInterval = 30
}
opts := write.DefaultOptions().
SetRetryInterval(20).
SetRetryInterval(baseRetryInterval).
SetMaxRetryInterval(300).
SetMaxRetryTime(100)
SetMaxRetryTime(baseRetryInterval * 5)
ctx := context.Background()
srv := NewService("my-org", "my-bucket", hs, opts)
writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond
Expand All @@ -523,7 +531,7 @@ func TestRetryIntervalAccumulation(t *testing.T) {
hs.SetReplyError(&http.Error{StatusCode: 429})

lastInterval := uint(0)
assert.Equal(t, uint(0), srv.retryDelay) // Should initialize to zero
assert.Equal(t, uint(0), srv.retryAttempts) // Should initialize to zero
i := 1
for ; i <= 45; i++ {
b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
Expand Down Expand Up @@ -572,7 +580,7 @@ func TestRetryIntervalAccumulation(t *testing.T) {
err := srv.HandleWrite(ctx, b)
assert.Nil(t, err)
assert.Equal(t, 0, srv.retryQueue.list.Len())
assert.Equal(t, srv.retryDelay, uint(0)) // Should reset to zero
assert.Equal(t, srv.retryAttempts, uint(0)) // Should reset to zero

// Ensure proper batches got written to server
require.Len(t, hs.Lines(), 5)
Expand Down

0 comments on commit 105bf8c

Please sign in to comment.