Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

Commit 271fbd8

Browse files
author
Filip
committed
Added supervisor and real health check
1 parent 564d00e commit 271fbd8

File tree

11 files changed

+252
-77
lines changed

11 files changed

+252
-77
lines changed

consumer/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ import "github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
55
// Client intarface for consuming messages
66
type Client interface {
77
Name() string
8-
Consume(forwarder.Client) error
8+
Start(forwarder.Client, chan bool, chan bool) error
99
}

healthcheck/health.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

healthcheck/health_test.go

Lines changed: 0 additions & 29 deletions
This file was deleted.

mapping/mapping.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,24 @@ func New(helpers ...Helper) Client {
4444
return Client{helper}
4545
}
4646

47-
// LoadAndStart loads and starts mappings
48-
func (c Client) LoadAndStart() error {
47+
// Load loads mappings
48+
func (c Client) Load() (map[consumer.Client]forwarder.Client, error) {
49+
consumerForwaderMap := make(map[consumer.Client]forwarder.Client)
4950
data, err := c.loadFile()
5051
if err != nil {
51-
return err
52+
return consumerForwaderMap, err
5253
}
5354
var pairsList pairs
5455
if err = json.Unmarshal(data, &pairsList); err != nil {
55-
return err
56+
return consumerForwaderMap, err
5657
}
57-
log.Print("Starting consumer->forwader pairs")
58+
log.Print("Loading consumer->forwader pairs")
5859
for _, pair := range pairsList {
5960
consumer := c.helper.createConsumer(pair.Source)
6061
forwarder := c.helper.createForwarder(pair.Destination)
61-
log.Printf("Starting consumer:%s with forwader:%s", consumer.Name(), forwarder.Name())
62-
if err := consumer.Consume(forwarder); err != nil {
63-
return err
64-
}
62+
consumerForwaderMap[consumer] = forwarder
6563
}
66-
return nil
64+
return consumerForwaderMap, nil
6765
}
6866

6967
func (c Client) loadFile() ([]byte, error) {
@@ -73,7 +71,7 @@ func (c Client) loadFile() ([]byte, error) {
7371
}
7472

7573
func (h helperImpl) createConsumer(item common.Item) consumer.Client {
76-
log.Print("Creating consumer: ", item.Type)
74+
log.Printf("Creating consumer: [%s, %s]", item.Type, item.Name)
7775
switch item.Type {
7876
case rabbitmq.Type:
7977
return rabbitmq.CreateConsumer(item)
@@ -82,7 +80,7 @@ func (h helperImpl) createConsumer(item common.Item) consumer.Client {
8280
}
8381

8482
func (h helperImpl) createForwarder(item common.Item) forwarder.Client {
85-
log.Print("Creating forwarder: ", item.Type)
83+
log.Printf("Creating forwarder: [%s, %s]", item.Type, item.Name)
8684
switch item.Type {
8785
case sns.Type:
8886
return sns.CreateForwarder(item)

mapping/mapping_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@ const (
1818
snsType = "sns"
1919
)
2020

21-
func TestLoadAndStart(t *testing.T) {
21+
func TestLoad(t *testing.T) {
2222
os.Setenv(common.MappingFile, "../tests/rabbit_to_sns.json")
2323
client := New(MockMappingHelper{})
24-
if err := client.LoadAndStart(); err != nil {
24+
var consumerForwarderMap map[consumer.Client]forwarder.Client
25+
var err error
26+
if consumerForwarderMap, err = client.Load(); err != nil {
2527
t.Errorf("could not load mapping and start mocked rabbit->sns pair: %s", err.Error())
2628
}
29+
if len(consumerForwarderMap) != 1 {
30+
t.Errorf("wrong consumerForwarderMap size, expected 1, got %d", len(consumerForwarderMap))
31+
}
2732
}
2833

2934
func TestLoadFile(t *testing.T) {
@@ -118,7 +123,7 @@ func (c MockRabbitConsumer) Name() string {
118123
return rabbitType
119124
}
120125

121-
func (c MockRabbitConsumer) Consume(forwarder.Client) error {
126+
func (c MockRabbitConsumer) Start(client forwarder.Client, check chan bool, stop chan bool) error {
122127
return nil
123128
}
124129

rabbitmq/consumer.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,17 @@ func (c Consumer) Name() string {
3535

3636
// TODO gracefull shotdown
3737
// Consume consumes messages from Rabbit queue
38-
func (c Consumer) Consume(forwarder forwarder.Client) error {
38+
func (c Consumer) Start(forwarder forwarder.Client, check chan bool, stop chan bool) error {
3939
log.Print("Starting consumer with params: ", c)
4040
conn, err := amqp.Dial(c.ConnectionURL)
4141
if err != nil {
4242
failOnError(err, "Failed to connect to RabbitMQ")
4343
}
44-
// defer conn.Close()
4544

4645
ch, err := conn.Channel()
4746
if err != nil {
4847
failOnError(err, "Failed to open a channel")
4948
}
50-
// defer ch.Close()
5149

5250
err = ch.ExchangeDeclare(
5351
c.ExchangeName, // name
@@ -96,20 +94,30 @@ func (c Consumer) Consume(forwarder forwarder.Client) error {
9694
return failOnError(err, "Failed to register a consumer")
9795
}
9896

99-
go c.push(msgs, forwarder)
97+
go c.push(forwarder, msgs, check, stop, conn, ch)
10098

10199
return nil
102100
}
103101

104-
func (c Consumer) push(msgs <-chan amqp.Delivery, forwarder forwarder.Client) {
102+
func (c Consumer) push(forwarder forwarder.Client, msgs <-chan amqp.Delivery, check chan bool, stop chan bool, conn *amqp.Connection, ch *amqp.Channel) {
105103
log.Printf("[%s] Started forwarding messages to %s", c.Name(), forwarder.Name())
106-
for d := range msgs {
107-
log.Printf("[%s] Message to forward: %v", c.Name(), d.MessageId)
108-
err := forwarder.Push(string(d.Body))
109-
if err != nil {
110-
log.Printf("[%s] Could not forward message. Error: %s", forwarder.Name(), err.Error())
111-
} else {
112-
d.Ack(true)
104+
for {
105+
select {
106+
case d := <-msgs:
107+
log.Printf("[%s] Message to forward: %v", c.Name(), d.MessageId)
108+
err := forwarder.Push(string(d.Body))
109+
if err != nil {
110+
log.Printf("[%s] Could not forward message. Error: %s", forwarder.Name(), err.Error())
111+
} else {
112+
d.Ack(true)
113+
}
114+
case <-check:
115+
log.Printf("[%s] Checking", forwarder.Name())
116+
case <-stop:
117+
log.Printf("[%s] Closing", forwarder.Name())
118+
ch.Close()
119+
conn.Close()
120+
return
113121
}
114122
}
115123
}

server.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@ import (
44
"log"
55
"net/http"
66

7-
"github.com/AirHelp/rabbit-amazon-forwarder/healthcheck"
87
"github.com/AirHelp/rabbit-amazon-forwarder/mapping"
8+
"github.com/AirHelp/rabbit-amazon-forwarder/supervisor"
99
)
1010

1111
func main() {
12-
http.HandleFunc("/health", health.Check)
13-
err := mapping.New().LoadAndStart()
12+
consumerForwarderMap, err := mapping.New().Load()
1413
if err != nil {
15-
log.Fatalf("Could not load and start consumer->forwader pairs. Error: " + err.Error())
14+
log.Fatalf("Could not load consumer->forwader pairs. Error: " + err.Error())
1615
}
16+
supervisor := supervisor.New(consumerForwarderMap)
17+
if err := supervisor.Start(); err != nil {
18+
log.Fatal("Could not start supervisor. Error: ", err.Error())
19+
}
20+
http.HandleFunc("/restart", supervisor.Restart)
21+
http.HandleFunc("/health", supervisor.Check)
1722
log.Print("Starting http server")
1823
log.Fatal(http.ListenAndServe(":8080", nil))
1924
}

sns/forwader.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ func (f Forwarder) Name() string {
3535

3636
// Push pushes message to forwarding infrastructure
3737
func (f Forwarder) Push(message string) error {
38-
log.Print("Topic: ", f.topic)
3938
params := &sns.PublishInput{
4039
Message: aws.String(message),
4140
TargetArn: aws.String(f.topic),

sqs/forwader.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ func (f Forwarder) Name() string {
3535

3636
// Push pushes message to forwarding infrastructure
3737
func (f Forwarder) Push(message string) error {
38-
log.Print("Queue: ", f.queue)
39-
4038
params := &sqs.SendMessageInput{
4139
MessageBody: aws.String(message), // Required
4240
QueueUrl: aws.String(f.queue), // Required

supervisor/supervisor.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package supervisor
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net/http"
7+
"time"
8+
9+
"github.com/AirHelp/rabbit-amazon-forwarder/consumer"
10+
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
11+
)
12+
13+
type consumerChannel struct {
14+
name string
15+
check chan bool
16+
stop chan bool
17+
}
18+
19+
// Client supervisor client
20+
type Client struct {
21+
mappings map[consumer.Client]forwarder.Client
22+
consumers map[string]*consumerChannel
23+
}
24+
25+
// New client for supervisor
26+
func New(consumerForwarderMap map[consumer.Client]forwarder.Client) Client {
27+
return Client{mappings: consumerForwarderMap}
28+
}
29+
30+
// Start starts supervisor
31+
func (c *Client) Start() error {
32+
c.consumers = make(map[string]*consumerChannel)
33+
for consumer, forwarder := range c.mappings {
34+
channel := makeConsumerChannel(forwarder.Name())
35+
c.consumers[forwarder.Name()] = channel
36+
if err := consumer.Start(forwarder, channel.check, channel.stop); err != nil {
37+
return err
38+
}
39+
log.Printf("Started consumer:%s with forwader:%s", consumer.Name(), forwarder.Name())
40+
}
41+
return nil
42+
}
43+
44+
// Check checks running consumers
45+
func (c *Client) Check(w http.ResponseWriter, r *http.Request) {
46+
stopped := 0
47+
for _, consumer := range c.consumers {
48+
if len(consumer.check) > 0 {
49+
stopped = stopped + 1
50+
continue
51+
}
52+
consumer.check <- true
53+
time.Sleep(500 * time.Millisecond)
54+
if len(consumer.check) > 0 {
55+
stopped = stopped + 1
56+
}
57+
}
58+
if stopped > 0 {
59+
w.WriteHeader(500)
60+
message := fmt.Sprintf("Number of failed consumers: %d", stopped)
61+
w.Write([]byte(message))
62+
return
63+
}
64+
w.WriteHeader(200)
65+
w.Write([]byte("success"))
66+
}
67+
68+
// Restart restarts every consumer
69+
func (c *Client) Restart(w http.ResponseWriter, r *http.Request) {
70+
c.stop()
71+
if err := c.Start(); err != nil {
72+
w.WriteHeader(500)
73+
w.Write([]byte(err.Error()))
74+
return
75+
}
76+
w.WriteHeader(200)
77+
w.Write([]byte("success"))
78+
}
79+
80+
func (c *Client) stop() {
81+
for _, consumer := range c.consumers {
82+
consumer.stop <- true
83+
}
84+
85+
}
86+
87+
func makeConsumerChannel(name string) *consumerChannel {
88+
check := make(chan bool)
89+
stop := make(chan bool)
90+
return &consumerChannel{name: name, check: check, stop: stop}
91+
}

0 commit comments

Comments
 (0)