-
Notifications
You must be signed in to change notification settings - Fork 3
/
observer.go
124 lines (112 loc) · 2.8 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package observer
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
)
var ErrClientAlreadyDeRegistered = errors.New("client already de-registered")
type CancelFunc func() error
// Observable
type Observable[T any] interface {
// Subscribe registers a client to the observable.
Subscribe() (<-chan T, CancelFunc)
// NotifyAll notifies all registered clients.
NotifyAll(data T)
}
// Observer offers the possibility to notify all registered clients.
// Since the client map must be initialized, it is not possible to use this structure directly.
// Use NewObserver instead.
type Observer[T any] struct {
NotifyTimeout time.Duration
clients sync.Map
mu sync.RWMutex
n int64
numDeleted int64
}
func (o *Observer[T]) notifyTimeout() time.Duration {
if o.NotifyTimeout != 0 {
return o.NotifyTimeout
}
return time.Second * 5
}
// Subscribe registers a client to the observer and returns a channel to receive notifications.
// The returned CancelFunc can be used to de-register the client.
func (o *Observer[T]) Subscribe() (<-chan T, CancelFunc) {
n := atomic.AddInt64(&o.n, 1)
listenCh := make(chan T)
o.clients.Store(n, listenCh)
return listenCh, func() error {
return o.deleteClient(n)
}
}
func (o *Observer[T]) deleteClient(key any) error {
o.mu.Lock()
defer o.mu.Unlock()
c, ok := o.clients.LoadAndDelete(key)
if !ok {
return ErrClientAlreadyDeRegistered
}
atomic.AddInt64(&o.numDeleted, 1)
close(c.(chan T))
return nil
}
// NotifyAll notifies all registered clients.
func (o *Observer[T]) NotifyAll(data T) {
o.clients.Range(func(key, _ any) bool {
go func(key any) {
o.mu.RLock()
defer o.mu.RUnlock()
client, ok := o.clients.Load(key)
if !ok {
return
}
select {
case client.(chan T) <- data:
// the message was sent successfully
return
case <-time.After(o.notifyTimeout()):
// client is not responding
return
}
}(key)
return true
})
}
// Clients returns the number of registered clients.
func (o *Observer[t]) Clients() int64 {
return o.n - o.numDeleted
}
// Handle builds repetitive message consumer using provided handler function h.
// Returned func() error value is suitable to run in errrgroup's Go() method.
func Handle[T any](ctx context.Context, o Observable[T], h func(context.Context, T) error) func() error {
msgs, unsub := o.Subscribe()
return func() error {
defer unsub()
for {
select {
case <-ctx.Done():
return ctx.Err()
case upd, ok := <-msgs:
if !ok {
return nil
}
if err := h(ctx, upd); err != nil {
return err
}
}
}
}
}
// Close disconnects all clients from the observer
func (o *Observer[T]) Close() error {
var res error
o.clients.Range(func(key, _ any) bool {
if err := o.deleteClient(key); err != nil {
res = err
}
return true
})
return res
}