Skip to content

Commit 8ed9baa

Browse files
committed
add amqp
1 parent f74b4a5 commit 8ed9baa

34 files changed

+2631
-184
lines changed

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,16 @@ run-product:
1414
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/product
1515
.PHONY: run-product
1616

17+
run-counter:
18+
cd cmd/counter && go mod tidy && go mod download && \
19+
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/counter
20+
.PHONY: run-counter
21+
22+
run-barista:
23+
cd cmd/counter && go mod tidy && go mod download && \
24+
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/barista
25+
.PHONY: run-barista
26+
1727
run-proxy:
1828
cd cmd/proxy && go mod tidy && go mod download && \
1929
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/proxy

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,16 @@
22

33
The coffeeshop with golang stack
44

5+
## Services
6+
7+
No. | Service | URI
8+
--- | --- | ---
9+
1 | grpc-gateway | [http://localhost:5000](http://localhost:5000)
10+
2 | product service | [http://localhost:5001](http://localhost:5001)
11+
3 | counter service | [http://localhost:5002](http://localhost:5002)
12+
4 | barista service | [http://localhost:5003](http://localhost:5003)
13+
5 | kitchen service | [http://localhost:5004](http://localhost:5004)
14+
515
## Package
616

717
```go

buf.gen.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
version: v1
22
plugins:
33
- remote: buf.build/protocolbuffers/plugins/go:v1.28.0-1
4-
out: proto
4+
out: proto/gen/
55
opt: paths=source_relative
66
- remote: buf.build/grpc/plugins/go:v1.2.0-1
7-
out: proto
7+
out: proto/gen/
88
opt: paths=source_relative,require_unimplemented_servers=false
99
- remote: buf.build/grpc-ecosystem/plugins/grpc-gateway:v2.7.2-1
10-
out: proto
10+
out: proto/gen/
1111
opt: paths=source_relative
1212
- remote: buf.build/grpc-ecosystem/plugins/openapiv2:v2.7.2-1
1313
out: third_party/OpenAPI

client.http

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,8 @@ content-type: application/json
66

77
###
88
GET {{host}}/v1/api/items-by-types/1,2,3 HTTP/1.1
9+
content-type: application/json
10+
11+
###
12+
GET {{host}}/v1/fulfillment-orders HTTP/1.1
913
content-type: application/json

cmd/barista/event/consumer.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package event
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"log"
7+
8+
amqp "github.com/rabbitmq/amqp091-go"
9+
)
10+
11+
const ExchangeName = "orders_topic"
12+
13+
type Consumer struct {
14+
conn *amqp.Connection
15+
}
16+
17+
func NewConsumer(conn *amqp.Connection) (Consumer, error) {
18+
consumer := Consumer{
19+
conn: conn,
20+
}
21+
22+
err := consumer.setup()
23+
if err != nil {
24+
return Consumer{}, err
25+
}
26+
27+
return consumer, nil
28+
}
29+
30+
func (c *Consumer) setup() error {
31+
channel, err := c.conn.Channel()
32+
if err != nil {
33+
return err
34+
}
35+
36+
return channel.ExchangeDeclare(
37+
ExchangeName, // name
38+
"topic", // type
39+
true, // durable
40+
false, // auto-deleted
41+
false, // internal
42+
false, // no-wait
43+
nil, // arguments
44+
)
45+
}
46+
47+
type Payload struct {
48+
Name string `json:"name"`
49+
}
50+
51+
func (c *Consumer) Listen(topics []string) error {
52+
ch, err := c.conn.Channel()
53+
if err != nil {
54+
return err
55+
}
56+
defer ch.Close()
57+
58+
q, err := ch.QueueDeclare(
59+
"", // name
60+
false, // durable
61+
false, // delete when unused
62+
true, // exclusive
63+
false, // no-wait
64+
nil, // arguments
65+
)
66+
if err != nil {
67+
return err
68+
}
69+
70+
for _, s := range topics {
71+
err = ch.QueueBind(
72+
q.Name,
73+
s,
74+
ExchangeName,
75+
false,
76+
nil,
77+
)
78+
79+
if err != nil {
80+
log.Println(err)
81+
82+
return err
83+
}
84+
}
85+
86+
messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
87+
if err != nil {
88+
return err
89+
}
90+
91+
forever := make(chan bool)
92+
93+
go func() {
94+
for d := range messages {
95+
var payload Payload
96+
97+
_ = json.Unmarshal(d.Body, &payload)
98+
99+
go func() {
100+
switch payload.Name {
101+
case "drink_made":
102+
fmt.Println("Got it")
103+
default:
104+
fmt.Println("default")
105+
}
106+
}()
107+
}
108+
}()
109+
110+
log.Printf("[*] Waiting for message [Exchange, Queue][%s, %s].", ExchangeName, q.Name)
111+
<-forever
112+
113+
return nil
114+
}

cmd/barista/main.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"math"
7+
"os"
8+
"time"
9+
10+
amqp "github.com/rabbitmq/amqp091-go"
11+
"github.com/thangchung/go-coffeeshop/cmd/barista/event"
12+
)
13+
14+
const (
15+
RetryTimes = 5
16+
PowOf = 2
17+
)
18+
19+
func main() {
20+
rabbitConn, err := connect()
21+
if err != nil {
22+
log.Println(err)
23+
os.Exit(1)
24+
}
25+
26+
defer rabbitConn.Close()
27+
28+
log.Println("Listening for and consuming RabbitMQ messages...")
29+
30+
consumer, err := event.NewConsumer(rabbitConn)
31+
if err != nil {
32+
panic(err)
33+
}
34+
35+
err = consumer.Listen([]string{"log.INFO", "log.WARNING", "log.ERROR"})
36+
if err != nil {
37+
log.Println(err)
38+
}
39+
}
40+
41+
func connect() (*amqp.Connection, error) {
42+
var (
43+
counts int64
44+
backOff = 1 * time.Second
45+
connection *amqp.Connection
46+
rabbitURL = "amqp://guest:[email protected]:5672/"
47+
)
48+
49+
for {
50+
c, err := amqp.Dial(rabbitURL)
51+
if err != nil {
52+
fmt.Println("RabbitMQ not yet ready...")
53+
counts++
54+
} else {
55+
connection = c
56+
fmt.Println()
57+
58+
break
59+
}
60+
61+
if counts > RetryTimes {
62+
fmt.Println(err)
63+
64+
return nil, err
65+
}
66+
67+
fmt.Printf("Backing off for %d seconds...\n", int(math.Pow(float64(counts), PowOf)))
68+
backOff = time.Duration(math.Pow(float64(counts), PowOf)) * time.Second
69+
time.Sleep(backOff)
70+
71+
continue
72+
}
73+
74+
return connection, nil
75+
}

cmd/counter/config.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
app:
2+
name: 'counter-service'
3+
version: '1.0.0'
4+
5+
http:
6+
host: '0.0.0.0'
7+
port: 5002
8+
9+
logger:
10+
log_level: 'debug'
11+
rollbar_env: 'counter-service'

cmd/counter/config/config.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os"
7+
8+
"github.com/ilyakaznacheev/cleanenv"
9+
configs "github.com/thangchung/go-coffeeshop/pkg/config"
10+
)
11+
12+
type (
13+
Config struct {
14+
configs.App `yaml:"app"`
15+
configs.HTTP `yaml:"http"`
16+
configs.Log `yaml:"logger"`
17+
}
18+
)
19+
20+
func NewConfig() (*Config, error) {
21+
cfg := &Config{}
22+
23+
dir, err := os.Getwd()
24+
if err != nil {
25+
log.Fatal(err)
26+
}
27+
28+
// debug
29+
fmt.Println(dir)
30+
31+
err = cleanenv.ReadConfig(dir+"/config.yml", cfg)
32+
if err != nil {
33+
return nil, fmt.Errorf("config error: %w", err)
34+
}
35+
36+
err = cleanenv.ReadEnv(cfg)
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
return cfg, nil
42+
}

cmd/counter/main.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"reflect"
8+
9+
"github.com/golang/glog"
10+
"github.com/thangchung/go-coffeeshop/cmd/counter/config"
11+
"github.com/thangchung/go-coffeeshop/internal/counter/app"
12+
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
13+
)
14+
15+
func main() {
16+
cfg, err := config.NewConfig()
17+
if err != nil {
18+
glog.Fatal(err)
19+
}
20+
21+
fmt.Println(reflect.TypeOf(struct{}{}))
22+
23+
mylog := mylogger.New(cfg.Level)
24+
25+
a := app.New(mylog, cfg)
26+
if err = a.Run(context.Background()); err != nil {
27+
glog.Fatal(err)
28+
os.Exit(1)
29+
}
30+
}

cmd/product/config/config.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,14 @@ import (
66
"os"
77

88
"github.com/ilyakaznacheev/cleanenv"
9+
configs "github.com/thangchung/go-coffeeshop/pkg/config"
910
)
1011

1112
type (
1213
Config struct {
13-
App `yaml:"app"`
14-
HTTP `yaml:"http"`
15-
Log `yaml:"logger"`
16-
}
17-
18-
App struct {
19-
Name string `env-required:"true" yaml:"name" env:"APP_NAME"`
20-
Version string `env-required:"true" yaml:"version" env:"APP_VERSION"`
21-
}
22-
23-
HTTP struct {
24-
Host string `env-required:"true" yaml:"host" env:"HTTP_HOST"`
25-
Port int `env-required:"true" yaml:"port" env:"HTTP_PORT"`
26-
}
27-
28-
Log struct {
29-
Level string `env-required:"true" yaml:"log_level" env:"LOG_LEVEL"`
14+
configs.App `yaml:"app"`
15+
configs.HTTP `yaml:"http"`
16+
configs.Log `yaml:"logger"`
3017
}
3118
)
3219

0 commit comments

Comments
 (0)