11package gopool
22
33import (
4- "sync"
5- "time"
4+ "sync"
5+ "time"
66)
77
88// task represents a function that will be executed by a worker.
@@ -11,151 +11,151 @@ type task func() (interface{}, error)
1111
1212// goPool represents a pool of workers.
1313type goPool struct {
14- workers []* worker
15- workerStack []int
16- maxWorkers int
17- // Set by WithMinWorkers(), used to adjust the number of workers. Default equals to maxWorkers.
18- minWorkers int
19- // tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million.
20- taskQueue chan task
21- // Set by WithRetryCount(), used to retry a task when it fails. Default is 0.
22- retryCount int
23- lock sync.Locker
24- cond * sync.Cond
25- // Set by WithTimeout(), used to set a timeout for a task. Default is 0, which means no timeout.
26- timeout time.Duration
27- // Set by WithResultCallback(), used to handle the result of a task. Default is nil.
28- resultCallback func (interface {})
29- // Set by WithErrorCallback(), used to handle the error of a task. Default is nil.
30- errorCallback func (error )
31- // adjustInterval is the interval to adjust the number of workers. Default is 1 second.
32- adjustInterval time.Duration
14+ workers []* worker
15+ workerStack []int
16+ maxWorkers int
17+ // Set by WithMinWorkers(), used to adjust the number of workers. Default equals to maxWorkers.
18+ minWorkers int
19+ // tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million.
20+ taskQueue chan task
21+ // Set by WithRetryCount(), used to retry a task when it fails. Default is 0.
22+ retryCount int
23+ lock sync.Locker
24+ cond * sync.Cond
25+ // Set by WithTimeout(), used to set a timeout for a task. Default is 0, which means no timeout.
26+ timeout time.Duration
27+ // Set by WithResultCallback(), used to handle the result of a task. Default is nil.
28+ resultCallback func (interface {})
29+ // Set by WithErrorCallback(), used to handle the error of a task. Default is nil.
30+ errorCallback func (error )
31+ // adjustInterval is the interval to adjust the number of workers. Default is 1 second.
32+ adjustInterval time.Duration
3333}
3434
3535// NewGoPool creates a new pool of workers.
3636func NewGoPool (maxWorkers int , opts ... Option ) * goPool {
37- pool := & goPool {
38- maxWorkers : maxWorkers ,
39- // Set minWorkers to maxWorkers by default
40- minWorkers : maxWorkers ,
41- workers : make ([]* worker , maxWorkers ),
42- workerStack : make ([]int , maxWorkers ),
43- taskQueue : make (chan task , 1e6 ),
44- retryCount : 0 ,
45- lock : new (sync.Mutex ),
46- timeout : 0 ,
47- adjustInterval : 1 * time .Second ,
48- }
49- // Apply options
50- for _ , opt := range opts {
51- opt (pool )
52- }
53- if pool .cond == nil {
54- pool .cond = sync .NewCond (pool .lock )
55- }
56- // Create workers with the minimum number. Don't use pushWorker() here.
57- for i := 0 ; i < pool .minWorkers ; i ++ {
58- worker := newWorker ()
59- pool .workers [i ] = worker
60- pool .workerStack [i ] = i
61- worker .start (pool , i )
62- }
63- go pool .adjustWorkers ()
64- go pool .dispatch ()
65- return pool
37+ pool := & goPool {
38+ maxWorkers : maxWorkers ,
39+ // Set minWorkers to maxWorkers by default
40+ minWorkers : maxWorkers ,
41+ workers : make ([]* worker , maxWorkers ),
42+ workerStack : make ([]int , maxWorkers ),
43+ taskQueue : make (chan task , 1e6 ),
44+ retryCount : 0 ,
45+ lock : new (sync.Mutex ),
46+ timeout : 0 ,
47+ adjustInterval : 1 * time .Second ,
48+ }
49+ // Apply options
50+ for _ , opt := range opts {
51+ opt (pool )
52+ }
53+ if pool .cond == nil {
54+ pool .cond = sync .NewCond (pool .lock )
55+ }
56+ // Create workers with the minimum number. Don't use pushWorker() here.
57+ for i := 0 ; i < pool .minWorkers ; i ++ {
58+ worker := newWorker ()
59+ pool .workers [i ] = worker
60+ pool .workerStack [i ] = i
61+ worker .start (pool , i )
62+ }
63+ go pool .adjustWorkers ()
64+ go pool .dispatch ()
65+ return pool
6666}
6767
6868// AddTask adds a task to the pool.
6969func (p * goPool ) AddTask (t task ) {
70- p .taskQueue <- t
70+ p .taskQueue <- t
7171}
7272
7373// Wait waits for all tasks to be dispatched.
7474func (p * goPool ) Wait () {
75- for len (p .taskQueue ) > 0 {
76- time .Sleep (100 * time .Millisecond )
77- }
75+ for len (p .taskQueue ) > 0 {
76+ time .Sleep (100 * time .Millisecond )
77+ }
7878}
7979
8080// Release stops all workers and releases resources.
81- func (p * goPool ) Release () {
82- close (p .taskQueue )
83- p .cond .L .Lock ()
84- for len (p .workerStack ) != p .minWorkers {
85- p .cond .Wait ()
86- }
87- p .cond .L .Unlock ()
88- for _ , worker := range p .workers {
89- close (worker .taskQueue )
90- }
91- p .workers = nil
92- p .workerStack = nil
81+ func (p * goPool ) Release () {
82+ close (p .taskQueue )
83+ p .cond .L .Lock ()
84+ for len (p .workerStack ) != p .minWorkers {
85+ p .cond .Wait ()
86+ }
87+ p .cond .L .Unlock ()
88+ for _ , worker := range p .workers {
89+ close (worker .taskQueue )
90+ }
91+ p .workers = nil
92+ p .workerStack = nil
9393}
9494
9595func (p * goPool ) popWorker () int {
96- p .lock .Lock ()
97- workerIndex := p .workerStack [len (p .workerStack )- 1 ]
98- p .workerStack = p .workerStack [:len (p .workerStack )- 1 ]
99- p .lock .Unlock ()
100- return workerIndex
96+ p .lock .Lock ()
97+ workerIndex := p .workerStack [len (p .workerStack )- 1 ]
98+ p .workerStack = p .workerStack [:len (p .workerStack )- 1 ]
99+ p .lock .Unlock ()
100+ return workerIndex
101101}
102102
103103func (p * goPool ) pushWorker (workerIndex int ) {
104- p .lock .Lock ()
105- p .workerStack = append (p .workerStack , workerIndex )
106- p .lock .Unlock ()
107- p .cond .Signal ()
104+ p .lock .Lock ()
105+ p .workerStack = append (p .workerStack , workerIndex )
106+ p .lock .Unlock ()
107+ p .cond .Signal ()
108108}
109109
110110// adjustWorkers adjusts the number of workers according to the number of tasks in the queue.
111111func (p * goPool ) adjustWorkers () {
112- ticker := time .NewTicker (p .adjustInterval )
113- defer ticker .Stop ()
112+ ticker := time .NewTicker (p .adjustInterval )
113+ defer ticker .Stop ()
114114
115- for range ticker .C {
116- p .cond .L .Lock ()
117- if len (p .taskQueue ) > len (p .workerStack )* 3 / 4 && len (p .workerStack ) < p .maxWorkers {
118- // Double the number of workers until it reaches the maximum
119- newWorkers := min (len (p .workerStack )* 2 , p .maxWorkers ) - len (p .workerStack )
120- for i := 0 ; i < newWorkers ; i ++ {
121- worker := newWorker ()
122- p .workers = append (p .workers , worker )
123- p .workerStack = append (p .workerStack , len (p .workers )- 1 )
124- worker .start (p , len (p .workers )- 1 )
125- }
126- } else if len (p .taskQueue ) == 0 && len (p .workerStack ) > p .minWorkers {
127- // Halve the number of workers until it reaches the minimum
128- removeWorkers := max ((len (p .workerStack )- p .minWorkers )/ 2 , p .minWorkers )
129- p .workers = p .workers [:len (p .workers )- removeWorkers ]
130- p .workerStack = p .workerStack [:len (p .workerStack )- removeWorkers ]
131- }
132- p .cond .L .Unlock ()
133- }
115+ for range ticker .C {
116+ p .cond .L .Lock ()
117+ if len (p .taskQueue ) > len (p .workerStack )* 3 / 4 && len (p .workerStack ) < p .maxWorkers {
118+ // Double the number of workers until it reaches the maximum
119+ newWorkers := min (len (p .workerStack )* 2 , p .maxWorkers ) - len (p .workerStack )
120+ for i := 0 ; i < newWorkers ; i ++ {
121+ worker := newWorker ()
122+ p .workers = append (p .workers , worker )
123+ p .workerStack = append (p .workerStack , len (p .workers )- 1 )
124+ worker .start (p , len (p .workers )- 1 )
125+ }
126+ } else if len (p .taskQueue ) == 0 && len (p .workerStack ) > p .minWorkers {
127+ // Halve the number of workers until it reaches the minimum
128+ removeWorkers := max ((len (p .workerStack )- p .minWorkers )/ 2 , p .minWorkers )
129+ p .workers = p .workers [:len (p .workers )- removeWorkers ]
130+ p .workerStack = p .workerStack [:len (p .workerStack )- removeWorkers ]
131+ }
132+ p .cond .L .Unlock ()
133+ }
134134}
135135
136136// dispatch dispatches tasks to workers.
137137func (p * goPool ) dispatch () {
138- for t := range p .taskQueue {
139- p .cond .L .Lock ()
140- for len (p .workerStack ) == 0 {
141- p .cond .Wait ()
142- }
143- p .cond .L .Unlock ()
144- workerIndex := p .popWorker ()
145- p .workers [workerIndex ].taskQueue <- t
146- }
138+ for t := range p .taskQueue {
139+ p .cond .L .Lock ()
140+ for len (p .workerStack ) == 0 {
141+ p .cond .Wait ()
142+ }
143+ p .cond .L .Unlock ()
144+ workerIndex := p .popWorker ()
145+ p .workers [workerIndex ].taskQueue <- t
146+ }
147147}
148148
149149func min (a , b int ) int {
150- if a < b {
151- return a
152- }
153- return b
150+ if a < b {
151+ return a
152+ }
153+ return b
154154}
155155
156156func max (a , b int ) int {
157- if a > b {
158- return a
159- }
160- return b
157+ if a > b {
158+ return a
159+ }
160+ return b
161161}
0 commit comments