Optimize bytes_queue (#207)
Co-authored-by: zhumaohua <[email protected]>
neal-zhu and zhumaohua committed Mar 19, 2020
1 parent 9e4bc32 commit 8fecd16
Showing 3 changed files with 76 additions and 43 deletions.
2 changes: 1 addition & 1 deletion bigcache_test.go
@@ -426,7 +426,7 @@ func TestCacheCapacity(t *testing.T) {

// then
assertEqual(t, keys, cache.Len())
assertEqual(t, 81920, cache.Capacity())
assertEqual(t, 40960, cache.Capacity())
}

func TestCacheStats(t *testing.T) {
70 changes: 51 additions & 19 deletions queue/bytes_queue.go
@@ -7,13 +7,10 @@ import (
)

const (
// Number of bytes used to keep information about entry size
headerEntrySize = 4
// Number of bytes to encode 0 in uvarint format
minimumHeaderSize = 1
// Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index
leftMarginIndex = 1
// Minimum empty blob size in bytes. Empty blob fills space between tail and head in additional memory allocation.
// It keeps entries indexes unchanged
minimumEmptyBlobSize = 32 + headerEntrySize
)

var (
@@ -25,6 +22,7 @@ var (
// BytesQueue is a non-thread-safe FIFO queue backed by a byte array.
// Every Push operation returns the index of the entry, which can be used to read the entry later
type BytesQueue struct {
full bool
array []byte
capacity int
maxCapacity int
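
The new full flag distinguishes the two states that tail == head can otherwise represent in a ring buffer: a queue that is empty and one whose tail has wrapped all the way around to the head. A minimal standalone sketch of that ambiguity (a generic circular buffer, not this package's code):

    package main

    import "fmt"

    // ring is a hypothetical circular buffer used only to show why an explicit
    // "full" flag is needed once head and tail are allowed to coincide.
    type ring struct {
        head, tail int
        full       bool
        buf        []byte
    }

    // empty would be ambiguous without the flag: head == tail holds both for a
    // freshly created buffer and for one whose tail wrapped around to the head.
    func (r *ring) empty() bool { return r.head == r.tail && !r.full }

    func main() {
        r := &ring{buf: make([]byte, 4)}
        fmt.Println(r.empty()) // true: nothing has been written yet

        r.head, r.tail = 2, 2 // after wrapping writes the tail catches up with the head...
        r.full = true         // ...and the flag records that the buffer is full
        fmt.Println(r.empty()) // false
    }
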
@@ -41,6 +39,21 @@ type queueError struct {
message string
}

// getUvarintSize returns the number of bytes to encode x in uvarint format
func getUvarintSize(x uint32) int {
if x < 128 {
return 1
} else if x < 16384 {
return 2
} else if x < 2097152 {
return 3
} else if x < 268435456 {
return 4
} else {
return 5
}
}
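
As a quick standalone check (not part of the commit), the thresholds above are the 7-bits-per-byte boundaries of Go's unsigned varint encoding, so the helper should agree with the length reported by binary.PutUvarint for every uint32 value:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // getUvarintSize is copied from the diff above so the two can be compared.
    func getUvarintSize(x uint32) int {
        switch {
        case x < 128:
            return 1
        case x < 16384:
            return 2
        case x < 2097152:
            return 3
        case x < 268435456:
            return 4
        default:
            return 5
        }
    }

    func main() {
        buf := make([]byte, binary.MaxVarintLen32) // 5 bytes, the uint32 worst case
        for _, v := range []uint32{0, 127, 128, 16383, 16384, 2097151, 2097152, 268435455, 268435456} {
            n := binary.PutUvarint(buf, uint64(v))
            fmt.Printf("x=%d predicted=%d actual=%d\n", v, getUvarintSize(v), n)
        }
    }
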

// NewBytesQueue initializes a new bytes queue.
// Initial capacity is used in bytes array allocation.
// When the verbose flag is set, information about memory allocation is printed.
@@ -49,7 +62,7 @@ func NewBytesQueue(initialCapacity int, maxCapacity int, verbose bool) *BytesQue
array: make([]byte, initialCapacity),
capacity: initialCapacity,
maxCapacity: maxCapacity,
headerBuffer: make([]byte, headerEntrySize),
headerBuffer: make([]byte, binary.MaxVarintLen32),
tail: leftMarginIndex,
head: leftMarginIndex,
rightMargin: leftMarginIndex,
@@ -71,9 +84,10 @@ func (q *BytesQueue) Reset() {
// Returns index for pushed data or error if maximum size queue limit is reached.
func (q *BytesQueue) Push(data []byte) (int, error) {
dataLen := len(data)
headerEntrySize := getUvarintSize(uint32(dataLen))

if q.availableSpaceAfterTail() < dataLen+headerEntrySize {
if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize {
if !q.canInsertAfterTail(dataLen + headerEntrySize) {
if q.canInsertBeforeHead(dataLen + headerEntrySize) {
q.tail = leftMarginIndex
} else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 {
return -1, &queueError{"Full queue. Maximum size limit reached."}
@@ -106,6 +120,7 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
copy(q.array, oldArray[:q.rightMargin])

if q.tail < q.head {
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
q.head = leftMarginIndex
@@ -119,14 +134,17 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
}

func (q *BytesQueue) push(data []byte, len int) {
binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len))
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
q.copy(q.headerBuffer, headerEntrySize)

q.copy(data, len)

if q.tail > q.head {
q.rightMargin = q.tail
}
if q.tail == q.head {
q.full = true
}

q.count++
}
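
Together with the matching change in peek further down, this gives every entry a variable-length header: a single byte for entries shorter than 128 bytes instead of the previous fixed 4-byte little-endian header. A minimal sketch of the round trip, standalone rather than going through BytesQueue itself:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        entry := []byte("hello bytes_queue")

        // push side: the entry length as a uvarint header, followed by the entry bytes.
        header := make([]byte, binary.MaxVarintLen32)
        n := binary.PutUvarint(header, uint64(len(entry)))
        block := append(header[:n], entry...)

        // peek side: decode the header, then slice out exactly blockSize bytes.
        blockSize, m := binary.Uvarint(block)
        fmt.Println(string(block[m : m+int(blockSize)])) // hello bytes_queue
        fmt.Println("header bytes:", m)                  // 1 here, instead of a fixed 4
    }
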
@@ -137,10 +155,11 @@ func (q *BytesQueue) copy(data []byte, len int) {

// Pop reads the oldest entry from queue and moves head pointer to the next one
func (q *BytesQueue) Pop() ([]byte, error) {
data, size, err := q.peek(q.head)
data, headerEntrySize, err := q.peek(q.head)
if err != nil {
return nil, err
}
size := len(data)

q.head += headerEntrySize + size
q.count--
@@ -199,32 +218,45 @@ func (q *BytesQueue) peekCheckErr(index int) error {
return errInvalidIndex
}

if index+headerEntrySize >= len(q.array) {
if index >= len(q.array) {
return errIndexOutOfBounds
}
return nil
}

// peek returns the data at index and the number of bytes used to encode its length in uvarint format
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
err := q.peekCheckErr(index)
if err != nil {
return nil, 0, err
}

blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize]))
return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize, nil
blockSize, n := binary.Uvarint(q.array[index:])
return q.array[index+n : index+n+int(blockSize)], n, nil
}

func (q *BytesQueue) availableSpaceAfterTail() int {
// canInsertAfterTail returns true if it is possible to insert an entry of need bytes after the tail of the queue
func (q *BytesQueue) canInsertAfterTail(need int) bool {
if q.full {
return false
}
if q.tail >= q.head {
return q.capacity - q.tail
return q.capacity-q.tail >= need
}
return q.head - q.tail - minimumEmptyBlobSize
// 1. there are exactly need bytes between head and tail, so we do not need
// to reserve extra space for a potential empty entry when reallocating this queue
// 2. there is still unused space between tail and head, so we must reserve
// at least minimumHeaderSize bytes to be able to put an empty entry there
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
}

func (q *BytesQueue) availableSpaceBeforeHead() int {
// canInsertBeforeHead returns true if it is possible to insert an entry of need bytes before the head of the queue
func (q *BytesQueue) canInsertBeforeHead(need int) bool {
if q.full {
return false
}
if q.tail >= q.head {
return q.head - leftMarginIndex - minimumEmptyBlobSize
return q.head-leftMarginIndex == need || q.head-leftMarginIndex >= need+minimumHeaderSize
}
return q.head - q.tail - minimumEmptyBlobSize
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
}
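
The wrap-around branches above both encode the same rule: the gap between tail and head may be consumed exactly, or it must leave at least minimumHeaderSize spare bytes so a zero-length filler entry can still be written into the leftover space during a later reallocation. A hypothetical standalone check of that rule (not the repository's code):

    package main

    import "fmt"

    // minimumHeaderSize mirrors the constant above: the bytes needed for a
    // uvarint header that encodes a length of 0.
    const minimumHeaderSize = 1

    // fitsInGap mirrors the wrap-around condition of canInsertAfterTail and
    // canInsertBeforeHead: take the gap exactly, or leave room for an empty
    // filler entry's header.
    func fitsInGap(gap, need int) bool {
        return gap == need || gap >= need+minimumHeaderSize
    }

    func main() {
        fmt.Println(fitsInGap(10, 10)) // true: nothing is left over
        fmt.Println(fitsInGap(10, 9))  // true: the 1 leftover byte can hold an empty-entry header
        fmt.Println(fitsInGap(9, 10))  // false: the entry does not fit at all
    }
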
47 changes: 24 additions & 23 deletions queue/bytes_queue_test.go
@@ -145,13 +145,13 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereHeadIsBefore
queue := NewBytesQueue(25, 0, false)

// when
queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes
queue.Push(blob('b', 6)) // additional 10 bytes
queue.Pop() // space freed, 7 bytes available at the beginning
queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory
queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes
queue.Push(blob('b', 6)) // additional 7 bytes
queue.Pop() // space freed, 4 bytes available at the beginning
queue.Push(blob('c', 6)) // 7 bytes needed, 13 bytes available at the tail

// then
assertEqual(t, 50, queue.Capacity())
assertEqual(t, 25, queue.Capacity())
assertEqual(t, blob('b', 6), pop(queue))
assertEqual(t, blob('c', 6), pop(queue))
}
@@ -163,13 +163,13 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereHeadIsBefore
queue := NewBytesQueue(25, 0, false)

// when
queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes
index, _ := queue.Push(blob('b', 6)) // additional 10 bytes
queue.Pop() // space freed, 7 bytes available at the beginning
newestIndex, _ := queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory
queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes
index, _ := queue.Push(blob('b', 6)) // additional 7 bytes
queue.Pop() // space freed, 4 bytes available at the beginning
newestIndex, _ := queue.Push(blob('c', 6)) // 7 bytes needed, 13 available at the tail

// then
assertEqual(t, 50, queue.Capacity())
assertEqual(t, 25, queue.Capacity())
assertEqual(t, blob('b', 6), get(queue, index))
assertEqual(t, blob('c', 6), get(queue, newestIndex))
}
@@ -181,19 +181,19 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef
queue := NewBytesQueue(100, 0, false)

// when
queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes
queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes
queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes
queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes
queue.Pop() // space freed at the beginning
queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer
queue.Push(blob('d', 40)) // 44 bytes needed but not available in one segment, allocate new memory
queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer
queue.Push(blob('d', 40)) // 41 bytes needed but not available in one segment, allocate new memory

// then
assertEqual(t, 200, queue.Capacity())
assertEqual(t, blob('c', 30), pop(queue))
// empty blob fills space between tail and head,
// created when additional memory was allocated,
// it keeps current entries indexes unchanged
assertEqual(t, blob(0, 36), pop(queue))
assertEqual(t, blob(0, 39), pop(queue))
assertEqual(t, blob('b', 10), pop(queue))
assertEqual(t, blob('d', 40), pop(queue))
}
@@ -205,11 +205,11 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereTailIsBefore
queue := NewBytesQueue(100, 0, false)

// when
queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes
index, _ := queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes
queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes
index, _ := queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes
queue.Pop() // space freed at the beginning
queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer
newestIndex, _ := queue.Push(blob('d', 40)) // 44 bytes needed but not available in one segment, allocate new memory
queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer
newestIndex, _ := queue.Push(blob('d', 40)) // 41 bytes needed but not available in one segment, allocate new memory

// then
assertEqual(t, 200, queue.Capacity())
@@ -225,10 +225,10 @@ func TestAllocateAdditionalSpaceForValueBiggerThanInitQueue(t *testing.T) {

// when
queue.Push(blob('a', 100))

// then
assertEqual(t, blob('a', 100), pop(queue))
assertEqual(t, 230, queue.Capacity())
// 224 = (101 + 11) * 2
assertEqual(t, 224, queue.Capacity())
}

func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) {
@@ -246,7 +246,8 @@ func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) {
queue.Pop()
queue.Pop()
assertEqual(t, make([]byte, 100), pop(queue))
assertEqual(t, 250, queue.Capacity())
// 244 = (101 + 21) * 2
assertEqual(t, 244, queue.Capacity())
}

func TestPopWholeQueue(t *testing.T) {
@@ -343,7 +344,7 @@ func TestMaxSizeLimit(t *testing.T) {
queue.Push(blob('a', 25))
queue.Push(blob('b', 5))
capacity := queue.Capacity()
_, err := queue.Push(blob('c', 15))
_, err := queue.Push(blob('c', 20))

// then
assertEqual(t, 50, capacity)
