-
Notifications
You must be signed in to change notification settings - Fork 3
/
filter.go
75 lines (63 loc) · 1.74 KB
/
filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package bond
import (
"context"
"fmt"
"sync/atomic"
)
type FilterStorer interface {
Getter
Setter
DeleterWithRange
}
type Filter interface {
Add(ctx context.Context, key []byte)
MayContain(ctx context.Context, key []byte) bool
Load(ctx context.Context, store FilterStorer) error
Save(ctx context.Context, store FilterStorer) error
Clear(ctx context.Context, store FilterStorer) error
}
type FilterInitializable struct {
Filter
isInitialized uint64
}
func (f *FilterInitializable) MayContain(ctx context.Context, key []byte) bool {
if atomic.LoadUint64(&f.isInitialized) == 1 {
return f.Filter.MayContain(ctx, key)
} else {
return true
}
}
func (f *FilterInitializable) Initialize(ctx context.Context, filterStorer FilterStorer, scanners []TableScanner[any]) error {
err := FilterInitialize(ctx, f.Filter, filterStorer, scanners)
if err != nil {
return err
}
atomic.StoreUint64(&f.isInitialized, 1)
return nil
}
func (f *FilterInitializable) Save(ctx context.Context, store FilterStorer) error {
if atomic.LoadUint64(&f.isInitialized) == 1 {
return f.Filter.Save(ctx, store)
} else {
return fmt.Errorf("filter not initialized")
}
}
func FilterInitialize(ctx context.Context, filter Filter, filterStorer FilterStorer, scanners []TableScanner[any]) error {
err := filter.Load(ctx, filterStorer)
if err != nil {
err = filter.Clear(ctx, filterStorer)
if err != nil {
return fmt.Errorf("filter initialization failed: %w", err)
}
for _, scanner := range scanners {
err = scanner.ScanForEach(ctx, func(keyBytes KeyBytes, lazy Lazy[any]) (bool, error) {
filter.Add(ctx, keyBytes)
return true, nil
}, false)
if err != nil {
return fmt.Errorf("filter initialization failed: %w", err)
}
}
}
return nil
}