Skip to content

Commit

Permalink
Added Wait() (#16)
Browse files Browse the repository at this point in the history
Added Wait()
This function waits till all the jobs complete but doesn't cleanup
the associated resources.
  • Loading branch information
dpaks committed Dec 25, 2020
1 parent 7f501ce commit d438b75
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 6 deletions.
43 changes: 37 additions & 6 deletions goworkers.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,41 @@ func (gw *GoWorkers) SubmitCheckResult(job func() (interface{}, error)) {
}
}

// Stop gracefully waits for jobs to finish running.
// Wait waits for the jobs to finish running.
//
// This is a blocking call and returns when all the active and queued jobs are finished.
// If wait is true, Stop() waits until the result and error channels are emptied.
// Setting wait to true ensures that you can read all values from result and error channels before your
// parent program exits.
// If 'wait' argument is set true, Wait() waits until the result and the error channels are emptied.
// Setting 'wait' argument to true ensures that you can read all the values from the result and
// the error channels before this function unblocks.
// Jobs cannot be submitted until this function returns. If any, will be discarded.
func (gw *GoWorkers) Wait(wait bool) {
if !atomic.CompareAndSwapInt32(&gw.stopping, 0, 1) {
return
}

for {
if gw.JobNum() == 0 {
break
}
}

if wait {
for {
if len(gw.ResultChan)|len(gw.ErrChan) == 0 {
break
}
}
}

atomic.StoreInt32(&gw.stopping, 0)
}

// Stop gracefully waits for the jobs to finish running and releases the associated resources.
//
// This is a blocking call and returns when all the active and queued jobs are finished.
// If wait is true, Stop() waits until the result and the error channels are emptied.
// Setting wait to true ensures that you can read all the values from the result and the
// error channels before your parent program exits.
func (gw *GoWorkers) Stop(wait bool) {
if !atomic.CompareAndSwapInt32(&gw.stopping, 0, 1) {
return
Expand All @@ -167,8 +196,10 @@ func (gw *GoWorkers) Stop(wait bool) {
}

if wait {
for len(gw.ResultChan)|len(gw.ErrChan) == 0 {
break
for {
if len(gw.ResultChan)|len(gw.ErrChan) == 0 {
break
}
}
}

Expand Down
74 changes: 74 additions & 0 deletions goworkers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,63 @@ func TestStopAfterDelay(t *testing.T) {
gw.Stop(false)
}

func TestWait(t *testing.T) {
gw := New()

fn := func() {
time.Sleep(1 * time.Second)
}

for i := 0; i < 100; i++ {
gw.Submit(func() {
fn()
})
}

if gw.JobNum() == 0 {
t.Errorf("Number of jobs must be greater than 0")
}

gw.Wait(false)

if gw.JobNum() != 0 {
t.Errorf("Number of jobs should be 0. Got %d", gw.JobNum())
}

for i := 0; i < 100; i++ {
gw.Submit(func() {
fn()
})
}

if gw.JobNum() == 0 {
t.Errorf("Number of jobs must be greater than 0")
}

gw.Wait(true)

if gw.JobNum() != 0 {
t.Errorf("Number of jobs should be 0. Got %d", gw.JobNum())
}

gw.Stop(false)
}

func TestWaitAfterWait(t *testing.T) {
gw := New()
defer gw.Stop(false)

fn := func(i int) {
}

gw.Submit(func() {
fn(1)
})

go gw.Wait(false)
gw.Wait(false)
}

func TestSubmitCheckErrorAfterStop(t *testing.T) {
gw := New()

Expand Down Expand Up @@ -802,3 +859,20 @@ func ExampleGoWorkers_SubmitCheckResult() {

gw.Stop(true)
}

func ExampleGoWorkers_Wait() {
gw := New()
defer gw.Stop(false)

gw.Submit(func() {
fmt.Println("Hello, how are you?")
})

gw.Wait(false)

gw.Submit(func() {
fmt.Println("I'm good, thank you!")
})

gw.Wait(false)
}

0 comments on commit d438b75

Please sign in to comment.