forked from yale8848/gorpool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgorpool.go
135 lines (118 loc) · 2.42 KB
/
gorpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Created by Yale 2018/1/8 14:47
package gorpool
import (
"sync"
)
// Job is a unit of work submitted to the pool: a function that takes no
// arguments and returns nothing.
type Job func()
// worker bundles the channels a single worker goroutine uses to announce
// that it is idle, to receive a job, and to be shut down.
type worker struct {
// workerPool is the shared channel on which the worker sends itself each
// time it is ready for another job.
workerPool chan *worker
// jobQueue is this worker's private channel; the dispatcher sends the job
// to run on it.
jobQueue chan Job
// stop carries the shutdown request to the worker and the worker's
// acknowledgement back to the sender.
stop chan struct{}
}
// Pool is a goroutine pool. Jobs submitted with AddJob are placed on the
// dispatcher's job queue and handed to worker goroutines one at a time.
type Pool struct {
	// dispatcher owns the job queue and the idle-worker channel.
	dispatcher *dispatcher
	// wg counts submitted jobs that have not finished yet; it is only used
	// while enableWaitForAll is true.
	wg sync.WaitGroup
	// enableWaitForAll selects whether AddJob tracks jobs in wg and whether
	// WaitForAll blocks on it.
	enableWaitForAll bool

	// workerNum is the maximum number of workers, jobNum is the job-queue
	// capacity given to NewPool, and workerCount is the number of workers
	// started so far.
	workerNum, jobNum, workerCount int
}
// dispatcher moves jobs from the shared job queue to idle workers and
// coordinates shutdown of those workers.
type dispatcher struct {
// workerPool is the channel from which idle workers are received.
workerPool chan *worker
// jobQueue is the queue that Pool.AddJob sends jobs into.
jobQueue chan Job
// stop receives the shutdown request from Pool.StopAll and is also used to
// send the acknowledgement back once every worker has been stopped.
stop chan struct{}
}
// newWorker returns a worker that will register itself on workerPool. Its
// job and stop channels are freshly created and unbuffered. The worker's
// goroutine is not started here; the caller runs start.
func newWorker(workerPool chan *worker) *worker {
	w := new(worker)
	w.workerPool = workerPool
	w.jobQueue = make(chan Job)
	w.stop = make(chan struct{})
	return w
}
// start is the worker's main loop and is meant to run in its own goroutine.
// On every iteration the worker first registers itself as idle on the shared
// worker pool, then waits for either a job or a stop request. A job is run
// synchronously in this goroutine; a stop request is acknowledged on the same
// stop channel and ends the loop.
func (w *worker) start() {
	for {
		// Check in as idle before waiting so the dispatcher can pick this worker.
		w.workerPool <- w

		select {
		case <-w.stop:
			// Acknowledge so the requester knows this goroutine is exiting.
			w.stop <- struct{}{}
			return
		case run := <-w.jobQueue:
			run()
		}
	}
}
// dispatch is the dispatcher's main loop and is meant to run in its own
// goroutine.
//
// For every job taken from the job queue it waits for an idle worker and
// forwards the job to that worker's private channel. While it is waiting for
// an idle worker it does not observe the stop channel.
//
// On a stop request it receives cap(workerPool) workers from the pool, one at
// a time, sends each a stop signal and waits for its acknowledgement, then
// acknowledges on its own stop channel and returns.
//
// NOTE: the shutdown loop blocks until exactly cap(workerPool) workers have
// checked in, so it never completes if fewer workers were ever started.
func (dis *dispatcher) dispatch() {
	for {
		select {
		case <-dis.stop:
			for remaining := cap(dis.workerPool); remaining > 0; remaining-- {
				idle := <-dis.workerPool
				idle.stop <- struct{}{}
				<-idle.stop
			}
			dis.stop <- struct{}{}
			return
		case next := <-dis.jobQueue:
			idle := <-dis.workerPool
			idle.jobQueue <- next
		}
	}
}
// newDispatcher returns a dispatcher that reads jobs from jobQueue and idle
// workers from workerPool. Its stop channel is freshly created and
// unbuffered. The dispatch loop is not started here.
func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
	d := new(dispatcher)
	d.workerPool = workerPool
	d.jobQueue = jobQueue
	d.stop = make(chan struct{})
	return d
}
// NewPool creates a Pool.
//
// workerNum is the maximum number of workers; each worker runs in one
// goroutine. It is also the capacity of the idle-worker channel.
//
// jobNum is the capacity of the job queue, i.e. how many jobs can be waiting
// before AddJob blocks.
//
// No goroutine is started here: call Start to run the dispatcher. Waiting
// for jobs via WaitForAll is disabled until EnableWaitForAll(true) is called.
func NewPool(workerNum, jobNum int) *Pool {
	idle := make(chan *worker, workerNum)
	queue := make(chan Job, jobNum)

	p := new(Pool)
	p.dispatcher = newDispatcher(idle, queue)
	p.enableWaitForAll = false
	p.workerNum = workerNum
	p.jobNum = jobNum
	return p
}
// AddJob submits job to the pool. It blocks while the job queue is full.
//
// When wait tracking is enabled (see EnableWaitForAll) the job is counted in
// the pool's WaitGroup before it is queued and released when it finishes.
// The flag is sampled once, at submission time, so toggling it while jobs are
// in flight cannot unbalance Add and Done.
//
// Workers are started lazily, never more than workerNum of them:
//   - the first worker is started unconditionally, because the dispatcher may
//     already have taken the job off the queue (and always has, for an
//     unbuffered queue), in which case the queue length alone would never
//     trigger a worker and the job would never run;
//   - further workers are started only while jobs are still waiting in the
//     queue after this one was submitted.
//
// AddJob reads and updates workerCount without synchronization, so it must
// not be called from several goroutines at the same time.
func (p *Pool) AddJob(job Job) {
	// Snapshot the flag so the wrapper below agrees with the Add decision.
	tracked := p.enableWaitForAll
	if tracked {
		p.wg.Add(1)
	}

	p.dispatcher.jobQueue <- func() {
		if tracked {
			defer p.wg.Done()
		}
		job()
	}

	if p.workerCount >= p.workerNum {
		return
	}
	noWorkerYet := p.workerCount == 0
	backlog := len(p.dispatcher.jobQueue) > 0
	if noWorkerYet || backlog {
		w := newWorker(p.dispatcher.workerPool)
		go w.start()
		p.workerCount++
	}
}
// WaitForAll blocks until every job that AddJob counted in the pool's
// WaitGroup has finished. It returns immediately when wait tracking is
// disabled (see EnableWaitForAll).
func (p *Pool) WaitForAll() {
	if !p.enableWaitForAll {
		return
	}
	p.wg.Wait()
}
// StopAll shuts down the dispatcher and every worker, and returns once the
// dispatcher has acknowledged that all of them have exited.
//
// The dispatcher's shutdown loop waits for cap(workerPool) workers to check
// in, but workers are started lazily by AddJob, so fewer may exist. To keep
// that loop from blocking forever, the missing workers are started here
// first; they check in as idle and are stopped straight away.
//
// After a successful stop no worker is running, so workerCount is reset to
// zero.
//
// The dispatcher must be running (see Start): otherwise the stop request is
// never received and StopAll blocks. Like AddJob, StopAll touches workerCount
// without synchronization and must not run concurrently with it.
func (p *Pool) StopAll() {
	d := p.dispatcher

	// Top the pool up to the number of workers the dispatcher will wait for.
	for p.workerCount < cap(d.workerPool) {
		w := newWorker(d.workerPool)
		go w.start()
		p.workerCount++
	}

	d.stop <- struct{}{}
	<-d.stop

	p.workerCount = 0
}
// EnableWaitForAll turns wait tracking on or off. The flag is read by AddJob,
// which counts jobs in the pool's WaitGroup only while it is true, and by
// WaitForAll, which blocks only while it is true. It returns p so that calls
// can be chained.
func (p *Pool) EnableWaitForAll(enable bool) *Pool {
p.enableWaitForAll = enable
return p
}
// Start launches the dispatcher loop in a new goroutine and returns p so that
// calls can be chained. Worker goroutines are not started here.
func (p *Pool) Start() *Pool {
	d := p.dispatcher
	go d.dispatch()
	return p
}