Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

Commit d0adb06

Browse files
author
Filip
committed
Automatic consumer recovery + sqs/sns tests
1 parent 1402f32 commit d0adb06

File tree

9 files changed

+753
-24
lines changed

9 files changed

+753
-24
lines changed

forwarder/forwarder.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package forwarder
22

3+
const (
4+
// EmptyMessageError empty error message
5+
EmptyMessageError = "message is empty"
6+
)
7+
38
// Client interface to forwarding messages
49
type Client interface {
510
Name() string

rabbitmq/consumer.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,31 +103,37 @@ func (c Consumer) Start(forwarder forwarder.Client, check chan bool, stop chan b
103103
return failOnError(err, "Failed to register a consumer")
104104
}
105105
params := workerParams{forwarder, msgs, check, stop, conn, ch}
106-
go c.push(params)
106+
go c.startForwarding(&params)
107107

108108
return nil
109109
}
110110

111-
func (c Consumer) push(params workerParams) {
111+
func (c Consumer) startForwarding(params *workerParams) {
112112
forwarderName := params.forwarder.Name()
113+
defer params.ch.Close()
114+
defer params.conn.Close()
113115
log.Printf("[%s] Started forwarding messages to %s", c.Name(), forwarderName)
114116
for {
115117
select {
116-
case d := <-params.msgs:
118+
case d, ok := <-params.msgs:
119+
if !ok { // channel already closed
120+
go c.Start(params.forwarder, params.check, params.stop)
121+
return
122+
}
117123
log.Printf("[%s] Message to forward: %v", c.Name(), d.MessageId)
118124
err := params.forwarder.Push(string(d.Body))
119125
if err != nil {
120126
log.Printf("[%s] Could not forward message. Error: %s", forwarderName, err.Error())
121127
} else {
122-
d.Ack(true)
128+
if err := d.Ack(true); err != nil {
129+
log.Println("Could not ack message with id:", d.MessageId)
130+
}
123131
}
124132
case <-params.check:
125133
log.Printf("[%s] Checking", forwarderName)
126134
case <-params.stop:
127135
log.Printf("[%s] Closing", forwarderName)
128-
params.ch.Close()
129-
params.conn.Close()
130-
return
136+
break
131137
}
132138
}
133139
}

sns/forwader.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package sns
22

33
import (
4+
"errors"
45
"log"
56

67
"github.com/AirHelp/rabbit-amazon-forwarder/config"
78
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
89
"github.com/aws/aws-sdk-go/aws"
910
"github.com/aws/aws-sdk-go/aws/session"
1011
"github.com/aws/aws-sdk-go/service/sns"
12+
"github.com/aws/aws-sdk-go/service/sns/snsiface"
1113
)
1214

1315
const (
@@ -16,13 +18,18 @@ const (
1618

1719
type Forwarder struct {
1820
name string
19-
snsClient *sns.SNS
21+
snsClient snsiface.SNSAPI
2022
topic string
2123
}
2224

2325
// CreateForwarder creates instance of forwarder
24-
func CreateForwarder(entry config.AmazonEntry) forwarder.Client {
25-
client := awsClient()
26+
func CreateForwarder(entry config.AmazonEntry, snsClient ...snsiface.SNSAPI) forwarder.Client {
27+
var client snsiface.SNSAPI
28+
if len(snsClient) > 0 {
29+
client = snsClient[0]
30+
} else {
31+
client = sns.New(session.Must(session.NewSession()))
32+
}
2633
forwarder := Forwarder{entry.Name, client, entry.Target}
2734
log.Print("Created forwarder: ", forwarder.Name())
2835
return forwarder
@@ -35,6 +42,9 @@ func (f Forwarder) Name() string {
3542

3643
// Push pushes message to forwarding infrastructure
3744
func (f Forwarder) Push(message string) error {
45+
if message == "" {
46+
return errors.New(forwarder.EmptyMessageError)
47+
}
3848
params := &sns.PublishInput{
3949
Message: aws.String(message),
4050
TargetArn: aws.String(f.topic),
@@ -45,11 +55,6 @@ func (f Forwarder) Push(message string) error {
4555
log.Printf("[%s] Could not forward message. Error: %s", f.Name(), err.Error())
4656
return err
4757
}
48-
log.Printf("[%s] Forward succeeded. Response: %s", f.Name(), resp)
58+
log.Printf("[%s] Forward succeeded. Response: %v", f.Name(), resp)
4959
return nil
5060
}
51-
52-
func awsClient() *sns.SNS {
53-
sess := session.New()
54-
return sns.New(sess)
55-
}

sns/forwarder_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package sns
22

33
import (
4+
"errors"
45
"testing"
56

67
"github.com/AirHelp/rabbit-amazon-forwarder/config"
8+
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
9+
"github.com/aws/aws-sdk-go/aws"
10+
"github.com/aws/aws-sdk-go/service/sns"
11+
"github.com/aws/aws-sdk-go/service/sns/snsiface"
712
)
813

14+
var badRequest = "Bad request"
15+
916
func TestCreateForwarder(t *testing.T) {
1017
entry := config.AmazonEntry{Type: "SNS",
1118
Name: "sns-test",
@@ -16,3 +23,75 @@ func TestCreateForwarder(t *testing.T) {
1623
t.Errorf("wrong forwarder name, expected:%s, found: %s", entry.Name, forwarder.Name())
1724
}
1825
}
26+
27+
func TestPush(t *testing.T) {
28+
topicName := "topic1"
29+
entry := config.AmazonEntry{Type: "SNS",
30+
Name: "sns-test",
31+
Target: topicName,
32+
}
33+
scenarios := []struct {
34+
name string
35+
mock snsiface.SNSAPI
36+
message string
37+
topic string
38+
err error
39+
}{
40+
{
41+
name: "empty message",
42+
mock: mockAmazonSNS{resp: sns.PublishOutput{MessageId: aws.String("messageId")}, topic: topicName, message: ""},
43+
message: "",
44+
topic: topicName,
45+
err: errors.New(forwarder.EmptyMessageError),
46+
},
47+
{
48+
name: "bad request",
49+
mock: mockAmazonSNS{resp: sns.PublishOutput{MessageId: aws.String("messageId")}, topic: topicName, message: badRequest},
50+
message: badRequest,
51+
topic: topicName,
52+
err: errors.New(badRequest),
53+
},
54+
{
55+
name: "success",
56+
mock: mockAmazonSNS{resp: sns.PublishOutput{MessageId: aws.String("messageId")}, topic: topicName, message: "abc"},
57+
message: "abc",
58+
topic: topicName,
59+
err: nil,
60+
},
61+
}
62+
for _, scenario := range scenarios {
63+
t.Log("Scenario name: ", scenario.name)
64+
forwarder := CreateForwarder(entry, scenario.mock)
65+
err := forwarder.Push(scenario.message)
66+
if scenario.err == nil && err != nil {
67+
t.Errorf("Error should not occur")
68+
return
69+
}
70+
if scenario.err == err {
71+
return
72+
}
73+
if err.Error() != scenario.err.Error() {
74+
t.Errorf("Wrong error, expecting:%v, got:%v", scenario.err, err)
75+
}
76+
}
77+
}
78+
79+
type mockAmazonSNS struct {
80+
snsiface.SNSAPI
81+
resp sns.PublishOutput
82+
topic string
83+
message string
84+
}
85+
86+
func (m mockAmazonSNS) Publish(input *sns.PublishInput) (*sns.PublishOutput, error) {
87+
if *input.TargetArn != m.topic {
88+
return nil, errors.New("Wrong topic name")
89+
}
90+
if *input.Message != m.message {
91+
return nil, errors.New("Wrong message body")
92+
}
93+
if *input.Message == badRequest {
94+
return nil, errors.New(badRequest)
95+
}
96+
return &m.resp, nil
97+
}

sqs/forwader.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package sqs
22

33
import (
4+
"errors"
45
"log"
56

67
"github.com/AirHelp/rabbit-amazon-forwarder/config"
78
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
89
"github.com/aws/aws-sdk-go/aws"
910
"github.com/aws/aws-sdk-go/aws/session"
1011
"github.com/aws/aws-sdk-go/service/sqs"
12+
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
1113
)
1214

1315
const (
@@ -16,13 +18,18 @@ const (
1618

1719
type Forwarder struct {
1820
name string
19-
sqsClient *sqs.SQS
21+
sqsClient sqsiface.SQSAPI
2022
queue string
2123
}
2224

2325
// CreateForwarder creates instance of forwarder
24-
func CreateForwarder(entry config.AmazonEntry) forwarder.Client {
25-
client := awsClient()
26+
func CreateForwarder(entry config.AmazonEntry, sqsClient ...sqsiface.SQSAPI) forwarder.Client {
27+
var client sqsiface.SQSAPI
28+
if len(sqsClient) > 0 {
29+
client = sqsClient[0]
30+
} else {
31+
client = sqs.New(session.Must(session.NewSession()))
32+
}
2633
forwarder := Forwarder{entry.Name, client, entry.Target}
2734
log.Print("Created forwarder: ", forwarder.Name())
2835
return forwarder
@@ -35,6 +42,9 @@ func (f Forwarder) Name() string {
3542

3643
// Push pushes message to forwarding infrastructure
3744
func (f Forwarder) Push(message string) error {
45+
if message == "" {
46+
return errors.New(forwarder.EmptyMessageError)
47+
}
3848
params := &sqs.SendMessageInput{
3949
MessageBody: aws.String(message), // Required
4050
QueueUrl: aws.String(f.queue), // Required
@@ -49,8 +59,3 @@ func (f Forwarder) Push(message string) error {
4959
log.Printf("[%s] Forward succeeded. Response: %s", f.Name(), resp)
5060
return nil
5161
}
52-
53-
func awsClient() *sqs.SQS {
54-
sess := session.New()
55-
return sqs.New(sess)
56-
}

sqs/forwarder_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package sqs
22

33
import (
4+
"errors"
45
"testing"
56

67
"github.com/AirHelp/rabbit-amazon-forwarder/config"
8+
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
9+
"github.com/aws/aws-sdk-go/aws"
10+
"github.com/aws/aws-sdk-go/service/sqs"
11+
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
712
)
813

14+
var badRequest = "Bad request"
15+
916
func TestCreateForwarder(t *testing.T) {
1017
entry := config.AmazonEntry{Type: "SQS",
1118
Name: "sqs-test",
@@ -16,3 +23,75 @@ func TestCreateForwarder(t *testing.T) {
1623
t.Errorf("wrong forwarder name, expected:%s, found: %s", entry.Name, forwarder.Name())
1724
}
1825
}
26+
27+
func TestPush(t *testing.T) {
28+
queueName := "queue1"
29+
entry := config.AmazonEntry{Type: "SQS",
30+
Name: "sqs-test",
31+
Target: queueName,
32+
}
33+
scenarios := []struct {
34+
name string
35+
mock sqsiface.SQSAPI
36+
message string
37+
queue string
38+
err error
39+
}{
40+
{
41+
name: "empty message",
42+
mock: mockAmazonSQS{resp: sqs.SendMessageOutput{MessageId: aws.String("messageId")}, queue: queueName, message: ""},
43+
message: "",
44+
queue: queueName,
45+
err: errors.New(forwarder.EmptyMessageError),
46+
},
47+
{
48+
name: "bad request",
49+
mock: mockAmazonSQS{resp: sqs.SendMessageOutput{MessageId: aws.String("messageId")}, queue: queueName, message: badRequest},
50+
message: badRequest,
51+
queue: queueName,
52+
err: errors.New(badRequest),
53+
},
54+
{
55+
name: "success",
56+
mock: mockAmazonSQS{resp: sqs.SendMessageOutput{MessageId: aws.String("messageId")}, queue: queueName, message: "abc"},
57+
message: "abc",
58+
queue: queueName,
59+
err: nil,
60+
},
61+
}
62+
for _, scenario := range scenarios {
63+
t.Log("Scenario name: ", scenario.name)
64+
forwarder := CreateForwarder(entry, scenario.mock)
65+
err := forwarder.Push(scenario.message)
66+
if scenario.err == nil && err != nil {
67+
t.Errorf("Error should not occur")
68+
return
69+
}
70+
if scenario.err == err {
71+
return
72+
}
73+
if err.Error() != scenario.err.Error() {
74+
t.Errorf("Wrong error, expecting:%v, got:%v", scenario.err, err)
75+
}
76+
}
77+
}
78+
79+
type mockAmazonSQS struct {
80+
sqsiface.SQSAPI
81+
resp sqs.SendMessageOutput
82+
queue string
83+
message string
84+
}
85+
86+
func (m mockAmazonSQS) SendMessage(input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) {
87+
if *input.QueueUrl != m.queue {
88+
return nil, errors.New("Wrong queue name")
89+
}
90+
if *input.MessageBody != m.message {
91+
return nil, errors.New("Wrong message body")
92+
}
93+
if *input.MessageBody == badRequest {
94+
return nil, errors.New(badRequest)
95+
}
96+
return &m.resp, nil
97+
}

0 commit comments

Comments
 (0)