concurrentRingBuffer.go
package blockingQueues

import (
	"runtime"
	"sync/atomic"
)

type ConcurrentRingBuffer struct {
	// The padding members below are here to ensure each index field sits on a
	// separate cache line, avoiding false sharing between producers and consumers.
	pad1               [8]uint64
	lastCommittedIndex uint64
	pad2               [8]uint64
	writeIndex         uint64
	pad3               [8]uint64
	readIndex          uint64
	pad4               [8]uint64
	store              []interface{} // This would gain speed if it were a specific element type
	pad5               [8]uint64
}
func NewConcurrentRingBuffer(capacity uint64) *ConcurrentRingBuffer {
	// Note: the index masking in Put and Get assumes capacity is a power of two.
	return &ConcurrentRingBuffer{
		lastCommittedIndex: 0,
		writeIndex:         1,
		readIndex:          1,
		store:              make([]interface{}, capacity),
	}
}
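
// Hypothetical helper, not part of the original file: the index masking in
// Put and Get relies on the capacity being a power of two, so a caller could
// validate that before constructing the buffer. Sketch only.
func isPowerOfTwo(capacity uint64) bool {
	return capacity != 0 && capacity&(capacity-1) == 0
}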
func (q *ConcurrentRingBuffer) Put(value interface{}) (bool, error) {
	// Claim the next write index
	var nextWriteIndex = atomic.AddUint64(&q.writeIndex, 1) - 1
	var mask = uint64(cap(q.store) - 1)

	// Wait for the reader to catch up as we don't want to get too far ahead on writes;
	// this blocks the writer while the store is full (readIndex is loaded atomically
	// because readers update it concurrently)
	for nextWriteIndex > (atomic.LoadUint64(&q.readIndex) + mask - 1) {
		runtime.Gosched()
	}

	// Write the item into its slot
	q.store[nextWriteIndex&mask] = value

	// Increment lastCommittedIndex so the item becomes available for reading;
	// writers commit in order, so wait for the previous writer to commit first
	for !atomic.CompareAndSwapUint64(&q.lastCommittedIndex, nextWriteIndex-1, nextWriteIndex) {
		runtime.Gosched()
	}
	return true, nil
}
func (q *ConcurrentRingBuffer) Get() (interface{}, error) {
	// Claim the next read index
	var nextReadIndex = atomic.AddUint64(&q.readIndex, 1) - 1
	var mask = uint64(cap(q.store) - 1)

	// If the reader has out-run the writers, wait until the value at this index
	// has been committed (lastCommittedIndex is loaded atomically because writers
	// update it concurrently)
	for nextReadIndex > atomic.LoadUint64(&q.lastCommittedIndex) {
		runtime.Gosched()
	}
	return q.store[nextReadIndex&mask], nil
}
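
// A minimal usage sketch (assumption: this lives alongside the buffer code in
// package blockingQueues; the function name, goroutine layout, and item count
// are illustrative only). One producer publishes values with Put and one
// consumer drains them with Get; both calls spin with runtime.Gosched rather
// than blocking on a lock.
func demoProducerConsumer() {
	q := NewConcurrentRingBuffer(8) // capacity must be a power of two for the masking to work
	done := make(chan struct{})

	// Producer: Put yields the processor while the buffer is full
	go func() {
		for i := 0; i < 16; i++ {
			q.Put(i)
		}
	}()

	// Consumer: Get yields the processor until the next value has been committed
	go func() {
		for i := 0; i < 16; i++ {
			v, _ := q.Get()
			_ = v // use the value here
		}
		close(done)
	}()

	<-done
}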