
Commit c219d43

Improve worker pool scaling logic and update README
- Update the worker pool scaling logic to consider the size of the workerStack and the length of the taskQueue.
- Adjust the scaling logic to double the workerStack when the taskQueue length exceeds 75% of the workerStack size, but not exceeding maxWorkers.
- Adjust the scaling logic to halve the workerStack when the taskQueue is empty, but not less than minWorkers.
- Update the README to reflect these changes and improve the feature list format.
- Add comments to the code for better readability.

Signed-off-by: Daniel Hu <[email protected]>
1 parent 4974182 commit c219d43

File tree

README.md
gopool.go
worker.go

3 files changed (+35, -15 lines)


README.md

Lines changed: 10 additions & 10 deletions
@@ -6,25 +6,25 @@ GoPool is a high-performance, feature-rich, and easy-to-use worker pool library

 ## Features

-- **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue.
+- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue.

-- **Dynamic Worker Adjustment**: GoPool can dynamically adjust the number of workers based on the number of tasks and system load.
+- [x] **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.

-- **Graceful Shutdown**: GoPool can shut down gracefully. It stops accepting new tasks and waits for all ongoing tasks to complete before shutting down when there are no more tasks or a shutdown signal is received.
+- [x] **Dynamic Worker Adjustment**: GoPool can dynamically adjust the number of workers based on the number of tasks and system load.

-- **Task Error Handling**: GoPool can handle errors that occur during task execution.
+- [x] **Graceful Shutdown**: GoPool can shut down gracefully. It stops accepting new tasks and waits for all ongoing tasks to complete before shutting down when there are no more tasks or a shutdown signal is received.

-- **Task Timeout Handling**: GoPool can handle task execution timeouts. If a task is not completed within the specified timeout period, the task is considered failed and a timeout error is returned.
+- [x] **Task Error Handling**: GoPool can handle errors that occur during task execution.

-- **Task Priority**: GoPool supports task priority. Tasks with higher priority are processed first.
+- [x] **Task Timeout Handling**: GoPool can handle task execution timeouts. If a task is not completed within the specified timeout period, the task is considered failed and a timeout error is returned.

-- **Task Result Retrieval**: GoPool provides a way to retrieve task results.
+- [x] **Task Result Retrieval**: GoPool provides a way to retrieve task results.

-- **Task Retry**: GoPool provides a retry mechanism for failed tasks.
+- [x] **Task Retry**: GoPool provides a retry mechanism for failed tasks.

-- **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.
+- [x] **Lock Customization**: GoPool supports different types of locks. You can use the built-in `sync.Mutex` or a custom lock such as `spinlock.SpinLock`.

-- **Lock Customization**: GoPool supports different types of locks. You can use the built-in `sync.Mutex` or a custom lock such as `spinlock.SpinLock`.
+- [ ] **Task Priority**: GoPool supports task priority. Tasks with higher priority are processed first.

 ## Installation
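The checked features above map onto a small public API. Below is a minimal usage sketch; it assumes the option constructors referenced in the gopool.go comments further down (WithMinWorkers, WithTimeout, WithRetryCount, WithErrorCallback) and an assumed import path, so exact names and signatures may differ from the released package.

package main

import (
    "errors"
    "fmt"
    "time"

    "github.com/devchat-ai/gopool" // assumed import path
)

func main() {
    // A pool that can grow to 100 workers and shrink back to 10 when idle.
    pool := gopool.NewGoPool(100,
        gopool.WithMinWorkers(10),
        gopool.WithTimeout(1*time.Second),
        gopool.WithRetryCount(2),
        gopool.WithErrorCallback(func(err error) {
            fmt.Println("task failed:", err)
        }),
    )
    defer pool.Release() // graceful shutdown: waits for queued tasks to finish

    // Tasks have the signature func() (interface{}, error), as declared in gopool.go.
    for i := 0; i < 1000; i++ {
        i := i // capture the loop variable (needed before Go 1.22)
        pool.AddTask(func() (interface{}, error) {
            if i%100 == 0 {
                return nil, errors.New("simulated failure")
            }
            return i * i, nil
        })
    }
}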

gopool.go

Lines changed: 17 additions & 5 deletions
@@ -12,24 +12,32 @@ type task func() (interface{}, error)

 // goPool represents a pool of workers.
 type goPool struct {
     workers []*worker
+    workerStack []int
     maxWorkers int
+    // Set by WithMinWorkers(), used to adjust the number of workers. Defaults to maxWorkers.
     minWorkers int
-    workerStack []int
+    // Tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million.
     taskQueue chan task
+    // Set by WithRetryCount(), used to retry a task when it fails. Default is 0.
     retryCount int
     lock sync.Locker
     cond *sync.Cond
+    // Set by WithTimeout(), used to set a timeout for a task. Default is 0, which means no timeout.
     timeout time.Duration
+    // Set by WithResultCallback(), used to handle the result of a task. Default is nil.
     resultCallback func(interface{})
+    // Set by WithErrorCallback(), used to handle the error of a task. Default is nil.
     errorCallback func(error)
+    // adjustInterval is the interval to adjust the number of workers. Default is 1 second.
     adjustInterval time.Duration
 }

 // NewGoPool creates a new pool of workers.
 func NewGoPool(maxWorkers int, opts ...Option) *goPool {
     pool := &goPool{
         maxWorkers: maxWorkers,
-        minWorkers: maxWorkers, // Set minWorkers to maxWorkers by default
+        // Set minWorkers to maxWorkers by default
+        minWorkers: maxWorkers,
         workers: make([]*worker, maxWorkers),
         workerStack: make([]int, maxWorkers),
         taskQueue: make(chan task, 1e6),

@@ -38,12 +46,14 @@ func NewGoPool(maxWorkers int, opts ...Option) *goPool {
         timeout: 0,
         adjustInterval: 1 * time.Second,
     }
+    // Apply options
     for _, opt := range opts {
         opt(pool)
     }
     if pool.cond == nil {
         pool.cond = sync.NewCond(pool.lock)
     }
+    // Create workers with the minimum number. Don't use pushWorker() here.
     for i := 0; i < pool.minWorkers; i++ {
         worker := newWorker()
         pool.workers[i] = worker

@@ -61,7 +71,7 @@ func (p *goPool) AddTask(t task) {
 }

 // Release stops all workers and releases resources.
-func (p *goPool) Release() {
+func (p *goPool) Release() {
     close(p.taskQueue)
     p.cond.L.Lock()
     for len(p.workerStack) != p.minWorkers {

@@ -90,13 +100,14 @@ func (p *goPool) pushWorker(workerIndex int) {
     p.cond.Signal()
 }

+// adjustWorkers adjusts the number of workers according to the number of tasks in the queue.
 func (p *goPool) adjustWorkers() {
     ticker := time.NewTicker(p.adjustInterval)
     defer ticker.Stop()

     for range ticker.C {
         p.cond.L.Lock()
-        if len(p.taskQueue) > (p.maxWorkers-p.minWorkers)/2+p.minWorkers && len(p.workerStack) < p.maxWorkers {
+        if len(p.taskQueue) > len(p.workerStack)*3/4 && len(p.workerStack) < p.maxWorkers {
             // Double the number of workers until it reaches the maximum
             newWorkers := min(len(p.workerStack)*2, p.maxWorkers) - len(p.workerStack)
             for i := 0; i < newWorkers; i++ {

@@ -105,7 +116,7 @@ func (p *goPool) adjustWorkers() {
                 p.workerStack = append(p.workerStack, len(p.workers)-1)
                 worker.start(p, len(p.workers)-1)
             }
-        } else if len(p.taskQueue) < p.minWorkers && len(p.workerStack) > p.minWorkers {
+        } else if len(p.taskQueue) == 0 && len(p.workerStack) > p.minWorkers {
             // Halve the number of workers until it reaches the minimum
             removeWorkers := max((len(p.workerStack)-p.minWorkers)/2, p.minWorkers)
             p.workers = p.workers[:len(p.workers)-removeWorkers]

@@ -115,6 +126,7 @@ func (p *goPool) adjustWorkers() {
     }
 }

+// dispatch dispatches tasks to workers.
 func (p *goPool) dispatch() {
     for t := range p.taskQueue {
         p.cond.L.Lock()
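To make the new policy concrete, here is a small, self-contained simulation of the doubling/halving rule, reduced to plain integers as described in the commit message (double when queued tasks exceed 75% of the current worker count, capped at maxWorkers; halve when the queue is empty, floored at minWorkers). It is an illustrative sketch of the arithmetic only; the real adjustWorkers also starts and stops workers and holds the pool lock while resizing.

package main

import "fmt"

// scale applies the scaling policy introduced in this commit to a plain
// worker count, given the current queue depth and the configured bounds.
func scale(workers, queued, minWorkers, maxWorkers int) int {
    if queued > workers*3/4 && workers < maxWorkers {
        // Busy: double the workers, capped at maxWorkers.
        workers *= 2
        if workers > maxWorkers {
            workers = maxWorkers
        }
    } else if queued == 0 && workers > minWorkers {
        // Idle: halve the workers, floored at minWorkers.
        workers /= 2
        if workers < minWorkers {
            workers = minWorkers
        }
    }
    return workers
}

func main() {
    // Bounds of 10..100 workers; watch the count react to queue depth each tick.
    workers := 10
    for _, queued := range []int{5, 9, 40, 200, 0, 0, 0, 0} {
        workers = scale(workers, queued, 10, 100)
        fmt.Printf("queued=%3d -> workers=%d\n", queued, workers)
    }
}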

worker.go

Lines changed: 8 additions & 0 deletions
@@ -16,6 +16,9 @@ func newWorker() *worker {
     }
 }

+// start starts the worker in a separate goroutine.
+// The worker will run tasks from its taskQueue until the taskQueue is closed.
+// Because the length of the taskQueue is 1, the worker is pushed back to the pool after executing one task.
 func (w *worker) start(pool *goPool, workerIndex int) {
     go func() {
         for t := range w.taskQueue {

@@ -28,6 +31,8 @@ func (w *worker) start(pool *goPool, workerIndex int) {
     }()
 }

+// executeTask executes a task and returns the result and error.
+// If the task fails, it will be retried according to the retryCount of the pool.
 func (w *worker) executeTask(t task, pool *goPool) (result interface{}, err error) {
     for i := 0; i <= pool.retryCount; i++ {
         if pool.timeout > 0 {

@@ -42,6 +47,7 @@ func (w *worker) executeTask(t task, pool *goPool) (result interface{}, err error) {
     return
 }

+// executeTaskWithTimeout executes a task with a timeout and returns the result and error.
 func (w *worker) executeTaskWithTimeout(t task, pool *goPool) (result interface{}, err error) {
     // Create a context with timeout
     ctx, cancel := context.WithTimeout(context.Background(), pool.timeout)

@@ -72,6 +78,8 @@ func (w *worker) executeTaskWithoutTimeout(t task, pool *goPool) (result interface{}, err error) {
     return t()
 }

+
+// handleResult handles the result of a task.
 func (w *worker) handleResult(result interface{}, err error, pool *goPool) {
     if err != nil && pool.errorCallback != nil {
         pool.errorCallback(err)
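For readers unfamiliar with the pattern the new comments describe, here is a standalone sketch of retrying a task under a per-attempt timeout using context.WithTimeout. It illustrates the technique only, not the worker.go implementation; the names runWithTimeout and runWithRetry are made up for this example.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type task func() (interface{}, error)

// runWithTimeout runs t in a goroutine and waits for either its result or the deadline.
// Note: on timeout the goroutine keeps running in the background; t cannot be cancelled mid-flight.
func runWithTimeout(t task, timeout time.Duration) (interface{}, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    type outcome struct {
        result interface{}
        err    error
    }
    done := make(chan outcome, 1)
    go func() {
        r, err := t()
        done <- outcome{r, err}
    }()

    select {
    case o := <-done:
        return o.result, o.err
    case <-ctx.Done():
        return nil, ctx.Err() // context.DeadlineExceeded
    }
}

// runWithRetry retries t up to retryCount additional times when it fails or times out.
func runWithRetry(t task, retryCount int, timeout time.Duration) (result interface{}, err error) {
    for i := 0; i <= retryCount; i++ {
        result, err = runWithTimeout(t, timeout)
        if err == nil {
            return result, nil
        }
    }
    return result, err
}

func main() {
    slow := func() (interface{}, error) {
        time.Sleep(200 * time.Millisecond)
        return "done", nil
    }
    if _, err := runWithRetry(slow, 2, 50*time.Millisecond); errors.Is(err, context.DeadlineExceeded) {
        fmt.Println("task timed out after retries:", err)
    }
}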
