Skip to content

Commit

Permalink
Reuse the timer for the retry wait channel
Browse files Browse the repository at this point in the history
Wait() was creating a new timer every time it was called. In high throughput
scenarios this will cause a lot of unnecessary timers to be created.

To avoid this, use a single timer and reuse it by checking Stop() return
value and reusing it if it has not fired yet.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Apr 10, 2024
1 parent 7c2435f commit 33b817f
Showing 1 changed file with 37 additions and 13 deletions.
50 changes: 37 additions & 13 deletions reconciler/retries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func newRetries(minDuration, maxDuration time.Duration, objectToKey func(any) in
queue: nil,
items: make(map[string]*retryItem),
objectToKey: objectToKey,
waitTimer: nil,
waitChan: make(chan struct{}),
}
}

Expand All @@ -44,6 +46,8 @@ type retries struct {
queue retryPrioQueue
items map[string]*retryItem
objectToKey func(any) index.Key
waitTimer *time.Timer
waitChan chan struct{}
}

type retryItem struct {
Expand All @@ -56,18 +60,7 @@ type retryItem struct {
// Wait returns a channel that is closed when there is an item to retry.
// Returns nil channel if no items are queued.
func (rq *retries) Wait() <-chan struct{} {
if _, retryAt, ok := rq.Top(); ok {
now := time.Now()
ch := make(chan struct{}, 1)
if now.After(retryAt) {
// Already expired.
close(ch)
} else {
time.AfterFunc(retryAt.Sub(now), func() { close(ch) })
}
return ch
}
return nil
return rq.waitChan
}

func (rq *retries) Top() (object any, retryAt time.Time, ok bool) {
Expand All @@ -82,6 +75,27 @@ func (rq *retries) Pop() {
// Pop the object from the queue, but leave it into the map until
// the object is cleared or re-added.
heap.Pop(&rq.queue)

rq.resetTimer()
}

func (rq *retries) resetTimer() {
if rq.waitTimer == nil || !rq.waitTimer.Stop() {
// Already fired so the channel was closed. Create a new one
// channel and timer.
waitChan := make(chan struct{})
rq.waitChan = waitChan
if rq.queue.Len() == 0 {
rq.waitTimer = nil
} else {
d := time.Until(rq.queue[0].retryAt)
rq.waitTimer = time.AfterFunc(d, func() { close(waitChan) })
}
} else if rq.queue.Len() > 0 {
d := time.Until(rq.queue[0].retryAt)
// Did not fire yet so we can just reset the timer.
rq.waitTimer.Reset(d)
}
}

func (rq *retries) Add(obj any) {
Expand All @@ -98,17 +112,27 @@ func (rq *retries) Add(obj any) {
}
item.object = obj
item.numRetries += 1
item.retryAt = time.Now().Add(rq.backoff.Duration(item.numRetries))
duration := rq.backoff.Duration(item.numRetries)
item.retryAt = time.Now().Add(duration)
heap.Push(&rq.queue, item)

// New item is at the top of the queue, reset the timer.
rq.resetTimer()
}

func (rq *retries) Clear(obj any) {
key := rq.objectToKey(obj)
if item, ok := rq.items[string(key)]; ok {
// Remove the object from the queue if it is still there.
index := item.index // hold onto the index as heap.Remove messes with it
if item.index >= 0 && item.index < len(rq.queue) &&
key.Equal(rq.objectToKey(rq.queue[item.index].object)) {
heap.Remove(&rq.queue, item.index)

// Reset the timer in case we removed the top item.
if index == 0 {
rq.resetTimer()
}
}
// Completely forget the object and its retry count.
delete(rq.items, string(key))
Expand Down

0 comments on commit 33b817f

Please sign in to comment.