From b9b12c8207e68f569bf81d5a7e3fba82ecc14ebe Mon Sep 17 00:00:00 2001 From: "Peter A. Bigot" Date: Mon, 13 Dec 2021 10:30:37 -0700 Subject: [PATCH 1/2] WriteAPIImpl: simplify Flush() implementation The length of an unbuffered channel is always zero, and all checked channels are unbuffered, so all flush does is a transient synchronization with the writeProc and bufferProc goroutines. Remove the unnecessary logic. Signed-off-by: Peter A. Bigot --- api/write.go | 28 +++++----------------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/api/write.go b/api/write.go index 2dc02438..a0df8b97 100644 --- a/api/write.go +++ b/api/write.go @@ -61,9 +61,7 @@ type WriteAPIImpl struct { writeOptions *write.Options } -type writeBuffInfoReq struct { - writeBuffLen int -} +type writeBuffInfoReq struct{} // NewWriteAPI returns new non-blocking write client for writing data to bucket belonging to org func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl { @@ -112,24 +110,10 @@ func (w *WriteAPIImpl) Flush() { } func (w *WriteAPIImpl) waitForFlushing() { - for { - w.bufferInfoCh <- writeBuffInfoReq{} - writeBuffInfo := <-w.bufferInfoCh - if writeBuffInfo.writeBuffLen == 0 { - break - } - log.Info("Waiting buffer is flushed") - <-time.After(time.Millisecond) - } - for { - w.writeInfoCh <- writeBuffInfoReq{} - writeBuffInfo := <-w.writeInfoCh - if writeBuffInfo.writeBuffLen == 0 { - break - } - log.Info("Waiting buffer is flushed") - <-time.After(time.Millisecond) - } + w.bufferInfoCh <- writeBuffInfoReq{} + <-w.bufferInfoCh + w.writeInfoCh <- writeBuffInfoReq{} + <-w.writeInfoCh } func (w *WriteAPIImpl) bufferProc() { @@ -152,7 +136,6 @@ x: w.flushBuffer() break x case buffInfo := <-w.bufferInfoCh: - buffInfo.writeBuffLen = len(w.bufferInfoCh) w.bufferInfoCh <- buffInfo } } @@ -183,7 +166,6 @@ x: log.Info("Write proc: received stop") break x case buffInfo := <-w.writeInfoCh: - buffInfo.writeBuffLen = len(w.writeCh) w.writeInfoCh <- buffInfo } } From 4ca219457acca8e981b1223bfa88e2a41552c65b Mon Sep 17 00:00:00 2001 From: "Peter A. Bigot" Date: Mon, 13 Dec 2021 12:45:11 -0700 Subject: [PATCH 2/2] DNM: WriteAPIImpl: extend Flush() to flush retry queue Flush() is documented to force all pending writes from the buffer to be sent. It does force pending writes in the buffer to be submitted for send, but if there is a pending retry with an unexpired timeout it won't actually send anything. Make Flush() more useful by having it trigger a re-examination of the retry queue and initiating a retry immediately. Depending on whether the documented behavior was intentional, this operation might be better implemented as an additional WriteAPI method FlushAll(). Signed-off-by: Peter A. Bigot --- api/write.go | 1 + internal/write/service.go | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/api/write.go b/api/write.go index a0df8b97..6c57a1a6 100644 --- a/api/write.go +++ b/api/write.go @@ -114,6 +114,7 @@ func (w *WriteAPIImpl) waitForFlushing() { <-w.bufferInfoCh w.writeInfoCh <- writeBuffInfoReq{} <-w.writeInfoCh + w.writeCh <- nil } func (w *WriteAPIImpl) bufferProc() { diff --git a/internal/write/service.go b/internal/write/service.go index b7d40162..2ff94910 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -110,8 +110,12 @@ func (w *Service) SetBatchErrorCallback(cb BatchErrorCallback) { // If writes continues failing and # of attempts reaches maximum or total retry time reaches maxRetryTime, // batch is discarded. func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { - log.Debug("Write proc: received write request") batchToWrite := batch + if batchToWrite == nil { + log.Debug("Write proc: received flush retry request") + } else { + log.Debug("Write proc: received write request") + } retrying := false for { select { @@ -125,12 +129,17 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { if !retrying { b := w.retryQueue.first() // Can we write? In case of retryable error we must wait a bit - if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) { + if batch == nil || + w.lastWriteAttempt.IsZero() || + time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) { retrying = true } else { - log.Warn("Write proc: cannot write yet, storing batch to queue") - if w.retryQueue.push(batch) { - log.Warn("Write proc: Retry buffer full, discarding oldest batch") + if batch != nil { + log.Warn("Write proc: cannot write yet, storing batch to queue") + if w.retryQueue.push(batch) { + log.Warn("Write proc: Retry buffer full, discarding oldest batch") + } + batch = nil } batchToWrite = nil }