Skip to content

Commit

Permalink
include custom errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-brito committed Feb 19, 2018
1 parent 9cfac7e commit 8e852ff
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 26 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ script:
- dep ensure
- make lint
- vendorcheck ./...
- go test $(go list ./... | grep -v vendor/)

after_success:
- bash <(curl -s https://codecov.io/bash)
30 changes: 12 additions & 18 deletions broker/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/guilhermehubner/worker/errors"
"github.com/jpillora/backoff"
"github.com/streadway/amqp"
)
Expand Down Expand Up @@ -80,8 +81,7 @@ func (b *AMQPBroker) GetMessage(jobName string) ([]byte, string) {
func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) error {
channel, err := b.getChannel()
if err != nil {
// TODO
return err
return errors.ErrChannelUnavailable.WithValue(err)
}

queue, err := channel.QueueDeclare(
Expand All @@ -93,14 +93,12 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e
nil, // arguments
)
if err != nil {
// TODO
return err
return errors.ErrJobEnqueue.WithValue(err)
}

body, err := proto.Marshal(message)
if err != nil {
// TODO
return err
return errors.ErrMessageSerialize.WithValue(err)
}

err = channel.Publish(
Expand All @@ -117,7 +115,7 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e
)

if err != nil {
// TODO
return errors.ErrMessagePublishing.WithValue(err)
}

return err
Expand All @@ -127,8 +125,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
secondsFromNow int64) (string, error) {
channel, err := b.getChannel()
if err != nil {
// TODO
return "", err
return "", errors.ErrChannelUnavailable.WithValue(err)
}

_, err = channel.QueueDeclare(
Expand All @@ -140,8 +137,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
nil, // arguments
)
if err != nil {
// TODO
return "", err
return "", errors.ErrJobEnqueue.WithValue(err)
}

queue, err := channel.QueueDeclare(
Expand All @@ -158,14 +154,12 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
},
)
if err != nil {
// TODO
return "", err
return "", errors.ErrJobEnqueue.WithValue(err)
}

body, err := proto.Marshal(message)
if err != nil {
// TODO
return "", err
return "", errors.ErrMessageSerialize.WithValue(err)
}

err = channel.Publish(
Expand All @@ -182,8 +176,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
)

if err != nil {
// TODO
return "", err
return "", errors.ErrMessagePublishing.WithValue(err)
}

return messageID, nil
Expand All @@ -196,10 +189,11 @@ func (b *AMQPBroker) getChannel() (*amqp.Channel, error) {
for i := 0; i < 20; i++ {
channel, err = b.connection.Channel()
if err != nil {
time.Sleep(5 * time.Millisecond)
time.Sleep(b.backoff.Duration())
continue
}

b.backoff.Reset()
return channel, nil
}

Expand Down
39 changes: 39 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package errors

import "fmt"

type ErrorCode int

const (
GeneralError ErrorCode = iota
FormatError
ParseError
ServiceError
)

var (
ErrChannelUnavailable = &Error{Code: ServiceError, Message: "channel not unavailable"}
ErrMessagePublishing = &Error{Code: ServiceError, Message: "error on message publishing"}
ErrJobRegister = &Error{Code: ServiceError, Message: "error on job register"}
ErrJobEnqueue = &Error{Code: ServiceError, Message: "error on enqueue job"}
ErrEmptyURL = &Error{Code: FormatError, Message: "needs a non-empty url"}
ErrMessageSerialize = &Error{Code: ParseError, Message: "error on message serialize"}
)

type Error struct {
Code ErrorCode
Message string
Err error
}

func (e *Error) Error() string {
if e.Err != nil {
return fmt.Sprintf("[Error %d] %s: %s", e.Code, e.Message, e.Err)
}
return fmt.Sprintf("[Error %d] %s", e.Code, e.Message)
}

func (e *Error) WithValue(Err error) *Error {
e.Err = Err
return e
}
40 changes: 40 additions & 0 deletions errors/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package errors

import (
"fmt"
"testing"

"errors"
)

func TestErrorMessage(t *testing.T) {
customError := errors.New("fail")

tt := []struct {
Err *Error
Expectation string
}{
{
Err: &Error{Code: FormatError, Message: "empty value"},
Expectation: fmt.Sprintf("[Error %d] empty value", FormatError),
},
{
Err: &Error{Code: ServiceError, Message: "not found"},
Expectation: fmt.Sprintf("[Error %d] not found", ServiceError),
},
{
Err: (&Error{Code: FormatError, Message: "nil pointer"}).WithValue(customError),
Expectation: fmt.Sprintf("[Error %d] nil pointer: fail", FormatError),
},
{
Err: (&Error{Code: ServiceError, Message: "mysql error"}).WithValue(customError),
Expectation: fmt.Sprintf("[Error %d] mysql error: fail", ServiceError),
},
}

for _, tc := range tt {
if tc.Err.Error() != tc.Expectation {
t.Errorf("expect '%s', but got '%s'", tc.Expectation, tc.Err)
}
}
}
16 changes: 13 additions & 3 deletions examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"log"

"golang.org/x/net/context"

Expand All @@ -10,7 +11,7 @@ import (
)

func main() {
wp := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5,
wp, err := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5,
func(ctx context.Context, next func(context.Context) error) error {
fmt.Print("Enter on Middleware 1 > ")
return next(ctx)
Expand All @@ -19,8 +20,11 @@ func main() {
fmt.Print("Enter on Middleware 2 > ")
return next(ctx)
})
if err != nil {
log.Fatal(err)
}

wp.RegisterJob(worker.JobType{
err = wp.RegisterJob(worker.JobType{
Name: "queue1",
Handle: func(ctx context.Context, gen worker.GenFunc) error {
msg := payload.Payload{}
Expand All @@ -36,8 +40,11 @@ func main() {
},
Priority: 10,
})
if err != nil {
log.Fatal(err)
}

wp.RegisterJob(worker.JobType{
err = wp.RegisterJob(worker.JobType{
Name: "queue2",
Handle: func(ctx context.Context, gen worker.GenFunc) error {
msg := payload.Payload{}
Expand All @@ -53,6 +60,9 @@ func main() {
},
Priority: 15,
})
if err != nil {
log.Fatal(err)
}

wp.Start()
}
12 changes: 7 additions & 5 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"

"github.com/guilhermehubner/worker/broker"
"github.com/guilhermehubner/worker/errors"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -88,13 +89,14 @@ func (wp *Pool) Start() {
RegisterJob adds a job with handler for 'name' queue and allows you to specify options such as a
job's priority and it's retry count.
*/
func (wp *Pool) RegisterJob(job JobType) {
func (wp *Pool) RegisterJob(job JobType) error {
err := wp.broker.RegisterJob(job.Name)
if err != nil {
// TODO
return errors.ErrJobRegister.WithValue(err)
}

wp.jobTypes = append(wp.jobTypes, job)
return nil
}

/*
Expand All @@ -104,9 +106,9 @@ URL is a string connection in the AMQP URI format.
Concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
*/
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Pool {
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) (*Pool, error) {
if strings.TrimSpace(url) == "" {
panic("worker workerpool: needs a non-empty url")
return nil, errors.ErrEmptyURL
}

wp := &Pool{
Expand All @@ -115,5 +117,5 @@ func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Poo
workers: make([]*worker, concurrency),
}

return wp
return wp, nil
}

0 comments on commit 8e852ff

Please sign in to comment.