diff --git a/bigcache_test.go b/bigcache_test.go index cacb2db8..8a04060f 100644 --- a/bigcache_test.go +++ b/bigcache_test.go @@ -426,7 +426,7 @@ func TestCacheCapacity(t *testing.T) { // then assertEqual(t, keys, cache.Len()) - assertEqual(t, 81920, cache.Capacity()) + assertEqual(t, 40960, cache.Capacity()) } func TestCacheStats(t *testing.T) { diff --git a/queue/bytes_queue.go b/queue/bytes_queue.go index 0af18615..9619550a 100644 --- a/queue/bytes_queue.go +++ b/queue/bytes_queue.go @@ -7,13 +7,10 @@ import ( ) const ( - // Number of bytes used to keep information about entry size - headerEntrySize = 4 + // Number of bytes to encode 0 in uvarint format + minimumHeaderSize = 1 // Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index leftMarginIndex = 1 - // Minimum empty blob size in bytes. Empty blob fills space between tail and head in additional memory allocation. - // It keeps entries indexes unchanged - minimumEmptyBlobSize = 32 + headerEntrySize ) var ( @@ -25,6 +22,7 @@ var ( // BytesQueue is a non-thread safe queue type of fifo based on bytes array. // For every push operation index of entry is returned. It can be used to read the entry later type BytesQueue struct { + full bool array []byte capacity int maxCapacity int @@ -41,6 +39,21 @@ type queueError struct { message string } +// getUvarintSize returns the number of bytes to encode x in uvarint format +func getUvarintSize(x uint32) int { + if x < 128 { + return 1 + } else if x < 16384 { + return 2 + } else if x < 2097152 { + return 3 + } else if x < 268435456 { + return 4 + } else { + return 5 + } +} + // NewBytesQueue initialize new bytes queue. // Initial capacity is used in bytes array allocation // When verbose flag is set then information about memory allocation are printed @@ -49,7 +62,7 @@ func NewBytesQueue(initialCapacity int, maxCapacity int, verbose bool) *BytesQue array: make([]byte, initialCapacity), capacity: initialCapacity, maxCapacity: maxCapacity, - headerBuffer: make([]byte, headerEntrySize), + headerBuffer: make([]byte, binary.MaxVarintLen32), tail: leftMarginIndex, head: leftMarginIndex, rightMargin: leftMarginIndex, @@ -71,9 +84,10 @@ func (q *BytesQueue) Reset() { // Returns index for pushed data or error if maximum size queue limit is reached. func (q *BytesQueue) Push(data []byte) (int, error) { dataLen := len(data) + headerEntrySize := getUvarintSize(uint32(dataLen)) - if q.availableSpaceAfterTail() < dataLen+headerEntrySize { - if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize { + if !q.canInsertAfterTail(dataLen + headerEntrySize) { + if q.canInsertBeforeHead(dataLen + headerEntrySize) { q.tail = leftMarginIndex } else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 { return -1, &queueError{"Full queue. Maximum size limit reached."} @@ -106,6 +120,7 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) { copy(q.array, oldArray[:q.rightMargin]) if q.tail < q.head { + headerEntrySize := getUvarintSize(uint32(q.head - q.tail)) emptyBlobLen := q.head - q.tail - headerEntrySize q.push(make([]byte, emptyBlobLen), emptyBlobLen) q.head = leftMarginIndex @@ -119,7 +134,7 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) { } func (q *BytesQueue) push(data []byte, len int) { - binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len)) + headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len)) q.copy(q.headerBuffer, headerEntrySize) q.copy(data, len) @@ -127,6 +142,9 @@ func (q *BytesQueue) push(data []byte, len int) { if q.tail > q.head { q.rightMargin = q.tail } + if q.tail == q.head { + q.full = true + } q.count++ } @@ -137,10 +155,11 @@ func (q *BytesQueue) copy(data []byte, len int) { // Pop reads the oldest entry from queue and moves head pointer to the next one func (q *BytesQueue) Pop() ([]byte, error) { - data, size, err := q.peek(q.head) + data, headerEntrySize, err := q.peek(q.head) if err != nil { return nil, err } + size := len(data) q.head += headerEntrySize + size q.count-- @@ -199,32 +218,45 @@ func (q *BytesQueue) peekCheckErr(index int) error { return errInvalidIndex } - if index+headerEntrySize >= len(q.array) { + if index >= len(q.array) { return errIndexOutOfBounds } return nil } +// peek returns the data from index and the number of bytes to encode the length of the data in uvarint format func (q *BytesQueue) peek(index int) ([]byte, int, error) { err := q.peekCheckErr(index) if err != nil { return nil, 0, err } - blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize])) - return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize, nil + blockSize, n := binary.Uvarint(q.array[index:]) + return q.array[index+n : index+n+int(blockSize)], n, nil } -func (q *BytesQueue) availableSpaceAfterTail() int { +// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue +func (q *BytesQueue) canInsertAfterTail(need int) bool { + if q.full { + return false + } if q.tail >= q.head { - return q.capacity - q.tail + return q.capacity-q.tail >= need } - return q.head - q.tail - minimumEmptyBlobSize + // 1. there is exactly need bytes between head and tail, so we do not need + // to reserve extra space for a potential emtpy entry when re-allco this queeu + // 2. still have unused space between tail and head, then we must reserve + // at least headerEntrySize bytes so we can put an empty entry + return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize } -func (q *BytesQueue) availableSpaceBeforeHead() int { +// canInsertBeforeHead returns true if it's possible to insert an entry of size of need before the head of the queue +func (q *BytesQueue) canInsertBeforeHead(need int) bool { + if q.full { + return false + } if q.tail >= q.head { - return q.head - leftMarginIndex - minimumEmptyBlobSize + return q.head-leftMarginIndex == need || q.head-leftMarginIndex >= need+minimumHeaderSize } - return q.head - q.tail - minimumEmptyBlobSize + return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize } diff --git a/queue/bytes_queue_test.go b/queue/bytes_queue_test.go index 48b08538..eb29cefc 100644 --- a/queue/bytes_queue_test.go +++ b/queue/bytes_queue_test.go @@ -145,13 +145,13 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereHeadIsBef queue := NewBytesQueue(25, 0, false) // when - queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes - queue.Push(blob('b', 6)) // additional 10 bytes - queue.Pop() // space freed, 7 bytes available at the beginning - queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory + queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes + queue.Push(blob('b', 6)) // additional 7 bytes + queue.Pop() // space freed, 4 bytes available at the beginning + queue.Push(blob('c', 6)) // 7 bytes needed, 13 bytes available at the tail // then - assertEqual(t, 50, queue.Capacity()) + assertEqual(t, 25, queue.Capacity()) assertEqual(t, blob('b', 6), pop(queue)) assertEqual(t, blob('c', 6), pop(queue)) } @@ -163,13 +163,13 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereHeadIsBefore queue := NewBytesQueue(25, 0, false) // when - queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes - index, _ := queue.Push(blob('b', 6)) // additional 10 bytes - queue.Pop() // space freed, 7 bytes available at the beginning - newestIndex, _ := queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory + queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes + index, _ := queue.Push(blob('b', 6)) // additional 7 bytes + queue.Pop() // space freed, 4 bytes available at the beginning + newestIndex, _ := queue.Push(blob('c', 6)) // 7 bytes needed, 13 available at the tail // then - assertEqual(t, 50, queue.Capacity()) + assertEqual(t, 25, queue.Capacity()) assertEqual(t, blob('b', 6), get(queue, index)) assertEqual(t, blob('c', 6), get(queue, newestIndex)) } @@ -181,11 +181,11 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef queue := NewBytesQueue(100, 0, false) // when - queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes - queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes + queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes + queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes queue.Pop() // space freed at the beginning - queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer - queue.Push(blob('d', 40)) // 44 bytes needed but no available in one segment, allocate new memory + queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer + queue.Push(blob('d', 40)) // 41 bytes needed but no available in one segment, allocate new memory // then assertEqual(t, 200, queue.Capacity()) @@ -193,7 +193,7 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef // empty blob fills space between tail and head, // created when additional memory was allocated, // it keeps current entries indexes unchanged - assertEqual(t, blob(0, 36), pop(queue)) + assertEqual(t, blob(0, 39), pop(queue)) assertEqual(t, blob('b', 10), pop(queue)) assertEqual(t, blob('d', 40), pop(queue)) } @@ -205,11 +205,11 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereTailIsBefore queue := NewBytesQueue(100, 0, false) // when - queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes - index, _ := queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes + queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes + index, _ := queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes queue.Pop() // space freed at the beginning - queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer - newestIndex, _ := queue.Push(blob('d', 40)) // 44 bytes needed but no available in one segment, allocate new memory + queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer + newestIndex, _ := queue.Push(blob('d', 40)) // 41 bytes needed but no available in one segment, allocate new memory // then assertEqual(t, 200, queue.Capacity()) @@ -225,10 +225,10 @@ func TestAllocateAdditionalSpaceForValueBiggerThanInitQueue(t *testing.T) { // when queue.Push(blob('a', 100)) - // then assertEqual(t, blob('a', 100), pop(queue)) - assertEqual(t, 230, queue.Capacity()) + // 224 = (101 + 11) * 2 + assertEqual(t, 224, queue.Capacity()) } func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) { @@ -246,7 +246,8 @@ func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) { queue.Pop() queue.Pop() assertEqual(t, make([]byte, 100), pop(queue)) - assertEqual(t, 250, queue.Capacity()) + // 244 = (101 + 21) * 2 + assertEqual(t, 244, queue.Capacity()) } func TestPopWholeQueue(t *testing.T) { @@ -343,7 +344,7 @@ func TestMaxSizeLimit(t *testing.T) { queue.Push(blob('a', 25)) queue.Push(blob('b', 5)) capacity := queue.Capacity() - _, err := queue.Push(blob('c', 15)) + _, err := queue.Push(blob('c', 20)) // then assertEqual(t, 50, capacity)