Skip to content

Commit

Permalink
Backoff on get message and reconnect improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
guilhermehubner committed Mar 2, 2018
1 parent 7aedab8 commit 9400ebb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
21 changes: 13 additions & 8 deletions broker/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,19 @@ func (b *AMQPBroker) GetQueueStatus(name string) (Status, error) {
}, nil
}

func (b *AMQPBroker) GetMessage(jobName string) *Message {
func (b *AMQPBroker) GetMessage(jobName string) (*Message, error) {
msg, ok, err := b.channel.Get(jobName, true)
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to get message: %v", err))
return nil
return nil, err
}
if !ok {
return nil
return nil, nil
}

return &Message{
message: msg,
broker: b,
}
}, nil
}

func (b *AMQPBroker) Enqueue(name, messageID string, message proto.Message) error {
Expand Down Expand Up @@ -234,19 +233,25 @@ func (b *AMQPBroker) connect() {
b.closed = b.connection.NotifyClose(make(chan *amqp.Error))
b.backoff.Reset()
b.connected = true
log.Get().Info("\x1b[1;32mCONNECTED\x1b[0m")
break
}

log.Get().Info("\x1b[1;32mCONNECTED\x1b[0m")
}

func (b *AMQPBroker) reconnect() {
for e := range b.closed {
e, ok := <-b.closed
if e != nil || !ok {
log.Get().Info("\x1b[1;31mCONNECTION CLOSED\x1b[0m")
log.Get().Error(e)
if e != nil {
log.Get().Error(e)
}

b.connected = false
b.connect()
}

go b.reconnect()
}

func NewBroker(url string) *AMQPBroker {
Expand Down
29 changes: 22 additions & 7 deletions worker.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package worker

import (
"fmt"
"time"

"golang.org/x/net/context"

"github.com/golang/protobuf/proto"
"github.com/guilhermehubner/worker/broker"
"github.com/guilhermehubner/worker/log"
"github.com/jpillora/backoff"
)

Expand All @@ -15,14 +17,15 @@ const (
maxBackoffTime = 10 * time.Second
)

type getJobHandle func() (*broker.Message, *JobType)
type getJobHandle func() (*broker.Message, *JobType, error)

type worker struct {
middlewares []Middleware
getJob getJobHandle
cancel chan struct{}
ended chan struct{}
backoff *backoff.Backoff
middlewares []Middleware
getJob getJobHandle
cancel chan struct{}
ended chan struct{}
backoff *backoff.Backoff
getMessageBackoff *backoff.Backoff
}

func newWorker(middlewares []Middleware, getJob getJobHandle) *worker {
Expand All @@ -35,6 +38,10 @@ func newWorker(middlewares []Middleware, getJob getJobHandle) *worker {
Min: minBackoffTime,
Max: maxBackoffTime,
},
getMessageBackoff: &backoff.Backoff{
Min: minBackoffTime,
Max: maxBackoffTime,
},
}
}

Expand All @@ -55,7 +62,15 @@ func (w *worker) start() chan struct{} {
}

func (w *worker) executeJob() {
message, job := w.getJob()
message, job, err := w.getJob()
if err != nil {
log.Get().Error(fmt.Sprintf("worker: fail to get job: %v", err))
time.Sleep(w.getMessageBackoff.Duration())
return
}

w.getMessageBackoff.Reset()

if message == nil {
return
}
Expand Down
13 changes: 8 additions & 5 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,24 @@ func (wp *Pool) Start() {

sort.Sort(wp.jobTypes)

getJob := func() (*broker.Message, *JobType) {
getJob := func() (*broker.Message, *JobType, error) {
for _, jobType := range wp.jobTypes {
if wp.stop {
return nil, nil
return nil, nil, nil
}

msg := wp.broker.GetMessage(jobType.Name)
msg, err := wp.broker.GetMessage(jobType.Name)
if err != nil {
return nil, nil, err
}
if msg == nil {
continue
}

return msg, &jobType
return msg, &jobType, nil
}

return nil, nil
return nil, nil, nil
}

workersEnded := make([]chan struct{}, 0, len(wp.workers))
Expand Down

0 comments on commit 9400ebb

Please sign in to comment.