diff --git a/api/write.go b/api/write.go index 2dc02438..6c57a1a6 100644 --- a/api/write.go +++ b/api/write.go @@ -61,9 +61,7 @@ type WriteAPIImpl struct { writeOptions *write.Options } -type writeBuffInfoReq struct { - writeBuffLen int -} +type writeBuffInfoReq struct{} // NewWriteAPI returns new non-blocking write client for writing data to bucket belonging to org func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl { @@ -112,24 +110,11 @@ func (w *WriteAPIImpl) Flush() { } func (w *WriteAPIImpl) waitForFlushing() { - for { - w.bufferInfoCh <- writeBuffInfoReq{} - writeBuffInfo := <-w.bufferInfoCh - if writeBuffInfo.writeBuffLen == 0 { - break - } - log.Info("Waiting buffer is flushed") - <-time.After(time.Millisecond) - } - for { - w.writeInfoCh <- writeBuffInfoReq{} - writeBuffInfo := <-w.writeInfoCh - if writeBuffInfo.writeBuffLen == 0 { - break - } - log.Info("Waiting buffer is flushed") - <-time.After(time.Millisecond) - } + w.bufferInfoCh <- writeBuffInfoReq{} + <-w.bufferInfoCh + w.writeInfoCh <- writeBuffInfoReq{} + <-w.writeInfoCh + w.writeCh <- nil } func (w *WriteAPIImpl) bufferProc() { @@ -152,7 +137,6 @@ x: w.flushBuffer() break x case buffInfo := <-w.bufferInfoCh: - buffInfo.writeBuffLen = len(w.bufferInfoCh) w.bufferInfoCh <- buffInfo } } @@ -183,7 +167,6 @@ x: log.Info("Write proc: received stop") break x case buffInfo := <-w.writeInfoCh: - buffInfo.writeBuffLen = len(w.writeCh) w.writeInfoCh <- buffInfo } } diff --git a/internal/write/service.go b/internal/write/service.go index b7d40162..2ff94910 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -110,8 +110,12 @@ func (w *Service) SetBatchErrorCallback(cb BatchErrorCallback) { // If writes continues failing and # of attempts reaches maximum or total retry time reaches maxRetryTime, // batch is discarded. func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { - log.Debug("Write proc: received write request") batchToWrite := batch + if batchToWrite == nil { + log.Debug("Write proc: received flush retry request") + } else { + log.Debug("Write proc: received write request") + } retrying := false for { select { @@ -125,12 +129,17 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { if !retrying { b := w.retryQueue.first() // Can we write? In case of retryable error we must wait a bit - if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) { + if batch == nil || + w.lastWriteAttempt.IsZero() || + time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) { retrying = true } else { - log.Warn("Write proc: cannot write yet, storing batch to queue") - if w.retryQueue.push(batch) { - log.Warn("Write proc: Retry buffer full, discarding oldest batch") + if batch != nil { + log.Warn("Write proc: cannot write yet, storing batch to queue") + if w.retryQueue.push(batch) { + log.Warn("Write proc: Retry buffer full, discarding oldest batch") + } + batch = nil } batchToWrite = nil }