Skip to content

Commit

Permalink
Added a wait flag in Stop(wait bool) (#15)
Browse files Browse the repository at this point in the history
* Added a wait flag in Stop(wait bool)
Stop(true) blocks until all the output channels are read from
* Enhanced examples
* start just 1 worker by default to conform to the user input for max workers in case they provide 1
  • Loading branch information
dpaks committed Nov 9, 2020
1 parent 5899163 commit 7f501ce
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 43 deletions.
32 changes: 24 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func main() {
})

// wait till your job finishes
gw.Stop()
gw.Stop(false)
}
```

Expand Down Expand Up @@ -81,7 +81,7 @@ func main() {
}
log.Println("Submitted!")

gw.Stop()
gw.Stop(false)
}
```

Expand Down Expand Up @@ -114,7 +114,7 @@ func main() {
}
log.Println("Submitted!")

gw.Stop()
gw.Stop(false)
}
```

Expand Down Expand Up @@ -144,7 +144,7 @@ func main() {
})
}

gw.Stop()
gw.Stop(false)

tEnd := time.Now()
tDiff := tEnd.Sub(tStart)
Expand Down Expand Up @@ -195,7 +195,10 @@ func main() {
log.Println("Submitted!")

// Wait for jobs to finish
gw.Stop()
// Here, wait flag is set to true. Setting wait to true ensures that
// the output channels are read from completely.
// Stop(true) exits only when the error channel is completely read from.
gw.Stop(true)
}
```

Expand Down Expand Up @@ -226,11 +229,21 @@ func main() {
for {
select {
// Error channel provides errors from job, if any
case err := <-gw.ErrChan:
case err, ok := <-gw.ErrChan:
// The error channel is closed when the workers are done with their tasks.
// When the channel is closed, ok is set to false
if !ok {
return
}
fmt.Printf("Error: %s\n", err.Error())
// Result channel provides output from job, if any
// It will be of type interface{}
case res := <-gw.ResultChan:
case res, ok := <-gw.ResultChan:
// The result channel is closed when the workers are done with their tasks.
// When the channel is closed, ok is set to false
if !ok {
return
}
fmt.Printf("Type: %T, Value: %+v\n", res, res)
}
}
Expand Down Expand Up @@ -258,7 +271,10 @@ func main() {
log.Println("Submitted!")

// Wait for jobs to finish
gw.Stop()
// Here, wait flag is set to true. Setting wait to true ensures that
// the output channels are read from completely.
// Stop(true) exits only when both the result and the error channels are completely read from.
gw.Stop(true)
}
```

Expand Down
15 changes: 12 additions & 3 deletions goworkers.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,23 @@ func (gw *GoWorkers) SubmitCheckResult(job func() (interface{}, error)) {
// Stop gracefully waits for jobs to finish running.
//
// This is a blocking call and returns when all the active and queued jobs are finished.
func (gw *GoWorkers) Stop() {
// If wait is true, Stop() waits until the result and error channels are emptied.
// Setting wait to true ensures that you can read all values from result and error channels before your
// parent program exits.
func (gw *GoWorkers) Stop(wait bool) {
if !atomic.CompareAndSwapInt32(&gw.stopping, 0, 1) {
return
}
if gw.JobNum() != 0 {
<-gw.done
}

if wait {
for len(gw.ResultChan)|len(gw.ErrChan) == 0 {
break
}
}

// close the input channel
close(gw.jobQ)
}
Expand All @@ -184,8 +194,7 @@ func (gw *GoWorkers) start() {
close(gw.ResultChan)
}()

// start 2 workers in advance
go gw.startWorker()
// start a worker in advance
go gw.startWorker()

go func() {
Expand Down
Loading

0 comments on commit 7f501ce

Please sign in to comment.