Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revise WriteAPI.Flush to attempt to send batches in the retry queue #291

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ type WriteAPIImpl struct {
writeOptions *write.Options
}

type writeBuffInfoReq struct {
writeBuffLen int
}
type writeBuffInfoReq struct{}

// NewWriteAPI returns new non-blocking write client for writing data to bucket belonging to org
func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl {
Expand Down Expand Up @@ -112,24 +110,11 @@ func (w *WriteAPIImpl) Flush() {
}

func (w *WriteAPIImpl) waitForFlushing() {
for {
w.bufferInfoCh <- writeBuffInfoReq{}
writeBuffInfo := <-w.bufferInfoCh
if writeBuffInfo.writeBuffLen == 0 {
break
}
log.Info("Waiting buffer is flushed")
<-time.After(time.Millisecond)
}
for {
w.writeInfoCh <- writeBuffInfoReq{}
writeBuffInfo := <-w.writeInfoCh
if writeBuffInfo.writeBuffLen == 0 {
break
}
log.Info("Waiting buffer is flushed")
<-time.After(time.Millisecond)
}
w.bufferInfoCh <- writeBuffInfoReq{}
<-w.bufferInfoCh
w.writeInfoCh <- writeBuffInfoReq{}
<-w.writeInfoCh
w.writeCh <- nil
}

func (w *WriteAPIImpl) bufferProc() {
Expand All @@ -152,7 +137,6 @@ x:
w.flushBuffer()
break x
case buffInfo := <-w.bufferInfoCh:
buffInfo.writeBuffLen = len(w.bufferInfoCh)
w.bufferInfoCh <- buffInfo
}
}
Expand Down Expand Up @@ -183,7 +167,6 @@ x:
log.Info("Write proc: received stop")
break x
case buffInfo := <-w.writeInfoCh:
buffInfo.writeBuffLen = len(w.writeCh)
w.writeInfoCh <- buffInfo
}
}
Expand Down
19 changes: 14 additions & 5 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ func (w *Service) SetBatchErrorCallback(cb BatchErrorCallback) {
// If writes continues failing and # of attempts reaches maximum or total retry time reaches maxRetryTime,
// batch is discarded.
func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
log.Debug("Write proc: received write request")
batchToWrite := batch
if batchToWrite == nil {
log.Debug("Write proc: received flush retry request")
} else {
log.Debug("Write proc: received write request")
}
retrying := false
for {
select {
Expand All @@ -125,12 +129,17 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
if !retrying {
b := w.retryQueue.first()
// Can we write? In case of retryable error we must wait a bit
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
if batch == nil ||
w.lastWriteAttempt.IsZero() ||
time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
retrying = true
} else {
log.Warn("Write proc: cannot write yet, storing batch to queue")
if w.retryQueue.push(batch) {
log.Warn("Write proc: Retry buffer full, discarding oldest batch")
if batch != nil {
log.Warn("Write proc: cannot write yet, storing batch to queue")
if w.retryQueue.push(batch) {
log.Warn("Write proc: Retry buffer full, discarding oldest batch")
}
batch = nil
}
batchToWrite = nil
}
Expand Down