-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcb_mgr.go
113 lines (95 loc) · 3.03 KB
/
cb_mgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package dials
import (
"context"
"fmt"
)
// callbackMgr bundles the state used by runCBs: the Params holding the
// global callbacks and the channel of events to dispatch.
type callbackMgr[T any] struct {
	// p supplies the OnWatchedError and OnNewConfig callbacks, which runCBs
	// invokes only when they are non-nil.
	p *Params[T]
	// ch is the receive-only stream of events that runCBs consumes until the
	// channel is closed.
	ch <-chan userCallbackEvent
}
// userCallbackEvent is implemented by every event type that can be sent to
// runCBs. Its only method is an unexported marker, so only types in this
// package can satisfy it; runCBs panics on any implementation it does not
// recognize.
type userCallbackEvent interface {
	// isUserCallbackEvent has no behavior; it only tags a type as an event.
	isUserCallbackEvent()
}
// newConfigEvent tells runCBs that a new config version should be delivered
// to the new-config callbacks.
type newConfigEvent[T any] struct {
	// oldConfig and newConfig are handed unchanged to each callback invoked
	// for this event.
	oldConfig, newConfig *T
	// serial is recorded by runCBs as the most recent serial it has seen.
	// Registered callbacks whose minSerial is >= serial are skipped for this
	// event.
	serial uint64
	// globalCBsSuppressed, when true, makes runCBs skip Params.OnNewConfig
	// for this event; individually registered callbacks are still considered.
	globalCBsSuppressed bool
}

// isUserCallbackEvent marks *newConfigEvent as a userCallbackEvent.
func (*newConfigEvent[T]) isUserCallbackEvent() {}

// Compile-time check that *newConfigEvent satisfies userCallbackEvent.
var _ userCallbackEvent = (*newConfigEvent[struct{}])(nil)
// watchErrorEvent carries the arguments for an OnWatchedError callback. The
// fields here must stay in sync with the arguments to WatchedErrorHandler.
// runCBs forwards them as (ctx, err, oldConfig, newConfig) when
// Params.OnWatchedError is non-nil, and otherwise drops the event.
type watchErrorEvent[T any] struct {
	// err is the error being reported to the callback.
	err error
	// oldConfig and newConfig are passed through to the callback unchanged.
	oldConfig, newConfig *T
}

// isUserCallbackEvent marks *watchErrorEvent as a userCallbackEvent.
func (*watchErrorEvent[T]) isUserCallbackEvent() {}

// Compile-time check that *watchErrorEvent satisfies userCallbackEvent.
var _ userCallbackEvent = (*watchErrorEvent[struct{}])(nil)
// userCallbackHandle describes one individually registered new-config
// callback. runCBs stores and compares handles by pointer, so the pointer
// value is what identifies a registration when it is later unregistered.
type userCallbackHandle[T any] struct {
	// cb is invoked by runCBs with (ctx, oldConfig, newConfig).
	cb NewConfigHandler[T]
	// minSerial gates delivery: runCBs skips cb for any newConfigEvent whose
	// serial is less than or equal to minSerial.
	minSerial uint64
}
// userCallbackRegistration asks runCBs to add handle to the callbacks that
// receive new configs.
type userCallbackRegistration[T any] struct {
	// handle is the callback being registered.
	handle *userCallbackHandle[T]
	// serial is consulted once, at registration time: if it carries a
	// non-nil config and its serial number is lower than the latest one
	// runCBs has processed, runCBs immediately calls the callback with
	// (serial's config, latest config) before adding the handle.
	// runCBs dereferences serial unconditionally, so it must not be nil.
	serial *CfgSerial[T]
}

// isUserCallbackEvent marks *userCallbackRegistration as a userCallbackEvent.
func (*userCallbackRegistration[T]) isUserCallbackEvent() {}

// Compile-time check that *userCallbackRegistration satisfies userCallbackEvent.
var _ userCallbackEvent = (*userCallbackRegistration[struct{}])(nil)
// userCallbackUnregister asks runCBs to stop delivering new configs to a
// registered callback.
type userCallbackUnregister[T any] struct {
	// handle identifies the callback to remove; runCBs matches it by pointer
	// equality against the entries of the newCfgCBs list it tracks.
	handle *userCallbackHandle[T]
	// done is closed by runCBs immediately after it has removed the handle from its list of
	// user callbacks to run.
	done chan<- struct{}
}

// isUserCallbackEvent marks *userCallbackUnregister as a userCallbackEvent.
func (*userCallbackUnregister[T]) isUserCallbackEvent() {}

// Compile-time check that *userCallbackUnregister satisfies userCallbackEvent.
var _ userCallbackEvent = (*userCallbackUnregister[struct{}])(nil)
// runCBs is the callback-dispatch loop. It consumes events from cbm.ch until
// that channel is closed, handling one event at a time and invoking user
// callbacks synchronously on the calling goroutine.
//
// Event handling:
//   - *watchErrorEvent: calls Params.OnWatchedError, if set.
//   - *newConfigEvent: records the event's serial and config as the latest
//     seen, calls Params.OnNewConfig (if set and not suppressed for this
//     event), then every registered callback, in registration order, whose
//     minSerial is below the event's serial.
//   - *userCallbackRegistration: optionally catches the callback up to the
//     latest config, then appends it to the registered list.
//   - *userCallbackUnregister: removes the handle from the registered list
//     (a no-op if it is not present) and closes the event's done channel.
//
// ctx is only passed through to the callbacks; the loop itself ends when
// cbm.ch is closed. runCBs panics if it receives an event type it does not
// know, since that indicates a programming error within this package.
func (cbm *callbackMgr[T]) runCBs(ctx context.Context) {
	// Individually registered callbacks, kept in registration order.
	var newCfgCBs []*userCallbackHandle[T]
	// Serial and config of the most recent newConfigEvent processed.
	var lastSerial uint64
	var lastVersion *T

	for ev := range cbm.ch {
		switch e := ev.(type) {
		case *watchErrorEvent[T]:
			if cbm.p.OnWatchedError != nil {
				cbm.p.OnWatchedError(ctx, e.err, e.oldConfig, e.newConfig)
			}
		case *newConfigEvent[T]:
			lastSerial = e.serial
			lastVersion = e.newConfig
			if cbm.p.OnNewConfig != nil && !e.globalCBsSuppressed {
				cbm.p.OnNewConfig(ctx, e.oldConfig, e.newConfig)
			}
			for _, cbh := range newCfgCBs {
				if cbh.minSerial >= e.serial {
					// Skip the callback if it was registered with a serial for
					// a version that we haven't caught up to yet.
					continue
				}
				cbh.cb(ctx, e.oldConfig, e.newConfig)
			}
		case *userCallbackRegistration[T]:
			// Serial values are assigned sequentially, so a lower serial means
			// the registrant holds an older config than the latest one seen
			// here. Deliver the latest version right away so the callback
			// does not stay behind until the next newConfigEvent.
			if e.serial.cfg != nil && e.serial.s < lastSerial {
				e.handle.cb(ctx, e.serial.cfg, lastVersion)
			}
			// Add this callback to the list of registered callbacks.
			newCfgCBs = append(newCfgCBs, e.handle)
		case *userCallbackUnregister[T]:
			// Size by the current length rather than length-1: the handle may
			// not be in the list (nothing registered, or already removed),
			// and asking make for a negative capacity panics.
			remaining := make([]*userCallbackHandle[T], 0, len(newCfgCBs))
			for _, cbh := range newCfgCBs {
				if cbh == e.handle {
					// Leave the handle being removed out of the new list.
					continue
				}
				remaining = append(remaining, cbh)
			}
			newCfgCBs = remaining
			// Signal the requester that the handle will no longer be called.
			close(e.done)
		default:
			panic(fmt.Errorf("unknown type %T for user callback event", ev))
		}
	}
}