-
Notifications
You must be signed in to change notification settings - Fork 8
/
pool-manager.js
197 lines (176 loc) · 6.38 KB
/
pool-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
var _ = require('underscore');
// endpoint states
var CLOSED = 0; // closed circuit: endpoint is good to use
var HALF_OPEN_READY = 1; // endpoint is in recovery state: offer it for use once
var HALF_OPEN_PENDING = 2; // endpoint recovery is in process
var OPEN = 3; // open circuit: endpoint is no good
function PoolManager (options) {
options = options || {};
this.endpoints = [];
this._endpointOffset = 0;
this.isInPool = options.isInPool || _.constant(true);
this.onEndpointReturned = options.onEndpointReturned || _.noop;
this.onEndpointRegistered = options.onEndpointRegistered || _.noop;
this.onEndpointSelected = options.onEndpointSelected || _.noop;
}
PoolManager.prototype = {
hasEndpoints: function () {
return this.endpoints.length > 0;
},
getNextEndpoint: function () {
var i;
var l;
var offset;
var endpoint;
for (i = 0, l = this.endpoints.length; i < l; ++i) {
offset = (this._endpointOffset + i) % l;
endpoint = this.endpoints[offset];
if (this.isInPool(endpoint)) {
this.onEndpointSelected(endpoint);
this._endpointOffset = offset + 1;
return endpoint;
}
}
},
updateEndpoints: function (endpoints) {
var matchingEndpoint;
var i;
var newEndpoints = endpoints.map(function (info) {
return new Endpoint(info);
});
for (i = this.endpoints.length; i--;) {
matchingEndpoint = _.findWhere(newEndpoints, { url: this.endpoints[i].url });
if (matchingEndpoint) { // found a match, remove it from `newEndpoints`, since it's not new
newEndpoints = _.without(newEndpoints, matchingEndpoint);
} else { // didn't find a match in endpoints, so kill that endpoint
this.endpoints.splice(i, 1);
}
}
newEndpoints.forEach(function (endpoint) {
endpoint.callback = this.onEndpointReturned.bind(this, endpoint);
this.onEndpointRegistered(endpoint);
}, this);
// push all the actually-new endpoints in
this.endpoints.push.apply(this.endpoints, newEndpoints);
},
getStatus: function () {
var manager = this;
return {
total: this.endpoints.length,
unhealthy: this.endpoints.reduce(function (badCount, endpoint) {
return badCount + (manager.isInPool(endpoint) ? 0 : 1);
}, 0)
};
}
};
function Endpoint(info) {
this.name = info.name;
this.port = info.port;
this.url = info.name + ':' + info.port;
}
module.exports = {
defaultPoolManager: function () {
return new PoolManager();
},
ejectOnErrorPoolManager: function (options) {
if (!options) {
throw new Error('Must supply arguments to ejectOnErrorPoolManager');
}
var poolConfig;
if (options.failureWindow && options.maxFailures && options.resetTimeout) {
poolConfig = getRollingWindowConfiguration(options.failureWindow, options.maxFailures, options.resetTimeout);
} else if (options.failureRate && options.failureRateWindow && options.resetTimeout) {
poolConfig = getRateConfiguration(options.failureRate, options.failureRateWindow, options.resetTimeout);
} else {
throw new Error('Must supply either configuration to ejectOnErrorPoolManager');
}
return new PoolManager(poolConfig);
function disableEndpoint(endpoint) {
if (endpoint.state === OPEN) {
return;
}
endpoint.state = OPEN;
clearInterval(endpoint._reopenTimeout);
endpoint._reopenTimeout = setTimeout(function () {
endpoint.state = HALF_OPEN_READY;
}, options.resetTimeout);
}
function isInPool(endpoint) {
return endpoint.state === CLOSED || endpoint.state === HALF_OPEN_READY;
}
function onEndpointSelected(endpoint) {
if (endpoint.state === HALF_OPEN_READY) {
endpoint.state = HALF_OPEN_PENDING; // let one through, then turn it off again
}
}
function getRollingWindowConfiguration(failureWindow, maxFailures, resetTimeout) {
return {
isInPool: isInPool,
onEndpointSelected: onEndpointSelected,
onEndpointRegistered: function (endpoint) {
endpoint.state = CLOSED;
// A ring buffer, holding the timestamp of each error. As we loop around the ring, the timestamp in the slot we're
// about to fill will tell us the error rate. That is, `maxFailure` number of requests in how many milliseconds?
endpoint.buffer = new RingBuffer(maxFailures - 1);
},
onEndpointReturned: function (endpoint, err) {
if (err) {
if (endpoint.state === OPEN) {
return;
}
if (endpoint.buffer.size === 0) {
disableEndpoint(endpoint);
return;
}
var now = Date.now();
var oldestErrorTime = endpoint.buffer.read();
endpoint.buffer.write(now);
if (endpoint.state === HALF_OPEN_PENDING || (oldestErrorTime != null && now - oldestErrorTime <= failureWindow)) {
disableEndpoint(endpoint);
}
} else if (endpoint.state === HALF_OPEN_PENDING) {
endpoint.state = CLOSED;
}
}
};
}
function getRateConfiguration(failureRate, failureRateWindow, resetTimeout) {
var maxErrorCount = failureRate * failureRateWindow;
return {
isInPool: isInPool,
onEndpointSelected: onEndpointSelected,
onEndpointRegistered: function (endpoint) {
endpoint.state = CLOSED;
endpoint.buffer = new RingBuffer(failureRateWindow);
endpoint.errors = 0;
},
onEndpointReturned: function (endpoint, err) {
var state = endpoint.state;
var newStatus = err ? 1 : 0;
var oldestStatus = endpoint.buffer.read() ? 1 : 0;
endpoint.buffer.write(newStatus);
endpoint.errors += newStatus - oldestStatus;
if (err && (state === HALF_OPEN_PENDING || endpoint.errors >= maxErrorCount)) {
disableEndpoint(endpoint);
} else if (!err && state === HALF_OPEN_PENDING) {
endpoint.state = CLOSED;
}
}
};
}
}
};
function RingBuffer(size) {
this.buffer = new Array(size);
this.offset = 0;
this.size = size;
}
_.assign(RingBuffer.prototype, {
read: function () {
return this.buffer[this.offset];
},
write: function (val) {
this.buffer[this.offset] = val;
this.offset = (this.offset + 1) % this.size;
}
});