Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

include custom errors #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ script:
- dep ensure
- make lint
- vendorcheck ./...
- make test

after_success:
- bash <(curl -s https://codecov.io/bash)
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ lint:
gosimple ${PACKAGES} | awk '{print ""; print} END {if (NR > 0) {exit 1} else print "... ok"}' && \
echo -n ">> unused" && \
unused ${PACKAGES} | awk '{print ""; print} END {if (NR > 0) {exit 1} else print "... ok"}'
test:
go test ${PACKAGES}

35 changes: 15 additions & 20 deletions broker/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/guilhermehubner/worker/errors"
"github.com/guilhermehubner/worker/log"
"github.com/jpillora/backoff"
"github.com/streadway/amqp"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (b *AMQPBroker) GetJobStatus(jobName string) (Status, error) {
func (b *AMQPBroker) GetMessage(jobName string) ([]byte, string) {
msg, ok, err := b.channel.Get(jobName, true)
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to get message: %v", err))
log.Get().Error(errors.ErrChannelMessage.WithValue(err))
return nil, ""
}
if !ok {
Expand All @@ -78,14 +79,12 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e
nil, // arguments
)
if err != nil {
// TODO
return err
return errors.ErrJobEnqueue.WithValue(err)
}

body, err := proto.Marshal(message)
if err != nil {
// TODO
return err
return errors.ErrMessageSerialize.WithValue(err)
}

err = b.channel.Publish(
Expand All @@ -102,7 +101,7 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e
)

if err != nil {
// TODO
return errors.ErrMessagePublishing.WithValue(err)
}

return err
Expand All @@ -119,8 +118,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
nil, // arguments
)
if err != nil {
// TODO
return "", err
return "", errors.ErrJobEnqueue.WithValue(err)
}

queue, err := b.channel.QueueDeclare(
Expand All @@ -137,14 +135,12 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
},
)
if err != nil {
// TODO
return "", err
return "", errors.ErrJobEnqueue.WithValue(err)
}

body, err := proto.Marshal(message)
if err != nil {
// TODO
return "", err
return "", errors.ErrMessageSerialize.WithValue(err)
}

err = b.channel.Publish(
Expand All @@ -161,8 +157,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message,
)

if err != nil {
// TODO
return "", err
return "", errors.ErrMessagePublishing.WithValue(err)
}

return messageID, nil
Expand All @@ -172,21 +167,21 @@ func (b *AMQPBroker) connect() {
log.Get().Info("CONNECTING...")

for {
connection, err := amqp.Dial(b.url)
var err error
b.connection, err = amqp.Dial(b.url)
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to connect: %v", err))
log.Get().Error(errors.ErrConnection.WithValue(err))
time.Sleep(b.backoff.Duration())
continue
}

channel, err := connection.Channel()
b.channel, err = b.connection.Channel()
if err != nil {
log.Get().Error(fmt.Sprintf("broker/amqp: fail to get connection channel: %v", err))
log.Get().Error(errors.ErrChannelUnavailable.WithValue(err))
time.Sleep(b.backoff.Duration())
continue
}

b.connection = connection
b.channel = channel
b.closed = b.connection.NotifyClose(make(chan *amqp.Error))
b.backoff.Reset()
b.connected = true
Expand Down
41 changes: 41 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package errors

import "fmt"

type ErrorCode int

const (
GeneralError ErrorCode = iota
FormatError
ParseError
ServiceError
)

var (
ErrChannelUnavailable = &Error{Code: ServiceError, Message: "channel unavailable"}
ErrChannelMessage = &Error{Code: ServiceError, Message: "fail to get message"}
ErrConnection = &Error{Code: ServiceError, Message: "fail to connect"}
ErrMessagePublishing = &Error{Code: ServiceError, Message: "error on message publishing"}
ErrJobRegister = &Error{Code: ServiceError, Message: "error on job register"}
ErrJobEnqueue = &Error{Code: ServiceError, Message: "error on enqueue job"}
ErrEmptyURL = &Error{Code: FormatError, Message: "needs a non-empty url"}
ErrMessageSerialize = &Error{Code: ParseError, Message: "error on message serialize"}
)

type Error struct {
Code ErrorCode
Message string
Err error
}

func (e *Error) Error() string {
if e.Err != nil {
return fmt.Sprintf("[Error %d] %s: %s", e.Code, e.Message, e.Err)
}
return fmt.Sprintf("[Error %d] %s", e.Code, e.Message)
}

func (e *Error) WithValue(Err error) *Error {
e.Err = Err
return e
}
40 changes: 40 additions & 0 deletions errors/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package errors

import (
"fmt"
"testing"

"github.com/pkg/errors"
)

func TestErrorMessage(t *testing.T) {
customError := errors.New("fail")

tt := []struct {
Err *Error
Expectation string
}{
{
Err: &Error{Code: FormatError, Message: "empty value"},
Expectation: fmt.Sprintf("[Error %d] empty value", FormatError),
},
{
Err: &Error{Code: ServiceError, Message: "not found"},
Expectation: fmt.Sprintf("[Error %d] not found", ServiceError),
},
{
Err: (&Error{Code: FormatError, Message: "nil pointer"}).WithValue(customError),
Expectation: fmt.Sprintf("[Error %d] nil pointer: fail", FormatError),
},
{
Err: (&Error{Code: ServiceError, Message: "mysql error"}).WithValue(customError),
Expectation: fmt.Sprintf("[Error %d] mysql error: fail", ServiceError),
},
}

for _, tc := range tt {
if tc.Err.Error() != tc.Expectation {
t.Errorf("expect '%s', but got '%s'", tc.Expectation, tc.Err)
}
}
}
16 changes: 13 additions & 3 deletions examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"log"

"golang.org/x/net/context"

Expand All @@ -10,7 +11,7 @@ import (
)

func main() {
wp := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5,
wp, err := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5,
func(ctx context.Context, next func(context.Context) error) error {
fmt.Print("Enter on Middleware 1 > ")
return next(ctx)
Expand All @@ -19,8 +20,11 @@ func main() {
fmt.Print("Enter on Middleware 2 > ")
return next(ctx)
})
if err != nil {
log.Fatal(err)
}

wp.RegisterJob(worker.JobType{
err = wp.RegisterJob(worker.JobType{
Name: "queue1",
Handle: func(ctx context.Context, gen worker.GenFunc) error {
msg := payload.Payload{}
Expand All @@ -36,8 +40,11 @@ func main() {
},
Priority: 10,
})
if err != nil {
log.Fatal(err)
}

wp.RegisterJob(worker.JobType{
err = wp.RegisterJob(worker.JobType{
Name: "queue2",
Handle: func(ctx context.Context, gen worker.GenFunc) error {
msg := payload.Payload{}
Expand All @@ -53,6 +60,9 @@ func main() {
},
Priority: 15,
})
if err != nil {
log.Fatal(err)
}

wp.Start()
}
12 changes: 7 additions & 5 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"

"github.com/guilhermehubner/worker/broker"
"github.com/guilhermehubner/worker/errors"
"github.com/guilhermehubner/worker/log"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -89,13 +90,14 @@ func (wp *Pool) Start() {
RegisterJob adds a job with handler for 'name' queue and allows you to specify options such as a
job's priority and it's retry count.
*/
func (wp *Pool) RegisterJob(job JobType) {
func (wp *Pool) RegisterJob(job JobType) error {
err := wp.broker.RegisterJob(job.Name)
if err != nil {
// TODO
return errors.ErrJobRegister.WithValue(err)
}

wp.jobTypes = append(wp.jobTypes, job)
return nil
}

/*
Expand All @@ -105,9 +107,9 @@ URL is a string connection in the AMQP URI format.

Concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
*/
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Pool {
func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) (*Pool, error) {
if strings.TrimSpace(url) == "" {
panic("worker workerpool: needs a non-empty url")
return nil, errors.ErrEmptyURL
}

wp := &Pool{
Expand All @@ -116,5 +118,5 @@ func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Poo
workers: make([]*worker, concurrency),
}

return wp
return wp, nil
}