-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathqueue.go
47 lines (40 loc) · 887 Bytes
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main
import (
"sync"
)
// StreamQ is a streaming message queue. Added lines wait in lines until Flush
// moves them to unAckd, where they stay until Ack discards them.
type StreamQ struct {
	// lines holds the lines that have been added but not yet flushed.
	lines []string
	// unAckd holds the lines being processed: flushed, not yet acknowledged.
	unAckd []string
	// lock is held by Add, Flush and Ack while they modify the two slices.
	lock sync.RWMutex
}
// newStreamQ returns a pointer to a new StreamQ whose pending and
// un-acknowledged lists are both empty (non-nil) slices and whose lock is a
// zero-value sync.RWMutex.
func newStreamQ() *StreamQ {
	q := &StreamQ{lock: sync.RWMutex{}}
	q.lines = make([]string, 0)
	q.unAckd = make([]string, 0)
	return q
}
// Len returns the total number of lines held by the queue: the lines still
// pending plus the lines that are un-acknowledged.
//
// The read lock is held while both lengths are read so the sum is taken from
// one consistent state and does not race with Add, Flush or Ack, all of which
// modify the slices under the write lock.
func (q *StreamQ) Len() int {
	q.lock.RLock()
	defer q.lock.RUnlock()
	return len(q.lines) + len(q.unAckd)
}
// IsEmpty reports whether the queue holds no lines at all, i.e. whether Len
// is zero.
func (q *StreamQ) IsEmpty() bool {
	return q.Len() == 0
}
// Add appends line to the end of the pending list while holding the write
// lock.
func (q *StreamQ) Add(line string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.lines = append(q.lines, line)
}
// Flush moves every pending line into the un-acknowledged list and returns
// all un-acknowledged lines: the ones just moved plus any that an earlier
// Flush handed out and that have not been cleared by Ack since. The pending
// list is empty afterwards.
//
// The returned slice is a copy. The caller may keep, modify or append to it
// without changing the queue's own un-acknowledged list, which is only ever
// touched under the lock.
func (q *StreamQ) Flush() []string {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.unAckd = append(q.unAckd, q.lines...)
	q.lines = []string{}

	// Copy instead of returning q.unAckd itself: the caller uses the result
	// after the lock is released and must not share a backing array with it.
	out := make([]string, len(q.unAckd))
	copy(out, q.unAckd)
	return out
}
// Ack acknowledges that the lines handed out by Flush have been processed:
// it replaces the un-acknowledged list with a new empty slice while holding
// the write lock. Pending lines are not affected.
func (q *StreamQ) Ack() {
	q.lock.Lock()
	q.unAckd = make([]string, 0)
	q.lock.Unlock()
}