diff --git a/internal/generated/user.pb.go b/internal/generated/user.pb.go index 6d26c078..24994fe6 100644 --- a/internal/generated/user.pb.go +++ b/internal/generated/user.pb.go @@ -493,16 +493,17 @@ func (x *PutUserArguments) GetUserID() uint64 { return 0 } -type String struct { +type SearchUsersArguments struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields String_ string `protobuf:"bytes,1,opt,name=string,proto3" json:"string,omitempty"` + UserID uint64 `protobuf:"varint,2,opt,name=userID,proto3" json:"userID,omitempty"` } -func (x *String) Reset() { - *x = String{} +func (x *SearchUsersArguments) Reset() { + *x = SearchUsersArguments{} if protoimpl.UnsafeEnabled { mi := &file_protobuf_user_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -510,13 +511,13 @@ func (x *String) Reset() { } } -func (x *String) String() string { +func (x *SearchUsersArguments) String() string { return protoimpl.X.MessageStringOf(x) } -func (*String) ProtoMessage() {} +func (*SearchUsersArguments) ProtoMessage() {} -func (x *String) ProtoReflect() protoreflect.Message { +func (x *SearchUsersArguments) ProtoReflect() protoreflect.Message { mi := &file_protobuf_user_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -528,18 +529,25 @@ func (x *String) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use String.ProtoReflect.Descriptor instead. -func (*String) Descriptor() ([]byte, []int) { +// Deprecated: Use SearchUsersArguments.ProtoReflect.Descriptor instead. +func (*SearchUsersArguments) Descriptor() ([]byte, []int) { return file_protobuf_user_proto_rawDescGZIP(), []int{7} } -func (x *String) GetString_() string { +func (x *SearchUsersArguments) GetString_() string { if x != nil { return x.String_ } return "" } +func (x *SearchUsersArguments) GetUserID() uint64 { + if x != nil { + return x.UserID + } + return 0 +} + var File_protobuf_user_proto protoreflect.FileDescriptor var file_protobuf_user_proto_rawDesc = []byte{ @@ -595,10 +603,12 @@ var file_protobuf_user_proto_rawDesc = []byte{ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, - 0x44, 0x22, 0x20, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x73, - 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x72, - 0x69, 0x6e, 0x67, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, 0x3b, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, - 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x44, 0x22, 0x46, 0x0a, 0x14, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x55, 0x73, 0x65, 0x72, 0x73, + 0x41, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, + 0x69, 0x6e, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, 0x6e, + 0x67, 0x12, 0x16, 0x0a, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x44, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, 0x3b, 0x67, + 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -622,7 +632,7 @@ var file_protobuf_user_proto_goTypes = []interface{}{ (*Contacts)(nil), // 4: protobuf.Contacts (*AddUserContactArguments)(nil), // 5: protobuf.AddUserContactArguments (*PutUserArguments)(nil), // 6: protobuf.PutUserArguments - (*String)(nil), // 7: protobuf.String + (*SearchUsersArguments)(nil), // 7: protobuf.SearchUsersArguments } var file_protobuf_user_proto_depIdxs = []int32{ 0, // 0: protobuf.Contacts.contacts:type_name -> protobuf.User @@ -725,7 +735,7 @@ func file_protobuf_user_proto_init() { } } file_protobuf_user_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*String); i { + switch v := v.(*SearchUsersArguments); i { case 0: return &v.state case 1: diff --git a/internal/generated/user_rpc_grpc.pb.go b/internal/generated/user_rpc_grpc.pb.go index 137cd20b..3b702862 100644 --- a/internal/generated/user_rpc_grpc.pb.go +++ b/internal/generated/user_rpc_grpc.pb.go @@ -30,7 +30,7 @@ type UsersClient interface { GetUserContacts(ctx context.Context, in *UserID, opts ...grpc.CallOption) (*Contacts, error) PutUserById(ctx context.Context, in *PutUserArguments, opts ...grpc.CallOption) (*User, error) GetAllUsersExceptCurrentUser(ctx context.Context, in *UserID, opts ...grpc.CallOption) (*Contacts, error) - GetSearchUsers(ctx context.Context, in *String, opts ...grpc.CallOption) (*Contacts, error) + GetSearchUsers(ctx context.Context, in *SearchUsersArguments, opts ...grpc.CallOption) (*Contacts, error) } type usersClient struct { @@ -104,7 +104,7 @@ func (c *usersClient) GetAllUsersExceptCurrentUser(ctx context.Context, in *User return out, nil } -func (c *usersClient) GetSearchUsers(ctx context.Context, in *String, opts ...grpc.CallOption) (*Contacts, error) { +func (c *usersClient) GetSearchUsers(ctx context.Context, in *SearchUsersArguments, opts ...grpc.CallOption) (*Contacts, error) { out := new(Contacts) err := c.cc.Invoke(ctx, "/protobuf.Users/GetSearchUsers", in, out, opts...) if err != nil { @@ -124,7 +124,7 @@ type UsersServer interface { GetUserContacts(context.Context, *UserID) (*Contacts, error) PutUserById(context.Context, *PutUserArguments) (*User, error) GetAllUsersExceptCurrentUser(context.Context, *UserID) (*Contacts, error) - GetSearchUsers(context.Context, *String) (*Contacts, error) + GetSearchUsers(context.Context, *SearchUsersArguments) (*Contacts, error) } // UnimplementedUsersServer should be embedded to have forward compatible implementations. @@ -152,7 +152,7 @@ func (UnimplementedUsersServer) PutUserById(context.Context, *PutUserArguments) func (UnimplementedUsersServer) GetAllUsersExceptCurrentUser(context.Context, *UserID) (*Contacts, error) { return nil, status.Errorf(codes.Unimplemented, "method GetAllUsersExceptCurrentUser not implemented") } -func (UnimplementedUsersServer) GetSearchUsers(context.Context, *String) (*Contacts, error) { +func (UnimplementedUsersServer) GetSearchUsers(context.Context, *SearchUsersArguments) (*Contacts, error) { return nil, status.Errorf(codes.Unimplemented, "method GetSearchUsers not implemented") } @@ -294,7 +294,7 @@ func _Users_GetAllUsersExceptCurrentUser_Handler(srv interface{}, ctx context.Co } func _Users_GetSearchUsers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(String) + in := new(SearchUsersArguments) if err := dec(in); err != nil { return nil, err } @@ -306,7 +306,7 @@ func _Users_GetSearchUsers_Handler(srv interface{}, ctx context.Context, dec fun FullMethod: "/protobuf.Users/GetSearchUsers", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(UsersServer).GetSearchUsers(ctx, req.(*String)) + return srv.(UsersServer).GetSearchUsers(ctx, req.(*SearchUsersArguments)) } return interceptor(ctx, in, info, handler) } diff --git a/internal/microservices/user/delivery/grpc/client/client.go b/internal/microservices/user/delivery/grpc/client/client.go index 63b102c9..130213e2 100644 --- a/internal/microservices/user/delivery/grpc/client/client.go +++ b/internal/microservices/user/delivery/grpc/client/client.go @@ -83,8 +83,11 @@ func (u userServiceGRPCClient) GetAllUsersExceptCurrentUser(ctx context.Context, return model_conversion.FromProtoMembersToMembers(users.Contacts), nil } -func (u userServiceGRPCClient) GetSearchUsers(ctx context.Context, string string) ([]model.User, error) { - searchUsers, err := u.userClient.GetSearchUsers(ctx, &generated.String{String_: string}) +func (u userServiceGRPCClient) GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.User, error) { + searchUsers, err := u.userClient.GetSearchUsers(ctx, &generated.SearchUsersArguments{ + String_: string, + UserID: userID, + }) if err != nil { return nil, err } diff --git a/internal/microservices/user/delivery/grpc/server/server.go b/internal/microservices/user/delivery/grpc/server/server.go index 10a90a37..ea08adb6 100644 --- a/internal/microservices/user/delivery/grpc/server/server.go +++ b/internal/microservices/user/delivery/grpc/server/server.go @@ -88,8 +88,8 @@ func (c *usersServiceGRPCServer) GetAllUsersExceptCurrentUser(ctx context.Contex return &generated.Contacts{Contacts: model_conversion.FromMembersToProtoMembers(contacts)}, nil } -func (c *usersServiceGRPCServer) GetSearchUsers(ctx context.Context, string *generated.String) (*generated.Contacts, error) { - searchUsers, err := c.userUsecase.GetSearchUsers(ctx, string.String_) +func (c *usersServiceGRPCServer) GetSearchUsers(ctx context.Context, searchUsersArguments *generated.SearchUsersArguments) (*generated.Contacts, error) { + searchUsers, err := c.userUsecase.GetSearchUsers(ctx, searchUsersArguments.String_, searchUsersArguments.UserID) if err != nil { return nil, err } diff --git a/internal/microservices/user/repository.go b/internal/microservices/user/repository.go index 29704c2f..793db165 100644 --- a/internal/microservices/user/repository.go +++ b/internal/microservices/user/repository.go @@ -18,5 +18,5 @@ type Repository interface { CheckExistUserById(ctx context.Context, userID uint64) error CheckExistUserByEmail(ctx context.Context, email string) error GetAllUsersExceptCurrentUser(ctx context.Context, userID uint64) ([]model.AuthorizedUser, error) - GetSearchUsers(ctx context.Context, string string) ([]model.AuthorizedUser, error) + GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.AuthorizedUser, error) } diff --git a/internal/microservices/user/repository/repository.go b/internal/microservices/user/repository/repository.go index bbac7e6a..d139c952 100644 --- a/internal/microservices/user/repository/repository.go +++ b/internal/microservices/user/repository/repository.go @@ -200,9 +200,9 @@ func (r repository) GetAllUsersExceptCurrentUser(ctx context.Context, userID uin return users, nil } -func (r repository) GetSearchUsers(ctx context.Context, string string) ([]model.AuthorizedUser, error) { +func (r repository) GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.AuthorizedUser, error) { var searchUsers []model.AuthorizedUser - err := r.db.SelectContext(ctx, &searchUsers, `SELECT * FROM profile WHERE nickname ILIKE $1`, "%"+string+"%") + err := r.db.SelectContext(ctx, &searchUsers, `SELECT * FROM profile WHERE nickname ILIKE $1 AND id != $2`, "%"+string+"%", userID) if err != nil { if err == sql.ErrNoRows { return nil, myErrors.ErrUserNotFound diff --git a/internal/microservices/user/usecase.go b/internal/microservices/user/usecase.go index aa501cc6..3b2f3bc3 100644 --- a/internal/microservices/user/usecase.go +++ b/internal/microservices/user/usecase.go @@ -10,7 +10,7 @@ type Usecase interface { CheckExistUserById(ctx context.Context, userID uint64) error GetUserById(ctx context.Context, userID uint64) (model.User, error) AddUserContact(ctx context.Context, userID uint64, contactID uint64) ([]model.User, error) - GetSearchUsers(ctx context.Context, string string) ([]model.User, error) + GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.User, error) GetUserContacts(ctx context.Context, userID uint64) ([]model.User, error) PutUserById(ctx context.Context, user model.UpdateUser, userID uint64) (model.User, error) GetAllUsersExceptCurrentUser(ctx context.Context, userID uint64) ([]model.User, error) diff --git a/internal/microservices/user/usecase/usecase.go b/internal/microservices/user/usecase/usecase.go index ec0964ff..02a3acf5 100644 --- a/internal/microservices/user/usecase/usecase.go +++ b/internal/microservices/user/usecase/usecase.go @@ -177,8 +177,8 @@ func (u usecase) GetAllUsersExceptCurrentUser(ctx context.Context, userID uint64 return model_conversion.FromAuthorizedUserArrayToUserArray(users), err } -func (u usecase) GetSearchUsers(ctx context.Context, string string) ([]model.User, error) { - searchContacts, err := u.userRepo.GetSearchUsers(ctx, string) +func (u usecase) GetSearchUsers(ctx context.Context, string string, userID uint64) ([]model.User, error) { + searchContacts, err := u.userRepo.GetSearchUsers(ctx, string, userID) if err != nil { return nil, err } diff --git a/internal/monolithic_services/qaas/send_messages/rabbitMQ/consumer/usecase/usecase.go b/internal/monolithic_services/qaas/send_messages/rabbitMQ/consumer/usecase/usecase.go index 4f7de723..64a76f27 100644 --- a/internal/monolithic_services/qaas/send_messages/rabbitMQ/consumer/usecase/usecase.go +++ b/internal/monolithic_services/qaas/send_messages/rabbitMQ/consumer/usecase/usecase.go @@ -95,7 +95,7 @@ func (u *usecase) StartConsumeMessages(ctx context.Context) { msgs, err := u.channel.Consume( u.queue.Name, "", - true, + false, false, false, false, @@ -107,10 +107,13 @@ func (u *usecase) StartConsumeMessages(ctx context.Context) { } for msg := range msgs { - err := u.centrifugePublication(msg.Body) + err = u.centrifugePublication(msg.Body) if err != nil { log.Error(err) + continue } + + msg.Ack(false) } } } diff --git a/internal/monolithic_services/qaas/send_messages/rabbitMQ/producer/usecase/usecase.go b/internal/monolithic_services/qaas/send_messages/rabbitMQ/producer/usecase/usecase.go index c4fc11f1..ed8a87fc 100644 --- a/internal/monolithic_services/qaas/send_messages/rabbitMQ/producer/usecase/usecase.go +++ b/internal/monolithic_services/qaas/send_messages/rabbitMQ/producer/usecase/usecase.go @@ -7,6 +7,7 @@ import ( "os/signal" producer "project/internal/microservices/producer/usecase" "project/internal/model" + "time" "github.com/mailru/easyjson" @@ -42,7 +43,7 @@ func NewProducer(connAddr string, queueName string) (producer.Usecase, error) { return nil, err } - _, err = channel.QueueDeclare( + dlxQueue, err := channel.QueueDeclare( "user_create_dlx", false, false, @@ -79,6 +80,39 @@ func NewProducer(connAddr string, queueName string) (producer.Usecase, error) { return nil, err } + go func() { + for { + msgs, err := channel.Consume( + dlxQueue.Name, + "", + false, + false, + false, + false, + nil, + ) + if err != nil { + continue + } + + for msg := range msgs { + err = channel.PublishWithContext( + context.TODO(), + "", + queue.Name, + false, + false, + amqp.Publishing{ + ContentType: "application/json", + Body: msg.Body, + }, + ) + } + + time.Sleep(60 * time.Second) + } + }() + signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) diff --git a/protobuf/user.proto b/protobuf/user.proto index bc7ab6dc..1976dc0f 100644 --- a/protobuf/user.proto +++ b/protobuf/user.proto @@ -50,6 +50,7 @@ message PutUserArguments { uint64 userID = 2; } -message String { +message SearchUsersArguments { string string = 1; + uint64 userID = 2; } diff --git a/protobuf/user_rpc.proto b/protobuf/user_rpc.proto index 0132cd87..ed035754 100644 --- a/protobuf/user_rpc.proto +++ b/protobuf/user_rpc.proto @@ -15,5 +15,5 @@ service Users { rpc GetUserContacts(UserID) returns(Contacts) {} rpc PutUserById(PutUserArguments) returns(User) {} rpc GetAllUsersExceptCurrentUser(UserID) returns(Contacts) {} - rpc GetSearchUsers(String) returns(Contacts) {} + rpc GetSearchUsers(SearchUsersArguments) returns(Contacts) {} }