Skip to content

Commit

Permalink
Merge pull request #17 from rog-golang-buddies/queue_setup
Browse files Browse the repository at this point in the history
Queue setup
  • Loading branch information
ldmi3i committed Aug 5, 2022
2 parents b18c7c6 + be9fddf commit 79e12b5
Show file tree
Hide file tree
Showing 25 changed files with 844 additions and 14 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.17
go-version: 1.18
- name: Checkout
uses: actions/checkout@v3
- name: Run linters
uses: golangci/golangci-lint-action@v3
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.29
version: v1.47.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ In other words this service processes content of Open API file, transforms it to
4. Validate content
5. Parse content into an ASD model
6. Put ASD model with metadata to the storage and update service queue

### Starting service
The easiest way to start an application is to do it with docker.
If you have docker you just need to run a command from the project root
`docker-compose -f ./docker/docker-compose-dev.yml up -d --build`.
And `docker-compose -f ./docker/docker-compose-dev.yml down` to stop.
You can observe queues, and send and retrieve messages from queues using the web interface available by address http://localhost:15672 .
8 changes: 5 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import "fmt"
import (
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal"
"os"
)

func main() {
// Feel free to delete this file.
fmt.Println("Hello Gophers")
os.Exit(internal.Start())
}
18 changes: 18 additions & 0 deletions docker/docker-compose-dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: '3.9'

services:
rabbit:
image: rabbitmq:3-management #you may open management UI via http://localhost:15672/#/ login&password == guest
container_name: rabbit
ports:
- "5672:5672"
- "15672:15672"

data-scraping-service:
container_name: dss
build:
context: ../.
dockerfile: Dockerfile
restart: unless-stopped
depends_on:
- rabbit
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
module github.com/rog-golang-buddies/api-hub_data-scraping-service

go 1.18

require (
github.com/golang/mock v1.6.0
github.com/rabbitmq/amqp091-go v1.4.0
github.com/stretchr/testify v1.7.0
github.com/wagslane/go-rabbitmq v0.10.0
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
30 changes: 21 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/wagslane/go-rabbitmq v0.10.0 h1:y9Bw8Q/9gOvsHfjMOGQjCW3033aYTKabxDm8eyjUGjs=
github.com/wagslane/go-rabbitmq v0.10.0/go.mod h1:u6xM1V7OO4D0szUy/F6Bya/9r0lLae/2FXBijkAQmn0=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
Expand All @@ -28,13 +38,15 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
46 changes: 46 additions & 0 deletions internal/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package internal

import (
"context"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/handler"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/publisher"
"log"
)

func Start() int {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := config.ReadConfig() //read configuration from file & env
//initialize publisher connection to the queue
//this library assumes using one publisher and one consumer per application
//https://github.com/wagslane/go-rabbitmq/issues/79
pub, err := publisher.NewPublisher(conf.QueueConfig) //TODO pass logger here and add it to publisher options
if err != nil {
log.Println("error while starting publisher: ", err)
return 1
}
defer publisher.ClosePublisher(pub)
//initialize consumer connection to the queue
consumer, err := queue.NewConsumer(conf.QueueConfig) //TODO pass logger here and add it to consumer options
if err != nil {
log.Println("error while connecting to the queue: ", err)
return 1
}
defer queue.CloseConsumer(consumer)

handl := handler.NewApiSpecDocHandler(pub, conf.QueueConfig)
listener := queue.NewListener()
err = listener.Start(consumer, &conf.QueueConfig, handl)
if err != nil {
log.Println("error while listening queue ", err)
return 1
}

<-ctx.Done()

log.Println("application stopped gracefully (not)")
return 0
}
20 changes: 20 additions & 0 deletions internal/config/application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package config

type ApplicationConfig struct {
QueueConfig QueueConfig
}

func ReadConfig() ApplicationConfig {
//Stub this method before the configuration task is not resolved
//https://github.com/rog-golang-buddies/api-hub_data-scraping-service/issues/10
//TODO implement with the method to read configuration from file and env
return ApplicationConfig{
QueueConfig: QueueConfig{
UrlRequestQueue: "data-scraping-asd",
ScrapingResultQueue: "storage-update-asd",
NotificationQueue: "gateway-scrape_notifications",
Url: "amqp://guest:guest@rabbit:5672/",
Concurrency: 10,
},
}
}
10 changes: 10 additions & 0 deletions internal/config/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

//QueueConfig queue configuration
type QueueConfig struct {
UrlRequestQueue string //UrlRequestQueue name to listen to the new events
ScrapingResultQueue string //Queue name to send processed ApiSpecDoc
NotificationQueue string //Queue name to notify a user about error or success (if required)
Url string //RabbitMQ url
Concurrency int //Number of parallel handlers
}
9 changes: 9 additions & 0 deletions internal/dto/scrapingResult.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package dto

import "github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto/apiSpecDoc"

type ScrapingResult struct {
IsNotifyUser bool

ApiSpecDoc apiSpecDoc.ApiSpecDoc
}
12 changes: 12 additions & 0 deletions internal/dto/urlRequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dto

//UrlRequest represents listening request model
type UrlRequest struct {
//File url to scrape data
FileUrl string

//A flag is a notification required related to an error notification in case of an error
//Notification is required when this is the request from the user and doesn't require it
//if it is the request from the storage and update service.
IsNotifyUser bool
}
32 changes: 32 additions & 0 deletions internal/dto/userNotification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package dto

import "fmt"

//UserNotification represents basic DTO notification to the user if requested
//Initially, it supposed to be simple - if err != nil => error happens, else all is ok
type UserNotification struct {
Error *ProcessingError
}

func NewUserNotification(procErr *ProcessingError) UserNotification {
return UserNotification{Error: procErr}
}

//ProcessingError represents basic DTO to provide information about the error
//when the processing request contains a notification request
type ProcessingError struct {
Cause error

Message string
}

func (pe *ProcessingError) Error() string {
return fmt.Sprintf("%s: %v", pe.Message, pe.Cause)
}

func NewProcessingError(message string, err error) ProcessingError {
return ProcessingError{
Cause: err,
Message: message,
}
}
40 changes: 40 additions & 0 deletions internal/queue/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package queue

import (
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
"github.com/wagslane/go-rabbitmq"
"io"
"log"
)

//Consumer is just an interface for the library consumer which doesn't have one.
//go:generate mockgen -source=consumer.go -destination=./mocks/consumer.go
type Consumer interface {
io.Closer
StartConsuming(
handler rabbitmq.Handler,
queue string,
routingKeys []string,
optionFuncs ...func(*rabbitmq.ConsumeOptions),
) error
}

func NewConsumer(conf config.QueueConfig) (Consumer, error) {
consumer, err := rabbitmq.NewConsumer(
conf.Url,
rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
return nil, err
}
return &consumer, nil
}

func CloseConsumer(consumer Consumer) {
log.Println("closing consumer")
err := consumer.Close()
if err != nil {
log.Println("error while closing consumer: ", err)
}
}
15 changes: 15 additions & 0 deletions internal/queue/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package queue_test

import (
"github.com/golang/mock/gomock"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue"
mock_queue "github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/mocks"
"testing"
)

func TestClosePublisher(t *testing.T) {
ctrl := gomock.NewController(t)
consumer := mock_queue.NewMockConsumer(ctrl)
consumer.EXPECT().Close().Return(nil)
queue.CloseConsumer(consumer)
}
68 changes: 68 additions & 0 deletions internal/queue/handler/apispec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package handler

import (
"encoding/json"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto/apiSpecDoc"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/publisher"
"github.com/wagslane/go-rabbitmq"
"log"
)

type ApiSpecDocHandler struct {
publisher publisher.Publisher
config config.QueueConfig
}

func (asdh *ApiSpecDocHandler) Handle(delivery rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(delivery.Body))
//call process here
var req dto.UrlRequest
err := json.Unmarshal(delivery.Body, &req)
if err != nil {
log.Printf("error unmarshalling message: '%v', err: %s\n", string(delivery.Body), err)
return rabbitmq.NackDiscard
}
//here processing of the request happens...
asd := apiSpecDoc.ApiSpecDoc{} //TODO replace this stub with process call

//publish to the required queue success or error
result := dto.ScrapingResult{IsNotifyUser: req.IsNotifyUser, ApiSpecDoc: asd}
err = asdh.publish(&delivery, result, asdh.config.ScrapingResultQueue)
if err != nil {
log.Println("error while publishing: ", err)
//Here is some error while publishing happened - probably something wrong with the queue
return rabbitmq.NackDiscard
}
if req.IsNotifyUser {
err = asdh.publish(&delivery, dto.NewUserNotification(nil), asdh.config.NotificationQueue)
if err != nil {
log.Println("error while notifying user")
//don't discard this message because it was published to the storage service successfully
}
}
log.Println("Url scraped successfully")
return rabbitmq.Ack
}

func (asdh *ApiSpecDocHandler) publish(delivery *rabbitmq.Delivery, message any, queue string) error {
content, err := json.Marshal(message)
if err != nil {
log.Println("error while marshalling: ", err)
return err
}
return asdh.publisher.Publish(content,
[]string{queue},
rabbitmq.WithPublishOptionsCorrelationID(delivery.CorrelationId),
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsPersistentDelivery,
)
}

func NewApiSpecDocHandler(publisher publisher.Publisher, config config.QueueConfig) Handler {
return &ApiSpecDocHandler{
publisher: publisher,
config: config,
}
}
Loading

0 comments on commit 79e12b5

Please sign in to comment.