pool.go
package main

import (
    "log"
    "sync"
    "sync/atomic"
    "time"

    "github.com/artyom/metrics"
)

type idRange struct {
    owner   string
    expires time.Time // when this range can no longer be distributed
    start   uint64    // inclusive
    end     uint64    // exclusive
}

func newIDRange(start uint64, end uint64, owner string) idRange {
    return idRange{
        expires: time.Now().Add(time.Second),
        start:   start,
        end:     end,
        owner:   owner,
    }
}

func (rng idRange) isValid() bool {
    return rng.start > 0 && rng.end > rng.start
}

func (rng idRange) size() uint64 {
    return rng.end - rng.start
}

type pool struct {
    sync.Mutex
    ranges       []idRange    // ranges we're still waiting to distribute
    out          chan uint64  // used internally to hand out ids
    notify       chan uint64  // used to notify that we need another range
    distributing bool         // are we currently handing out ids?
    restart      chan bool    // receives when we should request a new range
    rate         metrics.EWMA // keep track of how much traffic we're getting
    ticked       atomic.Bool  // have we measured the rate yet? written by the ticker goroutine, so kept atomic
}

func newPool(notify chan uint64) *pool {
    p := &pool{
        ranges:  make([]idRange, 0, 2),
        out:     make(chan uint64),
        restart: make(chan bool),
        notify:  notify,
        rate:    metrics.NewEWMA1(),
    }
    // tick our metrics
    go func() {
        for range time.Tick(metrics.TickDuration) {
            p.rate.Tick()
            p.ticked.Store(true)
        }
    }()
    // get this party started!
    go func() {
        p.restart <- true
    }()
    return p
}

func (pool *pool) nextRangeSize() uint64 {
    r := pool.rate.Rate()
    switch {
    case !pool.ticked.Load():
        // if we haven't ticked yet, default to 10
        return 10
    case r < 1:
        return 1
    case r < 4:
        return uint64(r)
    default:
        return uint64(r * 0.75)
    }
}

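// Worked example (illustrative, not from the original source): once one
// EWMA tick has been recorded, a measured rate of 0.5 ids/sec yields a
// range of 1, a rate of 2.6 yields 2, and a rate of 100 yields 75, i.e.
// three quarters of the observed rate, so a range covers a bit under a
// second of traffic before the next one is needed.
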
func (pool *pool) addRange(rng idRange) {
    log.Printf("got a new range of size %v", rng.size())
    pool.Lock()
    defer pool.Unlock()
    if !pool.distributing {
        pool.distributing = true
        go func() {
            pool.distributeRange(rng)
        }()
    } else {
        pool.ranges = append(pool.ranges, rng)
    }
}

func (pool *pool) distributeRange(rng idRange) {
    threshold := rng.start + uint64(float64(rng.end-rng.start)*0.9)
    requested := false
    // create the deadline timer once: the expiry is fixed, so a single
    // timer avoids both a per-iteration allocation and the window where
    // the range could expire between an expiry check and a freshly
    // created timer, which would have broken our guarantees
    timer := time.NewTimer(time.Until(rng.expires))
    defer timer.Stop()
RangeLoop:
    for i := rng.start; i < rng.end; i++ {
        if time.Now().After(rng.expires) {
            log.Printf("range expired at top of loop!")
            break
        }
        select {
        case <-timer.C:
            log.Printf("range expired!")
            break RangeLoop
        case pool.out <- i:
        }
        if i == threshold && rng.size() > 2 {
            pool.Lock()
            if len(pool.ranges) == 0 {
                // we're about to run out! request some more
                log.Printf("requesting another range")
                requested = true
                pool.notify <- pool.nextRangeSize()
            }
            pool.Unlock()
        }
        // update our rate
        pool.rate.Update(1)
    }
    // make sure there's nothing left in the queue
    var next idRange
    pool.Lock()
    if len(pool.ranges) > 0 {
        next, pool.ranges = pool.ranges[0], pool.ranges[1:] // shift
    }
    distributing := next.isValid()
    pool.distributing = distributing
    pool.Unlock()
    if distributing {
        pool.distributeRange(next)
    } else if !requested {
        pool.restart <- true
    }
}

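// Worked example (illustrative, not from the original source): for a range
// [100, 200) the threshold is 100 + uint64(100*0.9) = 190, so right after
// handing out id 190 the pool requests the next range while ids 191..199
// remain available to absorb the round trip to the range allocator.
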
// where consumers wait on the next id
func (pool *pool) getID() uint64 {
    select {
    case id := <-pool.out:
        return id
    case <-pool.restart:
        pool.notify <- pool.nextRangeSize()
        return pool.getID()
    }
}
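
// Usage sketch (an assumption about the surrounding program, not part of
// this file): a producer goroutine services the notify channel by
// allocating fresh ranges, while consumers call getID. The owner name
// "node-1" and the starting id are hypothetical:
//
//  notify := make(chan uint64)
//  p := newPool(notify)
//  go func() {
//      var next uint64 = 1 // start above 0 so idRange.isValid holds
//      for n := range notify {
//          p.addRange(newIDRange(next, next+n, "node-1"))
//          next += n
//      }
//  }()
//  id := p.getID()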