diff --git a/.travis.yml b/.travis.yml index 70f86d9..b97745d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,7 @@ script: - dep ensure - make lint - vendorcheck ./... + - make test after_success: - bash <(curl -s https://codecov.io/bash) diff --git a/Makefile b/Makefile index 59601bf..f5ec783 100644 --- a/Makefile +++ b/Makefile @@ -17,4 +17,6 @@ lint: gosimple ${PACKAGES} | awk '{print ""; print} END {if (NR > 0) {exit 1} else print "... ok"}' && \ echo -n ">> unused" && \ unused ${PACKAGES} | awk '{print ""; print} END {if (NR > 0) {exit 1} else print "... ok"}' +test: + go test ${PACKAGES} diff --git a/broker/amqp.go b/broker/amqp.go index 856ee8d..6097c1d 100644 --- a/broker/amqp.go +++ b/broker/amqp.go @@ -5,6 +5,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/guilhermehubner/worker/errors" "github.com/guilhermehubner/worker/log" "github.com/jpillora/backoff" "github.com/streadway/amqp" @@ -58,7 +59,7 @@ func (b *AMQPBroker) GetJobStatus(jobName string) (Status, error) { func (b *AMQPBroker) GetMessage(jobName string) ([]byte, string) { msg, ok, err := b.channel.Get(jobName, true) if err != nil { - log.Get().Error(fmt.Sprintf("broker/amqp: fail to get message: %v", err)) + log.Get().Error(errors.ErrChannelMessage.WithValue(err)) return nil, "" } if !ok { @@ -78,14 +79,12 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e nil, // arguments ) if err != nil { - // TODO - return err + return errors.ErrJobEnqueue.WithValue(err) } body, err := proto.Marshal(message) if err != nil { - // TODO - return err + return errors.ErrMessageSerialize.WithValue(err) } err = b.channel.Publish( @@ -102,7 +101,7 @@ func (b *AMQPBroker) Enqueue(jobName, messageID string, message proto.Message) e ) if err != nil { - // TODO + return errors.ErrMessagePublishing.WithValue(err) } return err @@ -119,8 +118,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message, nil, // arguments ) if err != nil { - // TODO - return "", err + return "", errors.ErrJobEnqueue.WithValue(err) } queue, err := b.channel.QueueDeclare( @@ -137,14 +135,12 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message, }, ) if err != nil { - // TODO - return "", err + return "", errors.ErrJobEnqueue.WithValue(err) } body, err := proto.Marshal(message) if err != nil { - // TODO - return "", err + return "", errors.ErrMessageSerialize.WithValue(err) } err = b.channel.Publish( @@ -161,8 +157,7 @@ func (b *AMQPBroker) EnqueueIn(jobName, messageID string, message proto.Message, ) if err != nil { - // TODO - return "", err + return "", errors.ErrMessagePublishing.WithValue(err) } return messageID, nil @@ -172,21 +167,21 @@ func (b *AMQPBroker) connect() { log.Get().Info("CONNECTING...") for { - connection, err := amqp.Dial(b.url) + var err error + b.connection, err = amqp.Dial(b.url) if err != nil { - log.Get().Error(fmt.Sprintf("broker/amqp: fail to connect: %v", err)) + log.Get().Error(errors.ErrConnection.WithValue(err)) time.Sleep(b.backoff.Duration()) continue } - channel, err := connection.Channel() + b.channel, err = b.connection.Channel() if err != nil { - log.Get().Error(fmt.Sprintf("broker/amqp: fail to get connection channel: %v", err)) + log.Get().Error(errors.ErrChannelUnavailable.WithValue(err)) + time.Sleep(b.backoff.Duration()) continue } - b.connection = connection - b.channel = channel b.closed = b.connection.NotifyClose(make(chan *amqp.Error)) b.backoff.Reset() b.connected = true diff --git a/errors/error.go b/errors/error.go new file mode 100644 index 0000000..2f8a600 --- /dev/null +++ b/errors/error.go @@ -0,0 +1,41 @@ +package errors + +import "fmt" + +type ErrorCode int + +const ( + GeneralError ErrorCode = iota + FormatError + ParseError + ServiceError +) + +var ( + ErrChannelUnavailable = &Error{Code: ServiceError, Message: "channel unavailable"} + ErrChannelMessage = &Error{Code: ServiceError, Message: "fail to get message"} + ErrConnection = &Error{Code: ServiceError, Message: "fail to connect"} + ErrMessagePublishing = &Error{Code: ServiceError, Message: "error on message publishing"} + ErrJobRegister = &Error{Code: ServiceError, Message: "error on job register"} + ErrJobEnqueue = &Error{Code: ServiceError, Message: "error on enqueue job"} + ErrEmptyURL = &Error{Code: FormatError, Message: "needs a non-empty url"} + ErrMessageSerialize = &Error{Code: ParseError, Message: "error on message serialize"} +) + +type Error struct { + Code ErrorCode + Message string + Err error +} + +func (e *Error) Error() string { + if e.Err != nil { + return fmt.Sprintf("[Error %d] %s: %s", e.Code, e.Message, e.Err) + } + return fmt.Sprintf("[Error %d] %s", e.Code, e.Message) +} + +func (e *Error) WithValue(Err error) *Error { + e.Err = Err + return e +} diff --git a/errors/error_test.go b/errors/error_test.go new file mode 100644 index 0000000..1838b2f --- /dev/null +++ b/errors/error_test.go @@ -0,0 +1,40 @@ +package errors + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" +) + +func TestErrorMessage(t *testing.T) { + customError := errors.New("fail") + + tt := []struct { + Err *Error + Expectation string + }{ + { + Err: &Error{Code: FormatError, Message: "empty value"}, + Expectation: fmt.Sprintf("[Error %d] empty value", FormatError), + }, + { + Err: &Error{Code: ServiceError, Message: "not found"}, + Expectation: fmt.Sprintf("[Error %d] not found", ServiceError), + }, + { + Err: (&Error{Code: FormatError, Message: "nil pointer"}).WithValue(customError), + Expectation: fmt.Sprintf("[Error %d] nil pointer: fail", FormatError), + }, + { + Err: (&Error{Code: ServiceError, Message: "mysql error"}).WithValue(customError), + Expectation: fmt.Sprintf("[Error %d] mysql error: fail", ServiceError), + }, + } + + for _, tc := range tt { + if tc.Err.Error() != tc.Expectation { + t.Errorf("expect '%s', but got '%s'", tc.Expectation, tc.Err) + } + } +} diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 2f382b6..6b8d5ed 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "log" "golang.org/x/net/context" @@ -10,7 +11,7 @@ import ( ) func main() { - wp := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5, + wp, err := worker.NewWorkerPool("amqp://guest:guest@localhost:5672/", 5, func(ctx context.Context, next func(context.Context) error) error { fmt.Print("Enter on Middleware 1 > ") return next(ctx) @@ -19,8 +20,11 @@ func main() { fmt.Print("Enter on Middleware 2 > ") return next(ctx) }) + if err != nil { + log.Fatal(err) + } - wp.RegisterJob(worker.JobType{ + err = wp.RegisterJob(worker.JobType{ Name: "queue1", Handle: func(ctx context.Context, gen worker.GenFunc) error { msg := payload.Payload{} @@ -36,8 +40,11 @@ func main() { }, Priority: 10, }) + if err != nil { + log.Fatal(err) + } - wp.RegisterJob(worker.JobType{ + err = wp.RegisterJob(worker.JobType{ Name: "queue2", Handle: func(ctx context.Context, gen worker.GenFunc) error { msg := payload.Payload{} @@ -53,6 +60,9 @@ func main() { }, Priority: 15, }) + if err != nil { + log.Fatal(err) + } wp.Start() } diff --git a/workerpool.go b/workerpool.go index 9a98969..82f78a6 100644 --- a/workerpool.go +++ b/workerpool.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/guilhermehubner/worker/broker" + "github.com/guilhermehubner/worker/errors" "github.com/guilhermehubner/worker/log" "golang.org/x/net/context" ) @@ -89,13 +90,14 @@ func (wp *Pool) Start() { RegisterJob adds a job with handler for 'name' queue and allows you to specify options such as a job's priority and it's retry count. */ -func (wp *Pool) RegisterJob(job JobType) { +func (wp *Pool) RegisterJob(job JobType) error { err := wp.broker.RegisterJob(job.Name) if err != nil { - // TODO + return errors.ErrJobRegister.WithValue(err) } wp.jobTypes = append(wp.jobTypes, job) + return nil } /* @@ -105,9 +107,9 @@ URL is a string connection in the AMQP URI format. Concurrency specifies how many workers to spin up - each worker can process jobs concurrently. */ -func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Pool { +func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) (*Pool, error) { if strings.TrimSpace(url) == "" { - panic("worker workerpool: needs a non-empty url") + return nil, errors.ErrEmptyURL } wp := &Pool{ @@ -116,5 +118,5 @@ func NewWorkerPool(url string, concurrency uint, middlewares ...Middleware) *Poo workers: make([]*worker, concurrency), } - return wp + return wp, nil }