Skip to content

Commit

Permalink
added dead-letter exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
marcussss1 committed Jun 1, 2023
1 parent 2031881 commit 76cd6bb
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 36 deletions.
40 changes: 25 additions & 15 deletions internal/generated/user.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions internal/generated/user_rpc_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions internal/microservices/user/delivery/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ func (u userServiceGRPCClient) GetAllUsersExceptCurrentUser(ctx context.Context,
return model_conversion.FromProtoMembersToMembers(users.Contacts), nil
}

func (u userServiceGRPCClient) GetSearchUsers(ctx context.Context, string string) ([]model.User, error) {
searchUsers, err := u.userClient.GetSearchUsers(ctx, &generated.String{String_: string})
func (u userServiceGRPCClient) GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.User, error) {
searchUsers, err := u.userClient.GetSearchUsers(ctx, &generated.SearchUsersArguments{
String_: string,
UserID: userID,
})
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/microservices/user/delivery/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (c *usersServiceGRPCServer) GetAllUsersExceptCurrentUser(ctx context.Contex
return &generated.Contacts{Contacts: model_conversion.FromMembersToProtoMembers(contacts)}, nil
}

func (c *usersServiceGRPCServer) GetSearchUsers(ctx context.Context, string *generated.String) (*generated.Contacts, error) {
searchUsers, err := c.userUsecase.GetSearchUsers(ctx, string.String_)
func (c *usersServiceGRPCServer) GetSearchUsers(ctx context.Context, searchUsersArguments *generated.SearchUsersArguments) (*generated.Contacts, error) {
searchUsers, err := c.userUsecase.GetSearchUsers(ctx, searchUsersArguments.String_, searchUsersArguments.UserID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/microservices/user/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ type Repository interface {
CheckExistUserById(ctx context.Context, userID uint64) error
CheckExistUserByEmail(ctx context.Context, email string) error
GetAllUsersExceptCurrentUser(ctx context.Context, userID uint64) ([]model.AuthorizedUser, error)
GetSearchUsers(ctx context.Context, string string) ([]model.AuthorizedUser, error)
GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.AuthorizedUser, error)
}
4 changes: 2 additions & 2 deletions internal/microservices/user/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ func (r repository) GetAllUsersExceptCurrentUser(ctx context.Context, userID uin
return users, nil
}

func (r repository) GetSearchUsers(ctx context.Context, string string) ([]model.AuthorizedUser, error) {
func (r repository) GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.AuthorizedUser, error) {
var searchUsers []model.AuthorizedUser
err := r.db.SelectContext(ctx, &searchUsers, `SELECT * FROM profile WHERE nickname ILIKE $1`, "%"+string+"%")
err := r.db.SelectContext(ctx, &searchUsers, `SELECT * FROM profile WHERE nickname ILIKE $1 AND id != $2`, "%"+string+"%", userID)
if err != nil {
if err == sql.ErrNoRows {
return nil, myErrors.ErrUserNotFound
Expand Down
2 changes: 1 addition & 1 deletion internal/microservices/user/usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Usecase interface {
CheckExistUserById(ctx context.Context, userID uint64) error
GetUserById(ctx context.Context, userID uint64) (model.User, error)
AddUserContact(ctx context.Context, userID uint64, contactID uint64) ([]model.User, error)
GetSearchUsers(ctx context.Context, string string) ([]model.User, error)
GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.User, error)
GetUserContacts(ctx context.Context, userID uint64) ([]model.User, error)
PutUserById(ctx context.Context, user model.UpdateUser, userID uint64) (model.User, error)
GetAllUsersExceptCurrentUser(ctx context.Context, userID uint64) ([]model.User, error)
Expand Down
4 changes: 2 additions & 2 deletions internal/microservices/user/usecase/usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func (u usecase) GetAllUsersExceptCurrentUser(ctx context.Context, userID uint64
return model_conversion.FromAuthorizedUserArrayToUserArray(users), err
}

func (u usecase) GetSearchUsers(ctx context.Context, string string) ([]model.User, error) {
searchContacts, err := u.userRepo.GetSearchUsers(ctx, string)
func (u usecase) GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.User, error) {
searchContacts, err := u.userRepo.GetSearchUsers(ctx, string, userID)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (u *usecase) StartConsumeMessages(ctx context.Context) {
msgs, err := u.channel.Consume(
u.queue.Name,
"",
true,
false,
false,
false,
false,
Expand All @@ -107,10 +107,13 @@ func (u *usecase) StartConsumeMessages(ctx context.Context) {
}

for msg := range msgs {
err := u.centrifugePublication(msg.Body)
err = u.centrifugePublication(msg.Body)
if err != nil {
log.Error(err)
continue
}

msg.Ack(false)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
producer "project/internal/microservices/producer/usecase"
"project/internal/model"
"time"

"github.com/mailru/easyjson"

Expand Down Expand Up @@ -42,7 +43,7 @@ func NewProducer(connAddr string, queueName string) (producer.Usecase, error) {
return nil, err
}

_, err = channel.QueueDeclare(
dlxQueue, err := channel.QueueDeclare(
"user_create_dlx",
false,
false,
Expand Down Expand Up @@ -79,6 +80,39 @@ func NewProducer(connAddr string, queueName string) (producer.Usecase, error) {
return nil, err
}

go func() {
for {
msgs, err := channel.Consume(
dlxQueue.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
continue
}

for msg := range msgs {
err = channel.PublishWithContext(
context.TODO(),
"",
queue.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: msg.Body,
},
)
}

time.Sleep(60 * time.Second)
}
}()

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

Expand Down
3 changes: 2 additions & 1 deletion protobuf/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message PutUserArguments {
uint64 userID = 2;
}

message String {
message SearchUsersArguments {
string string = 1;
uint64 userID = 2;
}
2 changes: 1 addition & 1 deletion protobuf/user_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ service Users {
rpc GetUserContacts(UserID) returns(Contacts) {}
rpc PutUserById(PutUserArguments) returns(User) {}
rpc GetAllUsersExceptCurrentUser(UserID) returns(Contacts) {}
rpc GetSearchUsers(String) returns(Contacts) {}
rpc GetSearchUsers(SearchUsersArguments) returns(Contacts) {}
}

0 comments on commit 76cd6bb

Please sign in to comment.