pool.go

package workers

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	// ErrPoolClosed is triggered when trying to start the pool, or to add
	// or remove workers, after the pool has been closed.
	ErrPoolClosed = errors.New("pool is closed")
	// ErrPoolStarted is triggered when trying to start the pool when it's
	// already running.
	ErrPoolStarted = errors.New("pool already started")
	// ErrNotStarted is returned when trying to add or remove workers
	// before the pool has been started.
	ErrNotStarted = errors.New("pool has not started")
	// ErrInvalidMax is triggered when configuring a pool with an invalid
	// maximum number of workers.
	ErrInvalidMax = errors.New("the maximum is less than the minimum workers")
	// ErrInvalidMin is triggered when configuring a pool with an invalid
	// minimum number of workers.
	ErrInvalidMin = errors.New("negative number of minimum workers")
	// ErrInvalidInitial is triggered when configuring a pool with an invalid
	// initial number of workers.
	ErrInvalidInitial = errors.New("the initial is less than the minimum workers")
	// ErrMinReached is triggered when trying to remove a worker when the
	// pool is already running at minimum capacity.
	ErrMinReached = errors.New("minimum number of workers reached")
	// ErrMaxReached is triggered when trying to add a worker when the
	// pool is already running at maximum capacity.
	ErrMaxReached = errors.New("maximum number of workers reached")
)

// Job represents some work that needs to be done non-stop.
type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is the worker context, and
	// the job should honor the context cancellation signal as
	// soon as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or when stopping the pool completely.
	Do(ctx context.Context) error
}

// JobFunc is a helper that allows using a plain function as a Job.
type JobFunc func(ctx context.Context) error

// Do executes the job work.
func (f JobFunc) Do(ctx context.Context) error {
	return f(ctx)
}

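// A minimal sketch of a job defined as a JobFunc; the one-second tick
// and the log line are illustrative, not part of the package:
//
//	job := workers.JobFunc(func(ctx context.Context) error {
//		select {
//		case <-ctx.Done():
//			return ctx.Err()
//		case <-time.After(time.Second):
//			log.Println("tick")
//			return nil
//		}
//	})
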
// JobBuilder builds a new job during the
// initialization of each worker.
type JobBuilder interface {
	// New generates a new job for a new worker.
	New() Job
}

// JobBuilderFunc is a helper to use a function as
// a job builder.
type JobBuilderFunc func() Job

// New builds the new job using the underlying function.
func (f JobBuilderFunc) New() Job {
	return f()
}

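// A sketch of a builder that gives every worker its own job value so
// per-worker state is not shared; the buffer is purely illustrative:
//
//	builder := workers.JobBuilderFunc(func() workers.Job {
//		buf := make([]byte, 1024) // state owned by a single worker
//		return workers.JobFunc(func(ctx context.Context) error {
//			_ = buf // use the per-worker buffer here
//			return nil
//		})
//	})
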
// Middleware wraps a job and can be used to
// extend the functionality of the pool.
type Middleware interface {
	Wrap(job Job) Job
}

// MiddlewareFunc is a function that implements the
// Middleware interface.
type MiddlewareFunc func(job Job) Job

// Wrap executes the middleware function wrapping the job.
func (f MiddlewareFunc) Wrap(job Job) Job {
	return f(job)
}

// Wrap is a helper to apply a chain of middleware to a job.
func Wrap(job Job, middlewares ...Middleware) Job {
	for _, mw := range middlewares {
		job = mw.Wrap(job)
	}
	return job
}

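// A sketch of how middleware composes around a job; the logging
// middleware shown here is illustrative, not part of the package,
// and assumes a job value defined elsewhere:
//
//	logging := workers.MiddlewareFunc(func(job workers.Job) workers.Job {
//		return workers.JobFunc(func(ctx context.Context) error {
//			err := job.Do(ctx)
//			if err != nil {
//				log.Printf("job failed: %v", err)
//			}
//			return err
//		})
//	})
//	wrapped := workers.Wrap(job, logging)
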
// Config allows configuring the number of workers
// that will be running in the pool.
type Config struct {
	// Min indicates the minimum number of workers that can run concurrently.
	// By default the pool can have 0 workers, effectively pausing it.
	Min int
	// Max indicates the maximum number of workers that can run concurrently.
	// The default "0" indicates an infinite number of workers.
	Max int
	// Initial indicates the initial number of workers that should be running.
	// The default value is the greater of 1 and the given minimum.
	Initial int
	// StopOnErrors indicates whether the pool should stop when a job returns an error.
	StopOnErrors bool
}

// New creates a new pool with the default configuration.
//
// It accepts an arbitrary number of job middlewares to run.
func New(middlewares ...Middleware) *Pool {
	return newDefault(middlewares...)
}

// NewWithConfig creates a new pool with a specific configuration.
//
// It accepts an arbitrary number of job middlewares to run.
func NewWithConfig(cfg Config, middlewares ...Middleware) (*Pool, error) {
	if cfg.Initial == 0 {
		cfg.Initial = defaultInitial
		if cfg.Min > cfg.Initial {
			cfg.Initial = cfg.Min
		}
	}
	if cfg.Min < 0 {
		return nil, fmt.Errorf("%w: min %d", ErrInvalidMin, cfg.Min)
	}
	if cfg.Max != 0 && cfg.Max < cfg.Min {
		return nil, fmt.Errorf("%w: max: %d, min %d", ErrInvalidMax, cfg.Max, cfg.Min)
	}
	if cfg.Initial < cfg.Min {
		return nil, fmt.Errorf("%w: initial: %d, min %d", ErrInvalidInitial, cfg.Initial, cfg.Min)
	}
	return &Pool{
		min:          cfg.Min,
		max:          cfg.Max,
		initial:      cfg.Initial,
		stopOnErrors: cfg.StopOnErrors,
		mws:          middlewares,
	}, nil
}

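// A minimal configuration sketch; the numbers are illustrative:
//
//	pool, err := workers.NewWithConfig(workers.Config{
//		Min:     1,
//		Max:     8,
//		Initial: 2,
//	})
//	if err != nil {
//		// ErrInvalidMin, ErrInvalidMax or ErrInvalidInitial
//	}
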
// Must checks if creating a pool
// has failed and, if so, panics.
func Must(p *Pool, err error) *Pool {
	if err != nil {
		panic(err)
	}
	return p
}

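// Must is typically paired with NewWithConfig when the configuration
// is known to be valid (sketch, the value is illustrative):
//
//	pool := workers.Must(workers.NewWithConfig(workers.Config{Initial: 4}))
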
const (
	defaultMin     = 0
	defaultMax     = 0
	defaultInitial = 1
)

func newDefault(middlewares ...Middleware) *Pool {
	return &Pool{
		min:     defaultMin,
		max:     defaultMax,
		initial: defaultInitial,
		mws:     middlewares,
	}
}

// Pool is a pool of workers that can be started
// to run a job non-stop concurrently.
type Pool struct {
	// config
	min          int
	initial      int
	max          int
	stopOnErrors bool
	// job and its workers.
	jobBuilder JobBuilder
	mws        []Middleware
	workers    []*worker
	// Current pool state.
	started bool
	closed  bool
	// Pool context that will be the
	// parent ctx for the workers.
	ctx    context.Context
	cancel func()
	// Workers let the pool know when they start
	// and when they stop through this channel
	// (+1 on start, -1 on stop).
	running chan int
	// Once the pool is closed and all the
	// workers have stopped, this channel
	// is closed to signal that the pool
	// has finished in a clean way.
	done chan struct{}
	// Contains the error that triggered the stop
	// when "stop on errors" is enabled.
	workerErr error
	mx        sync.RWMutex
}

// StartWithBuilder launches the workers and keeps them running until the pool is closed.
func (p *Pool) StartWithBuilder(jobBuilder JobBuilder) error {
	return p.start(jobBuilder)
}

// Start launches the workers and keeps them running until the pool is closed.
func (p *Pool) Start(job Job) error {
	return p.start(JobBuilderFunc(func() Job {
		return job
	}))
}

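// A sketch of starting the pool with a fixed job and closing it later,
// assuming pool and job values from the sketches above; error handling
// is reduced to the sentinel errors involved:
//
//	if err := pool.Start(job); err != nil {
//		// ErrPoolStarted or ErrPoolClosed
//	}
//	// ... later, during shutdown:
//	_ = pool.Close(context.Background())
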
// Run is a blocking call that will start the pool and keep it
// running until the context is cancelled.
//
// The pool will then be stopped without a timeout.
func (p *Pool) Run(ctx context.Context, job Job) error {
	return p.RunWithBuilder(ctx, JobBuilderFunc(func() Job {
		return job
	}))
}

// RunWithBuilder is a blocking call that will start the
// pool with a job builder and keep it running until the
// context is cancelled.
//
// The pool will then be stopped without a timeout.
func (p *Pool) RunWithBuilder(ctx context.Context, jobBuilder JobBuilder) error {
	if err := p.StartWithBuilder(jobBuilder); err != nil {
		return err
	}
	select {
	case <-ctx.Done():
	case <-p.ctx.Done():
	}
	return p.Close(context.Background())
}

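// A sketch of driving the pool with Run until an interrupt signal
// arrives, assuming a pool created as above; the signal handling
// and the job body are illustrative:
//
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//	err := pool.Run(ctx, workers.JobFunc(func(ctx context.Context) error {
//		// consume one message, poll one item, etc.
//		return nil
//	}))
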
func (p *Pool) start(jobBuilder JobBuilder) error {
	p.mx.Lock()
	defer p.mx.Unlock()
	if p.closed {
		return ErrPoolClosed
	}
	if p.started {
		return ErrPoolStarted
	}
	initial := p.initial
	if initial == 0 {
		initial = 1
	}
	p.jobBuilder = jobBuilder
	p.started = true
	p.running = make(chan int)
	p.done = make(chan struct{})
	p.workers = make([]*worker, initial)
	p.ctx, p.cancel = context.WithCancel(context.Background())
	go p.waitForWorkersToStop()
	var wg sync.WaitGroup
	wg.Add(initial)
	for i := 0; i < initial; i++ {
		go func(i int) {
			defer wg.Done()
			p.workers[i] = p.newWorker()
		}(i)
	}
	wg.Wait()
	return nil
}

func (p *Pool) waitForWorkersToStop() {
	var running int
	defer func() {
		close(p.done)
		close(p.running)
	}()
	for {
		select {
		case delta := <-p.running:
			running += delta
		case <-p.ctx.Done():
			// We may receive the pool close cancellation after
			// all workers have already stopped, so we need to
			// fall through.
		}
		if running > 0 {
			continue
		}
		// We need to understand whether we have no workers
		// because the pool is closed, or because all the
		// workers were removed (paused).
		p.mx.RLock()
		isClosed := p.closed
		p.mx.RUnlock()
		if isClosed {
			return
		}
	}
}

// More starts a new worker in the pool.
func (p *Pool) More() error {
	p.mx.Lock()
	defer p.mx.Unlock()
	if p.closed {
		return ErrPoolClosed
	}
	if !p.started {
		return ErrNotStarted
	}
	if p.max != 0 && len(p.workers) == p.max {
		return ErrMaxReached
	}
	p.workers = append(p.workers, p.newWorker())
	return nil
}

// Less removes one worker from the pool.
//
// This call does not wait for the worker to finish
// its current job. If the pool is closed afterwards,
// the call to close it will wait for all removed
// workers to finish before returning.
func (p *Pool) Less() error {
	p.mx.Lock()
	defer p.mx.Unlock()
	if p.closed {
		return ErrPoolClosed
	}
	if !p.started {
		return ErrNotStarted
	}
	current := len(p.workers)
	if current == p.min {
		return ErrMinReached
	}
	// pop the last worker
	w := p.workers[current-1]
	p.workers = p.workers[:current-1]
	// stop the worker
	w.cancel()
	return nil
}

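// A sketch of scaling a running pool; in practice the returned errors
// (ErrMaxReached, ErrMinReached, ErrNotStarted, ...) should be handled:
//
//	_ = pool.More() // add one worker
//	_ = pool.Less() // remove one worker
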
// Current returns the current number of workers.
//
// Removed workers may still be executing their last
// job, but they will eventually finish and stop
// processing new jobs.
func (p *Pool) Current() int {
	p.mx.RLock()
	defer p.mx.RUnlock()
	return len(p.workers)
}

// Close stops all the workers and closes the pool.
//
// Only the first call to Close will shut down the pool;
// subsequent calls return ErrPoolClosed.
func (p *Pool) Close(ctx context.Context) error {
	if err := p.close(); err != nil {
		return err
	}
	if !p.hasStarted() {
		return nil
	}
	p.cancel()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-p.done:
		return p.workerErr
	}
}

// hasStarted reports whether the pool has been started.
func (p *Pool) hasStarted() bool {
	p.mx.RLock()
	defer p.mx.RUnlock()
	return p.started
}

// close attempts to mark the pool as closed.
func (p *Pool) close() error {
	p.mx.Lock()
	defer p.mx.Unlock()
	if p.closed {
		return ErrPoolClosed
	}
	p.closed = true
	return nil
}

// CloseWithTimeout closes the pool, waiting at most
// the given amount of time for the workers to stop.
func (p *Pool) CloseWithTimeout(timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return p.Close(ctx)
}

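// A shutdown sketch giving the workers up to five seconds to stop,
// assuming a started pool; the timeout value is illustrative:
//
//	if err := pool.CloseWithTimeout(5 * time.Second); err != nil {
//		// context.DeadlineExceeded: some workers did not stop in time
//	}
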
// newWorker registers a new worker with the pool and launches
// its work loop in a separate goroutine.
func (p *Pool) newWorker() *worker {
	ctx, cancel := context.WithCancel(p.ctx)
	w := &worker{
		cancel:       cancel,
		stopOnErrors: p.stopOnErrors,
	}
	p.running <- 1
	go func() {
		defer func() {
			p.running <- -1
		}()
		err := w.work(ctx, Wrap(p.jobBuilder.New(), p.mws...))
		if err != nil {
			p.mx.Lock()
			if p.workerErr == nil {
				p.workerErr = err
			}
			p.mx.Unlock()
			p.cancel()
		}
	}()
	return w
}

// worker runs a single job in a loop until its context is cancelled.
type worker struct {
	cancel       func()
	stopOnErrors bool
}

// work runs the job repeatedly until the context is cancelled or, when
// stopOnErrors is enabled, until the job returns an error other than
// context.Canceled.
func (w *worker) work(ctx context.Context, job Job) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			err := job.Do(ctx)
			if err != nil && !errors.Is(err, context.Canceled) && w.stopOnErrors {
				return err
			}
		}
	}
}