diff --git a/api/write_test.go b/api/write_test.go index f55aaa6d..2095fc55 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -7,6 +7,7 @@ package api import ( "fmt" "math" + "runtime" "strings" "sync" "testing" @@ -169,7 +170,12 @@ func TestWriteErrorCallback(t *testing.T) { Code: "write", Message: "error", }) - writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(1)) + // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343 + retryInterval := uint(1) + if runtime.GOOS == "windows" { + retryInterval = 20 + } + writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(retryInterval)) writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool { return retryAttempts < 2 }) @@ -178,7 +184,7 @@ func TestWriteErrorCallback(t *testing.T) { for i, j := 0, 0; i < 6; i++ { writeAPI.WritePoint(points[i]) writeAPI.waitForFlushing() - w := int(math.Pow(5, float64(j))) + w := int(math.Pow(5, float64(j)) * float64(retryInterval)) fmt.Printf("Waiting %dms\n", w) <-time.After(time.Duration(w) * time.Millisecond) j++ diff --git a/internal/write/service.go b/internal/write/service.go index 3cc4e958..3095fb4d 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -89,7 +89,7 @@ func NewService(org string, bucket string, httpService http2.Service, options *w writeOptions: options, retryQueue: newQueue(int(retryBufferLimit)), retryExponentialBase: 2, - retryDelay: 0, + retryDelay: options.RetryInterval(), retryAttempts: 0, } } @@ -196,7 +196,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } w.retryDelay = w.writeOptions.RetryInterval() - w.retryDelay = 0 + w.retryAttempts = 0 if retrying && !batchToWrite.Evicted { w.retryQueue.pop() } diff --git a/internal/write/service_test.go b/internal/write/service_test.go index 8d98330a..21da5378 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -136,19 +136,25 @@ func TestBufferOverwrite(t *testing.T) { log.Log.SetLogLevel(log.DebugLevel) ilog.SetFlags(ilog.Ldate | ilog.Lmicroseconds) hs := test.NewTestService(t, "http://localhost:8086") + // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343 + baseRetryInterval := uint(1) + if runtime.GOOS == "windows" { + baseRetryInterval = 20 + } // Buffer limit 15000, bach is 5000 => buffer for 3 batches - opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000) + opts := write.DefaultOptions().SetRetryInterval(baseRetryInterval).SetRetryBufferLimit(15000) ctx := context.Background() srv := NewService("my-org", "my-bucket", hs, opts) // Set permanent reply error to force writes fail and retry hs.SetReplyError(&http.Error{ StatusCode: 429, }) - // This batch will fail and it be added to retry queue + // This batch will fail and it will be added to retry queue b1 := NewBatch("1\n", opts.MaxRetryTime()) err := srv.HandleWrite(ctx, b1) assert.NotNil(t, err) - assert.Equal(t, uint(1), srv.retryDelay) + //assert.Equal(t, uint(baseRetryInterval), srv.retryDelay) + assertBetween(t, srv.retryDelay, baseRetryInterval, baseRetryInterval*2) assert.Equal(t, 1, srv.retryQueue.list.Len()) <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) @@ -156,7 +162,7 @@ func TestBufferOverwrite(t *testing.T) { // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b2) assert.NotNil(t, err) - assertBetween(t, srv.retryDelay, 2, 4) + assertBetween(t, srv.retryDelay, baseRetryInterval*2, baseRetryInterval*4) assert.Equal(t, 2, srv.retryQueue.list.Len()) <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) @@ -164,7 +170,7 @@ func TestBufferOverwrite(t *testing.T) { // First batch will be tried to write again and this one will added to retry queue err = srv.HandleWrite(ctx, b3) assert.NotNil(t, err) - assertBetween(t, srv.retryDelay, 4, 8) + assertBetween(t, srv.retryDelay, baseRetryInterval*4, baseRetryInterval*8) assert.Equal(t, 3, srv.retryQueue.list.Len()) // Write early and overwrite @@ -179,7 +185,6 @@ func TestBufferOverwrite(t *testing.T) { assert.Equal(t, 3, srv.retryQueue.list.Len()) // Overwrite - // TODO check time.Duration(srv.RetryDelay)) <-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2) b5 := NewBatch("5\n", opts.MaxRetryTime()) // Second batch will be tried to write again @@ -187,7 +192,6 @@ func TestBufferOverwrite(t *testing.T) { // the second batch will be discarded err = srv.HandleWrite(ctx, b5) assert.Nil(t, err) // No error should be returned, because no write was attempted (still waiting for retryDelay to expire) - //TODO assertBetween(t, srv.RetryDelay, 2, 4) assert.Equal(t, 3, srv.retryQueue.list.Len()) <-time.After(time.Millisecond * time.Duration(srv.retryDelay)) @@ -511,10 +515,14 @@ func TestRetryIntervalAccumulation(t *testing.T) { // Setup test service with scenario's configuration hs := test.NewTestService(t, "http://localhost:8086") + baseRetryInterval := uint(20) + if runtime.GOOS == "windows" { + baseRetryInterval = 30 + } opts := write.DefaultOptions(). - SetRetryInterval(20). + SetRetryInterval(baseRetryInterval). SetMaxRetryInterval(300). - SetMaxRetryTime(100) + SetMaxRetryTime(baseRetryInterval * 5) ctx := context.Background() srv := NewService("my-org", "my-bucket", hs, opts) writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond @@ -523,7 +531,7 @@ func TestRetryIntervalAccumulation(t *testing.T) { hs.SetReplyError(&http.Error{StatusCode: 429}) lastInterval := uint(0) - assert.Equal(t, uint(0), srv.retryDelay) // Should initialize to zero + assert.Equal(t, uint(0), srv.retryAttempts) // Should initialize to zero i := 1 for ; i <= 45; i++ { b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime()) @@ -572,7 +580,7 @@ func TestRetryIntervalAccumulation(t *testing.T) { err := srv.HandleWrite(ctx, b) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) - assert.Equal(t, srv.retryDelay, uint(0)) // Should reset to zero + assert.Equal(t, srv.retryAttempts, uint(0)) // Should reset to zero // Ensure proper batches got written to server require.Len(t, hs.Lines(), 5)