Skip to content

Commit 3280388

Browse files
committed
add test for job builders
1 parent 61eb3a0 commit 3280388

File tree

3 files changed

+195
-49
lines changed

3 files changed

+195
-49
lines changed

README.md

Lines changed: 91 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,34 @@ Create a pool and start running a job.
1414
package main
1515

1616
import (
17-
"log"
18-
"context"
19-
"time"
17+
"context"
18+
"log"
19+
"os"
20+
"os/signal"
2021

21-
"github.com/hmoragrega/workers"
22+
"github.com/hmoragrega/workers"
2223
)
2324

2425
func main() {
25-
var pool Pool
26+
ctx, cancel := context.WithCancel(context.Background())
27+
stop := make(chan os.Signal, 1)
28+
signal.Notify(stop, os.Interrupt)
2629

27-
job := workers.JobFunc(func(ctx context.Context) error {
28-
// my job code
29-
return nil
30-
})
30+
go func() {
31+
<-stop
32+
cancel()
33+
}()
3134

32-
if err := pool.Start(job); err != nil {
33-
log.Fatal("cannot start pool", err)
34-
}
35+
job := workers.JobFunc(func(ctx context.Context) error {
36+
// my job code
37+
return nil
38+
})
3539

36-
defer func() {
37-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
38-
defer cancel()
40+
var pool workers.Pool
3941

40-
if err := pool.Close(ctx); err != nil {
41-
log.Fatal("cannot close pool", err)
42-
}
43-
}()
42+
if err := pool.Run(ctx, job); err != nil {
43+
log.Fatal("job pool failed", err)
44+
}
4445
}
4546
```
4647

@@ -55,34 +56,54 @@ behaviour
5556

5657
```go
5758
type Config struct {
58-
// Min indicates the minimum number of workers that can run concurrently.
59-
// By default the pool can have 0 workers, pausing it effectively.
60-
Min int
59+
// Min indicates the minimum number of workers that can run concurrently.
60+
// By default the pool can have 0 workers, pausing it effectively.
61+
Min int
6162

62-
// Max indicates the maximum number of workers that can run concurrently.
63-
// the default "0" indicates an infinite number of workers.
64-
Max int
63+
// Max indicates the maximum number of workers that can run concurrently.
64+
// the default "0" indicates an infinite number of workers.
65+
Max int
6566

66-
// Initial indicates the initial number of workers that should be running.
67-
// The default value will be the greater number between 1 or the given minimum.
68-
Initial int
67+
// Initial indicates the initial number of workers that should be running.
68+
// The default value will be the greater number between 1 or the given minimum.
69+
Initial int
6970
}
7071
```
7172

7273
To have a pool with a tweaked config you can call `NewWithConfig`
7374
```go
7475
pool := workers.Must(NewWithConfig(workers.Config{
75-
Min: 3,
76-
Max: 10,
77-
Initial: 5,
76+
Min: 3,
77+
Max: 10,
78+
Initial: 5,
7879
}))
7980
```
8081

82+
#### Running non-stop
83+
If you want to keep the pool running in a blocking call you
84+
can call `Run` or `RunWithBuilder`.
85+
86+
In this mode the pool will run until the given context is
87+
terminated.
88+
89+
The pool then will be closed without a timeout.
90+
91+
```go
92+
var pool workers.Pool
93+
94+
if err := pool.Run(job); err != nil {
95+
log.Println("pool the pool", err)
96+
}
97+
```
98+
99+
Alternatively you can start and stop the pool to have
100+
a fine-grained control of the pool live-cycle.
101+
81102
#### Starting the pool
82-
To start the pool give it a job to run:
103+
To start the pool give it a job (or a [job builder](#builder-jobs)) to run:
83104
```go
84105
if err := pool.Start(job); err != nil {
85-
log.Println("cannot start the pool", err)
106+
log.Println("cannot start the pool", err)
86107
}
87108
```
88109
The operation will fail if:
@@ -96,7 +117,7 @@ given context is cancelled.
96117

97118
```go
98119
if err := pool.Close(ctx); err != nil {
99-
log.Println("cannot start the pool", err)
120+
log.Println("cannot start the pool", err)
100121
}
101122
```
102123

@@ -114,7 +135,7 @@ Alternative `CloseWithTimeout` can be used passing a
114135
To add a new worker to the pool you can call
115136
```go
116137
if err := pool.More(); err != nil {
117-
log.Println("cannot add more workers", err)
138+
log.Println("cannot add more workers", err)
118139
}
119140
```
120141
The operation will fail if:
@@ -126,7 +147,7 @@ The operation will fail if:
126147
To remove a worker you can use `Less`.
127148
```go
128149
if err := pool.Less(); err != nil {
129-
log.Println("cannot remove more workers", err)
150+
log.Println("cannot remove more workers", err)
130151
}
131152
```
132153
Less will remove a worker from the pool, immediately reducing
@@ -169,11 +190,13 @@ To extend the functionality of jobs you can use builders, middlewares
169190
or wrappers.
170191

171192
#### Builder Jobs
172-
Some time you want to share data across job execution within the same worker.
173-
For example, you could share a buffer of pre-allocated memory.
193+
Sometimes you want to share data across job execution within the same worker.
194+
195+
For this you can use a job builder to indicate that every worker that
196+
joins the pool will have its own job.
174197

175-
In this case you can use a job builder to indicate that every worker that
176-
joins the pool will have its own job.
198+
**NOTE:** in this case, the jobs are not going to be running concurrently,
199+
making much easier to avoid data races.
177200

178201
```go
179202
// JobBuilder is a job that needs to be built during
@@ -183,7 +206,31 @@ type JobBuilder interface {
183206
New() Job
184207
}
185208
```
186-
In this case you will have to call `StartWithBuilder` method to start the pool.
209+
You will have to call `StartWithBuilder` method to start the pool with a job builder.
210+
```go
211+
var (
212+
numberOfWorkers = 3
213+
workSlots = make([]int, numberOfWorkers)
214+
workerID int32
215+
)
216+
217+
builder := JobBuilderFunc(func() Job {
218+
workerID := atomic.AddInt32(&workerID, 1)
219+
220+
var count int
221+
return JobFunc(func(ctx context.Context) error {
222+
// count is not shared across worker goroutines
223+
// no need to protect it against data races
224+
count++
225+
// same for the slice, since each worker
226+
// updated only its own index.
227+
workSlots[workerID-1] = count
228+
return nil
229+
})
230+
})
231+
232+
p.StartWithBuilder(builder)
233+
```
187234

188235
#### Job Middleware
189236
A middleware allows to extend the job capabilities
@@ -239,7 +286,7 @@ pool.Start(workers.JobFunc(func(ctx context.Context) error {
239286
If you need to add a middleware before starting the job instead of on pool creating
240287
there's the little handy function `Wrap` that will easy applying them for you.
241288

242-
```
289+
```go
243290
// Wrap is a helper to apply a chain of middleware to a job.
244291
func Wrap(job Job, middlewares ...Middleware) Job
245292
```
@@ -249,7 +296,7 @@ var pool Pool
249296

250297
job := workers.JobFunc(func(ctx context.Context) (err error) {
251298
// work
252-
return err
299+
return err
253300
}
254301

255302
pool.Start(workers.Wrap(job, jobLogger("my-job")))
@@ -266,7 +313,7 @@ having to return error from you job function.
266313

267314
```go
268315
job := func(ctx context.Context) {
269-
// work
316+
// work
270317
}
271318

272319
var pool workers.Pool

pool.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ type JobBuilder interface {
7070
New() Job
7171
}
7272

73-
// JobBuilderFunc is a type of job that shares
74-
// requires an initialization on each worker.
73+
// JobBuilderFunc is a helper to use function as
74+
// job builders.
7575
type JobBuilderFunc func() Job
7676

77-
// New builds the new job.
77+
// New builds the new job using the underlying function.
7878
func (f JobBuilderFunc) New() Job {
7979
return f()
8080
}
@@ -213,8 +213,8 @@ type Pool struct {
213213
mx sync.RWMutex
214214
}
215215

216-
// StartBuilder launches the workers and keeps them running until the pool is closed.
217-
func (p *Pool) StartBuilder(jobBuilder JobBuilder) error {
216+
// StartWithBuilder launches the workers and keeps them running until the pool is closed.
217+
func (p *Pool) StartWithBuilder(jobBuilder JobBuilder) error {
218218
return p.start(jobBuilder)
219219
}
220220

@@ -225,6 +225,31 @@ func (p *Pool) Start(job Job) error {
225225
}))
226226
}
227227

228+
// Run is a blocking call that will start the pool and keep it
229+
// running until the context is terminated.
230+
//
231+
// The pool then will be stopped without a timeout
232+
func (p *Pool) Run(ctx context.Context, job Job) error {
233+
return p.RunWithBuilder(ctx, JobBuilderFunc(func() Job {
234+
return job
235+
}))
236+
}
237+
238+
// RunWithBuilder is a blocking call that will start the
239+
// pool with a job builder and keep it running until the
240+
// context is terminated.
241+
//
242+
// The pool then will be stopped without a timeout
243+
func (p *Pool) RunWithBuilder(ctx context.Context, jobBuilder JobBuilder) error {
244+
if err := p.StartWithBuilder(jobBuilder); err != nil {
245+
return err
246+
}
247+
248+
<-ctx.Done()
249+
250+
return p.Close(context.Background())
251+
}
252+
228253
func (p *Pool) start(jobBuilder JobBuilder) error {
229254
p.mx.Lock()
230255
defer p.mx.Unlock()

pool_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,80 @@ func TestPool_StartErrors(t *testing.T) {
149149
})
150150
}
151151

152+
func TestPool_StartWithBuilder(t *testing.T) {
153+
numWorkers := 3
154+
155+
p := Must(NewWithConfig(Config{
156+
Initial: numWorkers,
157+
}))
158+
159+
var builds int32
160+
var buildsWG sync.WaitGroup
161+
buildsWG.Add(numWorkers)
162+
163+
counters := make([]int, numWorkers)
164+
var countersWG sync.WaitGroup
165+
countersWG.Add(numWorkers)
166+
167+
builder := JobBuilderFunc(func() Job {
168+
workerID := atomic.AddInt32(&builds, 1)
169+
170+
var count int
171+
buildsWG.Done()
172+
return JobFunc(func(ctx context.Context) error {
173+
// count is not shared across worker goroutines
174+
// no need to protect it against data races
175+
count++
176+
// same for the slice, since each worker
177+
// updated only its own index.
178+
counters[workerID-1] = count
179+
180+
if count == 10 {
181+
countersWG.Done()
182+
<-ctx.Done()
183+
return ctx.Err()
184+
}
185+
return nil
186+
})
187+
})
188+
189+
if err := p.StartWithBuilder(builder); err != nil {
190+
t.Fatalf("unexpected error starting poole. got %+v", err)
191+
}
192+
193+
buildsWG.Wait()
194+
if int(builds) != numWorkers {
195+
t.Fatalf("unexpected nnumber of job builds, got %d, want: %d", builds, numWorkers)
196+
}
197+
198+
countersWG.Wait()
199+
if err := p.CloseWithTimeout(time.Second); err != nil {
200+
t.Fatal("cannot stop pool", err)
201+
}
202+
203+
for i, c := range counters {
204+
if c != 10 {
205+
t.Fatalf("unexpected counter for worker %d: got %d, want 10", i+1, c)
206+
}
207+
}
208+
}
209+
210+
func TestPool_Run(t *testing.T) {
211+
var pool Pool
212+
213+
ctx, cancel := context.WithCancel(context.Background())
214+
215+
err := pool.Run(ctx, JobFunc(func(ctx context.Context) error {
216+
cancel()
217+
<-ctx.Done()
218+
return nil
219+
}))
220+
221+
if err != nil {
222+
t.Fatal("unexpected error running the pool", err)
223+
}
224+
}
225+
152226
func TestPool_More(t *testing.T) {
153227
testCases := []struct {
154228
name string

0 commit comments

Comments
 (0)