-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
93 lines (75 loc) · 1.63 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package snatch
import (
"time"
)
// Store represents a Bucket store.
type Store interface {
// Add adds Buckets into the Store.
Add(...*Bucket) error
// Scan scans the store for complete Buckets.
Scan() (<-chan *Bucket, error)
// Flush flushes all Buckets from the Store.
Flush() (<-chan *Bucket, error)
}
type memStore struct {
res time.Duration
store map[int64]map[string]*Bucket
}
// NewStore creates a new in-memory store.
func NewStore(res time.Duration) Store {
return &memStore{
res: res,
store: map[int64]map[string]*Bucket{},
}
}
// Add adds Buckets into the Store.
func (s *memStore) Add(bkts ...*Bucket) error {
for _, bkt := range bkts {
ts, key := bkt.ID.Keys()
box, ok := s.store[ts]
if !ok {
s.store[ts] = map[string]*Bucket{
key: bkt,
}
continue
}
if b, ok := box[key]; ok {
b.Merge(bkt)
continue
}
box[key] = bkt
}
return nil
}
// Scan scans the store for complete Buckets.
func (s *memStore) Scan() (<-chan *Bucket, error) {
buckets := make(chan *Bucket, 1000)
go func(out chan *Bucket) {
ready := time.Now().Truncate(s.res).Add(-1 * (s.res + time.Second)).Unix()
for ts, box := range s.store {
if ts >= ready {
continue
}
for _, v := range box {
out <- v
}
delete(s.store, ts)
}
close(out)
}(buckets)
return buckets, nil
}
// Flush flushes all Buckets from the Store.
func (s *memStore) Flush() (<-chan *Bucket, error) {
buckets := make(chan *Bucket, 1000)
go func(out chan *Bucket) {
for ts, box := range s.store {
for _, v := range box {
out <- v
}
delete(s.store, ts)
}
close(out)
}(buckets)
return buckets, nil
}