-
Notifications
You must be signed in to change notification settings - Fork 12
/
updates_collector.go
99 lines (90 loc) · 2.13 KB
/
updates_collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package kvdb
import (
"fmt"
"strings"
"sync"
"github.com/sirupsen/logrus"
)
// kvdbUpdate is a single watch event recorded by the collector: the watched
// prefix it arrived on, the key-value pair it carried, and the error that was
// passed along with it.
type kvdbUpdate struct {
	// prefix is the path on which update was triggered
	prefix string
	// kvp is the actual key-value pair update
	kvp *KVPair
	// err is the error delivered with the update; watchCb in this file only
	// records updates whose error is nil
	err error
}
// updatesCollectorImpl accumulates the updates delivered to watchCb, in
// arrival order, until it is stopped, so that they can later be replayed to
// other callbacks through ReplayUpdates.
type updatesCollectorImpl struct {
	// stopped is true if collection is stopped
	stopped bool
	// stoppedMutex protects stopped flag
	stoppedMutex sync.RWMutex
	// startIndex is the index ReplayUpdates reports when no recorded update
	// carries a key-value pair
	startIndex uint64
	// updatesMutex protects updates and start index
	updatesMutex sync.Mutex
	// updates stores the updates in order
	updates []*kvdbUpdate
}
// setStopped marks the collector as stopped. The flag is written while
// holding stoppedMutex for writing.
func (c *updatesCollectorImpl) setStopped() {
	c.stoppedMutex.Lock()
	defer c.stoppedMutex.Unlock()

	c.stopped = true
}
// isStopped reports whether the collector has been marked as stopped. The
// flag is read while holding stoppedMutex for reading.
func (c *updatesCollectorImpl) isStopped() bool {
	c.stoppedMutex.RLock()
	stopped := c.stopped
	c.stoppedMutex.RUnlock()
	return stopped
}
// watchCb is the watch callback that feeds the collector.
//
// If the collector is already stopped it returns an error and records
// nothing. If the watch reports an error, the collector is marked stopped and
// that error is returned unchanged. Otherwise the update is appended to the
// recorded updates and nil is returned. The opaque argument is not used.
func (c *updatesCollectorImpl) watchCb(
	prefix string,
	opaque interface{},
	kvp *KVPair,
	err error,
) error {
	switch {
	case c.isStopped():
		return fmt.Errorf("Stopped watch")
	case err != nil:
		// A failed watch ends collection; later calls are rejected above.
		c.setStopped()
		return err
	}

	entry := &kvdbUpdate{prefix: prefix, kvp: kvp, err: err}

	c.updatesMutex.Lock()
	defer c.updatesMutex.Unlock()
	c.updates = append(c.updates, entry)
	return nil
}
// Stop logs that the collector is stopping and marks it stopped; watchCb
// rejects every update that arrives afterwards. Updates already recorded are
// left in place.
func (c *updatesCollectorImpl) Stop() {
	logrus.Info("Stopping updates collector")
	c.setStopped()
}
// ReplayUpdates delivers every recorded update to the matching callbacks in
// cbList, in the order the updates were collected.
//
// A callback receives an update only when the update's key starts with the
// callback's Prefix and the update's ModifiedIndex is greater than the
// callback's WaitIndex. Updates that carry no key-value pair are skipped.
//
// It returns the ModifiedIndex of the last update examined, or the
// collector's start index if no update carried a key-value pair. If a
// callback returns an error, replay stops immediately and that error is
// returned together with the ModifiedIndex of the update being delivered.
func (c *updatesCollectorImpl) ReplayUpdates(
	cbList []ReplayCb,
) (uint64, error) {
	// Snapshot everything guarded by updatesMutex in one critical section.
	// startIndex is read here as well, because the mutex is documented as
	// protecting it. Callbacks run against the snapshot so the lock is not
	// held while they execute.
	c.updatesMutex.Lock()
	index := c.startIndex
	updates := make([]*kvdbUpdate, len(c.updates))
	copy(updates, c.updates)
	c.updatesMutex.Unlock()

	logrus.Infof("collect: replaying %d update(s) for %d callback(s)",
		len(updates), len(cbList))

	for _, update := range updates {
		if update.kvp == nil {
			continue
		}
		index = update.kvp.ModifiedIndex
		for _, cbInfo := range cbList {
			if !strings.HasPrefix(update.kvp.Key, cbInfo.Prefix) ||
				cbInfo.WaitIndex >= update.kvp.ModifiedIndex {
				// Key is outside this callback's prefix, or the update is not
				// newer than the callback's WaitIndex: ignore the update.
				continue
			}
			err := cbInfo.WatchCB(update.prefix, cbInfo.Opaque, update.kvp,
				update.err)
			if err != nil {
				logrus.Infof("collect error: watchCB returned error: %v",
					err)
				return index, err
			}
		}
	}
	return index, nil
}