Skip to content

Commit

Permalink
Merge pull request #4 from mgnsk/hotfix/ring-overflow
Browse files Browse the repository at this point in the history
Prevent ring from overflowing
  • Loading branch information
mgnsk authored Feb 9, 2021
2 parents 4b4165b + df22a72 commit d7bb6ec
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 47 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ This library is a work in progress.
## How it works

The cache is a wrapper for `sync.Map` with autoexpiry, capacity limit and near-LFU eviction.

Writes to the cache do not block other reads. Each key uses its own RWMutex.
2 changes: 1 addition & 1 deletion cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (c *Cache) Evict(key interface{}) {
if !ok {
return
}
if c.ring.Remove(r.ring) != key {
if k := c.ring.Remove(r.ring); k != nil && k != key {
panic("evcache: invalid ring")
}
r.wg.Wait()
Expand Down
40 changes: 7 additions & 33 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,7 @@ var _ = Describe("eviction callback", func() {
closer.Close()
Expect(<-evicted).To(Equal(uint64(1)))

Eventually(func() int {
return c.Len()
}).Should(BeZero())
Expect(c.Len()).To(BeZero())
})
})

Expand Down Expand Up @@ -290,22 +288,9 @@ var _ = Describe("overflow when setting values", func() {
Build()
})

When("Set causes an overflow", func() {
Specify("eventually overflowed records are evicted", func() {
for i := 0; i < n+overflow; i++ {
_, closer, _ := c.Fetch(i, 0, func() (interface{}, error) {
return nil, nil
})
closer.Close()
Expect(c.Len()).To(BeNumerically("<=", n+overflow))
}

c.Close()
Expect(c.Len()).To(BeZero())
Eventually(func() uint64 {
return atomic.LoadUint64(&evicted)
}).Should(Equal(uint64(n + overflow)))
})
AfterEach(func() {
c.Close()
Expect(c.Len()).To(BeZero())
})

When("Fetch causes an overflow", func() {
Expand All @@ -318,11 +303,9 @@ var _ = Describe("overflow when setting values", func() {
Expect(c.Len()).To(BeNumerically("<=", n+overflow))
}

c.Close()
Expect(c.Len()).To(BeZero())
Eventually(func() uint64 {
return atomic.LoadUint64(&evicted)
}).Should(Equal(uint64(n + overflow)))
}).Should(Equal(uint64(overflow)))
})
})

Expand Down Expand Up @@ -351,24 +334,15 @@ var _ = Describe("overflow when setting values", func() {
}()

cb(i)
overflow := c.Len() - n
Expect(overflow).To(BeNumerically("<=", concurrency+1), "overflow cannot exceed concurrency+1")

// Randomly evict keys.
if rand.Float64() < 0.6 {
c.Evict(rand.Intn(i + 1))
}
Expect(c.Len()).To(BeNumerically("<=", n), "capacity cannot be exceeded")
}()
}

wg.Wait()

c.Close()
Expect(c.Len()).To(BeZero())

Eventually(func() uint64 {
return atomic.LoadUint64(&evicted)
}).Should(Equal(uint64(n + overflow)))
}).Should(Equal(uint64(overflow)))
},
Entry(
"Fetch",
Expand Down
34 changes: 21 additions & 13 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package evcache
import (
"container/ring"
"sync"
"sync/atomic"
)

type lfuRing struct {
Expand All @@ -23,44 +22,50 @@ func newLFURing(capacity uint32) *lfuRing {
}

func (l *lfuRing) Len() int {
return int(atomic.LoadUint32(&l.size))
l.mu.Lock()
defer l.mu.Unlock()
return int(l.size)
}

// Push inserts a key as most frequently used. If capacity is exceeded,
// the least frequently used key is returned.
func (l *lfuRing) Push(key interface{}, r *ring.Ring) (lfu interface{}) {
// the least frequently used element is removed and its key returned.
func (l *lfuRing) Push(key interface{}, r *ring.Ring) (lfuKey interface{}) {
l.mu.Lock()
defer l.mu.Unlock()
r.Value = key
if l.cursor != nil {
l.cursor.Link(r)
}
l.cursor = r
size := atomic.AddUint32(&l.size, 1)
if l.capacity > 0 && size > l.capacity {
lfuring := l.cursor.Next()
return lfuring.Value
l.size++
if l.capacity > 0 && l.size > l.capacity {
return l.unlink(l.cursor.Next())
}
return nil
}

// Remove an existing element.
// Remove an element.
func (l *lfuRing) Remove(r *ring.Ring) (key interface{}) {
l.mu.Lock()
defer l.mu.Unlock()
if r == r.Next() && r != l.cursor {
// A single ring element not belonging to lfuRing.
return nil
}
return l.unlink(r)
}

// Promote moves element towards cursor by at most hits positions.
func (l *lfuRing) Promote(r *ring.Ring, hits uint32) {
l.mu.Lock()
defer l.mu.Unlock()
if l.cursor == nil {
panic("evcache: cursor must not be nil")
}
target := r
if target == target.Next() {
if r == r.Next() {
return
}
target := r
for i := uint32(0); i < hits; i++ {
next := target.Next()
target = next
Expand All @@ -87,8 +92,11 @@ func (l *lfuRing) unlink(r *ring.Ring) (key interface{}) {
if r.Prev().Unlink(1) != r {
panic("evcache: invalid ring")
}
if size := atomic.AddUint32(&l.size, ^uint32(0)); size == 0 {
l.size--
if l.size == 0 {
l.cursor = nil
}
return r.Value
key = r.Value
r.Value = nil
return key
}

0 comments on commit d7bb6ec

Please sign in to comment.