-
Notifications
You must be signed in to change notification settings - Fork 2
/
workerpool.go
125 lines (99 loc) · 2.5 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package worker
import (
"fmt"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"github.com/guilhermehubner/worker/broker"
"github.com/guilhermehubner/worker/log"
"golang.org/x/net/context"
)
// gracefulStop receives the SIGTERM/SIGINT notifications that Start subscribes
// to. It is buffered with capacity 1 because signal.Notify never blocks when
// delivering: on an unbuffered channel, a signal that arrives while no
// goroutine is receiving would be dropped.
var gracefulStop = make(chan os.Signal, 1)
// Pool groups a set of workers with the broker they pull jobs from, the job
// types they can handle and the middlewares applied to them.
type Pool struct {
// broker is used to register job queues, fetch messages and query queue status.
broker *broker.AMQPBroker
// workers has one slot per worker; NewWorkerPool sizes it and Start fills it.
workers []*worker
// jobTypes collects the jobs added through RegisterJob; Start sorts it.
jobTypes jobTypes
// middlewares is handed to every worker that Start creates.
middlewares []Middleware
// stop is set to true by Start once a SIGTERM or SIGINT has been received.
stop bool
}
// Middleware is a function that receives a context together with the next step
// of the chain and returns an error.
// NOTE(review): presumably an implementation calls the NextMiddleware to let
// processing continue — confirm against the worker implementation.
type Middleware func(context.Context, NextMiddleware) error
// NextMiddleware is the continuation handed to a Middleware; it takes a context
// and returns an error.
type NextMiddleware func(context.Context) error
// GetPoolStatus asks the broker for the queue status of every registered job
// type and returns the results in the order the job types are stored.
//
// The first error reported by the broker aborts the scan; in that case the
// returned slice is nil and the broker's error is passed through unchanged.
func (wp *Pool) GetPoolStatus() ([]broker.Status, error) {
	statuses := make([]broker.Status, len(wp.jobTypes))

	for i := range wp.jobTypes {
		status, err := wp.broker.GetQueueStatus(wp.jobTypes[i].Name)
		if err != nil {
			return nil, err
		}

		statuses[i] = status
	}

	return statuses, nil
}
// Start starts the workers and associated processes.
//
// It subscribes to SIGTERM and SIGINT, sorts the registered job types, creates
// and starts one worker for every slot in the pool, and then blocks until each
// worker has reported on its ended channel. When one of the signals arrives,
// job polling stops handing out work and every worker is sent a cancel message.
func (wp *Pool) Start() {
	signal.Notify(gracefulStop, syscall.SIGTERM, syscall.SIGINT)

	// stopping is closed when a shutdown signal arrives. getJob polls this
	// channel rather than reading wp.stop: getJob runs outside the signal
	// goroutine, so reading the plain bool there would be a data race, whereas
	// a channel close is properly synchronized with the receives that observe it.
	stopping := make(chan struct{})

	go func() {
		<-gracefulStop

		// wp.stop is still updated so the field keeps reflecting the pool state.
		wp.stop = true
		close(stopping)

		for _, w := range wp.workers {
			// Slots stay nil until the loop below has created their worker.
			if w != nil {
				w.cancel <- struct{}{}
			}
		}
	}()

	sort.Sort(wp.jobTypes)

	// getJob walks the job types in their sorted order and returns the first
	// message the broker has available, along with the job type it belongs to.
	// It returns all nils when shutdown has begun or no queue has a message.
	getJob := func() (*broker.Message, *JobType, error) {
		for _, jobType := range wp.jobTypes {
			select {
			case <-stopping:
				return nil, nil, nil
			default:
			}

			msg, err := wp.broker.GetMessage(jobType.Name)
			if err != nil {
				return nil, nil, err
			}

			if msg == nil {
				continue
			}

			return msg, &jobType, nil
		}

		return nil, nil, nil
	}

	for i := range wp.workers {
		wp.workers[i] = newWorker(wp.middlewares, getJob)
		wp.workers[i].start()
	}

	// Wait for the workers one by one, in slot order.
	for i, w := range wp.workers {
		<-w.ended
		log.Get().Info(fmt.Sprintf("Finish worker %d", i+1))
	}
}
/*
RegisterJob adds a job with handler for 'name' queue and allows you to specify options such as a
job's priority and its retry count.

The queue is registered on the broker under job.Name and the job is appended to
the pool's job types. RegisterJob has no error return, so a broker registration
failure is logged rather than silently discarded; the job is still added to the
pool in that case.
*/
func (wp *Pool) RegisterJob(job JobType) {
	if err := wp.broker.RegisterJob(job.Name); err != nil {
		// NOTE(review): Info is the only logger method this file demonstrates;
		// switch to an error-level call if the logger exposes one.
		log.Get().Info(fmt.Sprintf("worker workerpool: registering job %q: %v", job.Name, err))
	}

	wp.jobTypes = append(wp.jobTypes, job)
}
/*
NewWorkerPool builds a Pool backed by a broker created from url.

url is the connection string in the AMQP URI format; a blank url causes a panic.
concurrency is the number of worker slots allocated for the pool.
middlewares are stored on the pool in the order they are given.
*/
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Pool {
	if len(strings.TrimSpace(url)) == 0 {
		panic("worker workerpool: needs a non-empty url")
	}

	pool := new(Pool)
	pool.broker = broker.NewBroker(url)
	pool.middlewares = middlewares
	pool.workers = make([]*worker, concurrency)

	return pool
}