forked from krakend/krakend-ratelimit
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkrakendrate.go
190 lines (157 loc) · 4.02 KB
/
krakendrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// krakendrate contains a collection of curated rate limit adaptors for the KrakenD framework
package krakendrate
import (
"context"
"errors"
"sync"
"time"
)
var (
// ErrLimited is the error returned when the rate limit has been exceded
ErrLimited = errors.New("ERROR: rate limit exceded")
// DataTTL is the default eviction time
DataTTL = 10 * time.Minute
now = time.Now
shards uint64 = 2048
)
// Limiter defines a simple interface for a rate limiter
type Limiter interface {
Allow() bool
}
// LimiterStore defines the interface for a limiter lookup function
type LimiterStore func(string) Limiter
// Hasher gets a hash for the received string
type Hasher func(string) uint64
// Backend is the interface of the persistence layer
type Backend interface {
Load(string, func() interface{}) interface{}
Store(string, interface{}) error
}
// ShardedMemoryBackend is a memory backend shardering the data in order to avoid mutex contention
type ShardedMemoryBackend struct {
shards []*MemoryBackend
total uint64
hasher Hasher
}
// DefaultShardedMemoryBackend is a 2018 sharded ShardedMemoryBackend
func DefaultShardedMemoryBackend(ctx context.Context) *ShardedMemoryBackend {
return NewShardedMemoryBackend(ctx, shards, DataTTL, PseudoFNV64a)
}
// NewShardedMemoryBackend returns a ShardedMemoryBackend with 'shards' shards
func NewShardedMemoryBackend(ctx context.Context, shards uint64, ttl time.Duration, h Hasher) *ShardedMemoryBackend {
b := &ShardedMemoryBackend{
shards: make([]*MemoryBackend, shards),
total: shards,
hasher: h,
}
var i uint64
for i = 0; i < shards; i++ {
b.shards[i] = NewMemoryBackend(ctx, ttl)
}
return b
}
func (b *ShardedMemoryBackend) shard(key string) uint64 {
return b.hasher(key) % b.total
}
// Load implements the Backend interface
func (b *ShardedMemoryBackend) Load(key string, f func() interface{}) interface{} {
return b.shards[b.shard(key)].Load(key, f)
}
// Store implements the Backend interface
func (b *ShardedMemoryBackend) Store(key string, v interface{}) error {
return b.shards[b.shard(key)].Store(key, v)
}
func (b *ShardedMemoryBackend) del(key ...string) {
buckets := map[uint64][]string{}
for _, k := range key {
h := b.shard(k)
ks, ok := buckets[h]
if !ok {
ks = []string{k}
} else {
ks = append(ks, k)
}
buckets[h] = ks
}
for s, ks := range buckets {
b.shards[s].del(ks...)
}
}
func NewMemoryBackend(ctx context.Context, ttl time.Duration) *MemoryBackend {
m := &MemoryBackend{
data: map[string]interface{}{},
lastAccess: map[string]time.Time{},
mu: new(sync.RWMutex),
}
go m.manageEvictions(ctx, ttl)
return m
}
// MemoryBackend implements the backend interface by wrapping a sync.Map
type MemoryBackend struct {
data map[string]interface{}
lastAccess map[string]time.Time
mu *sync.RWMutex
}
func (m *MemoryBackend) manageEvictions(ctx context.Context, ttl time.Duration) {
t := time.NewTicker(ttl)
for {
keysToDel := []string{}
select {
case <-ctx.Done():
t.Stop()
return
case now := <-t.C:
m.mu.RLock()
for k, v := range m.lastAccess {
if v.Add(ttl).Before(now) {
keysToDel = append(keysToDel, k)
}
}
m.mu.RUnlock()
}
m.del(keysToDel...)
}
}
// Load implements the Backend interface
func (m *MemoryBackend) Load(key string, f func() interface{}) interface{} {
m.mu.RLock()
v, ok := m.data[key]
m.mu.RUnlock()
n := now()
if ok {
go func(t time.Time) {
m.mu.Lock()
if t0, ok := m.lastAccess[key]; !ok || t.After(t0) {
m.lastAccess[key] = t
}
m.mu.Unlock()
}(n)
return v
}
m.mu.Lock()
defer m.mu.Unlock()
v, ok = m.data[key]
if ok {
return v
}
v = f()
m.lastAccess[key] = n
m.data[key] = v
return v
}
// Store implements the Backend interface
func (m *MemoryBackend) Store(key string, v interface{}) error {
m.mu.Lock()
m.lastAccess[key] = now()
m.data[key] = v
m.mu.Unlock()
return nil
}
func (m *MemoryBackend) del(key ...string) {
m.mu.Lock()
for _, k := range key {
delete(m.data, k)
delete(m.lastAccess, k)
}
m.mu.Unlock()
}