Skip to content

Commit

Permalink
include custom errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-brito committed Feb 25, 2018
1 parent 10bb6ba commit d25a81a
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 28 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ script:
- dep ensure
- make lint
- vendorcheck ./...
- go test ./...

after_success:
- bash <(curl -s https://codecov.io/bash)
35 changes: 15 additions & 20 deletions broker/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/guilhermehubner/worker/errors"
"github.com/guilhermehubner/worker/log"
"github.com/jpillora/backoff"
"github.com/streadway/amqp"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (b *AMQPBroker) GetJobStatus(jobName string) (Status, error) {
func (b *AMQPBroker) GetMessage(jobName string) ([]byte, string) {
msg, ok, err := b.channel.Get(jobName, true)
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to get message: %v", err))
log.Get().Error(errors.ErrChannelMessage.WithValue(err))
return nil, ""
}
if !ok {
Expand All @@ -78,14 +79,12 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e
nil, // arguments
)
if err != nil {
// TODO
return err
return errors.ErrJobEnqueue.WithValue(err)
}

body, err := proto.Marshal(message)
if err != nil {
// TODO
return err
return errors.ErrMessageSerialize.WithValue(err)
}

err = b.channel.Publish(
Expand All @@ -102,7 +101,7 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e
)

if err != nil {
// TODO
return errors.ErrMessagePublishing.WithValue(err)
}

return err
Expand All @@ -119,8 +118,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
nil, // arguments
)
if err != nil {
// TODO
return "", err
return "", errors.ErrJobEnqueue.WithValue(err)
}

queue, err := b.channel.QueueDeclare(
Expand All @@ -137,14 +135,12 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
},
)
if err != nil {
// TODO
return "", err
return "", errors.ErrJobEnqueue.WithValue(err)
}

body, err := proto.Marshal(message)
if err != nil {
// TODO
return "", err
return "", errors.ErrMessageSerialize.WithValue(err)
}

err = b.channel.Publish(
Expand All @@ -161,8 +157,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
)

if err != nil {
// TODO
return "", err
return "", errors.ErrMessagePublishing.WithValue(err)
}

return messageID, nil
Expand All @@ -172,21 +167,21 @@ func (b *AMQPBroker) connect() {
log.Get().Info("CONNECTING...")

for {
connection, err := amqp.Dial(b.url)
var err error
b.connection, err = amqp.Dial(b.url)
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to connect: %v", err))
log.Get().Error(errors.ErrConnection.WithValue(err))
time.Sleep(b.backoff.Duration())
continue
}

channel, err := connection.Channel()
b.channel, err = b.connection.Channel()
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to get connection channel: %v", err))
log.Get().Error(errors.ErrChannelUnavailable.WithValue(err))
time.Sleep(b.backoff.Duration())
continue
}

b.connection = connection
b.channel = channel
b.closed = b.connection.NotifyClose(make(chan *amqp.Error))
b.backoff.Reset()
b.connected = true
Expand Down
41 changes: 41 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package errors

import "fmt"

type ErrorCode int

const (
GeneralError ErrorCode = iota
FormatError
ParseError
ServiceError
)

var (
ErrChannelUnavailable = &Error{Code: ServiceError, Message: "channel unavailable"}
ErrChannelMessage = &Error{Code: ServiceError, Message: "fail to get message"}
ErrConnection = &Error{Code: ServiceError, Message: "fail to connect"}
ErrMessagePublishing = &Error{Code: ServiceError, Message: "error on message publishing"}
ErrJobRegister = &Error{Code: ServiceError, Message: "error on job register"}
ErrJobEnqueue = &Error{Code: ServiceError, Message: "error on enqueue job"}
ErrEmptyURL = &Error{Code: FormatError, Message: "needs a non-empty url"}
ErrMessageSerialize = &Error{Code: ParseError, Message: "error on message serialize"}
)

type Error struct {
Code ErrorCode
Message string
Err error
}

func (e *Error) Error() string {
if e.Err != nil {
return fmt.Sprintf("[Error %d] %s: %s", e.Code, e.Message, e.Err)
}
return fmt.Sprintf("[Error %d] %s", e.Code, e.Message)
}

func (e *Error) WithValue(Err error) *Error {
e.Err = Err
return e
}
40 changes: 40 additions & 0 deletions errors/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package errors

import (
"fmt"
"testing"

"github.com/pkg/errors"
)

func TestErrorMessage(t *testing.T) {
customError := errors.New("fail")

tt := []struct {
Err *Error
Expectation string
}{
{
Err: &Error{Code: FormatError, Message: "empty value"},
Expectation: fmt.Sprintf("[Error %d] empty value", FormatError),
},
{
Err: &Error{Code: ServiceError, Message: "not found"},
Expectation: fmt.Sprintf("[Error %d] not found", ServiceError),
},
{
Err: (&Error{Code: FormatError, Message: "nil pointer"}).WithValue(customError),
Expectation: fmt.Sprintf("[Error %d] nil pointer: fail", FormatError),
},
{
Err: (&Error{Code: ServiceError, Message: "mysql error"}).WithValue(customError),
Expectation: fmt.Sprintf("[Error %d] mysql error: fail", ServiceError),
},
}

for _, tc := range tt {
if tc.Err.Error() != tc.Expectation {
t.Errorf("expect '%s', but got '%s'", tc.Expectation, tc.Err)
}
}
}
16 changes: 13 additions & 3 deletions examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"log"

"golang.org/x/net/context"

Expand All @@ -10,7 +11,7 @@ import (
)

func main() {
wp := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5,
wp, err := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5,
func(ctx context.Context, next func(context.Context) error) error {
fmt.Print("Enter on Middleware 1 > ")
return next(ctx)
Expand All @@ -19,8 +20,11 @@ func main() {
fmt.Print("Enter on Middleware 2 > ")
return next(ctx)
})
if err != nil {
log.Fatal(err)
}

wp.RegisterJob(worker.JobType{
err = wp.RegisterJob(worker.JobType{
Name: "queue1",
Handle: func(ctx context.Context, gen worker.GenFunc) error {
msg := payload.Payload{}
Expand All @@ -36,8 +40,11 @@ func main() {
},
Priority: 10,
})
if err != nil {
log.Fatal(err)
}

wp.RegisterJob(worker.JobType{
err = wp.RegisterJob(worker.JobType{
Name: "queue2",
Handle: func(ctx context.Context, gen worker.GenFunc) error {
msg := payload.Payload{}
Expand All @@ -53,6 +60,9 @@ func main() {
},
Priority: 15,
})
if err != nil {
log.Fatal(err)
}

wp.Start()
}
12 changes: 7 additions & 5 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"

"github.com/guilhermehubner/worker/broker"
"github.com/guilhermehubner/worker/errors"
"github.com/guilhermehubner/worker/log"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -89,13 +90,14 @@ func (wp *Pool) Start() {
RegisterJob adds a job with handler for 'name' queue and allows you to specify options such as a
job's priority and it's retry count.
*/
func (wp *Pool) RegisterJob(job JobType) {
func (wp *Pool) RegisterJob(job JobType) error {
err := wp.broker.RegisterJob(job.Name)
if err != nil {
// TODO
return errors.ErrJobRegister.WithValue(err)
}

wp.jobTypes = append(wp.jobTypes, job)
return nil
}

/*
Expand All @@ -105,9 +107,9 @@ URL is a string connection in the AMQP URI format.
Concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
*/
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Pool {
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) (*Pool, error) {
if strings.TrimSpace(url) == "" {
panic("worker workerpool: needs a non-empty url")
return nil, errors.ErrEmptyURL
}

wp := &Pool{
Expand All @@ -116,5 +118,5 @@ func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Poo
workers: make([]*worker, concurrency),
}

return wp
return wp, nil
}

0 comments on commit d25a81a

Please sign in to comment.