Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #8 from CastyLab/develop
Browse files Browse the repository at this point in the history
Adding redis PubSub actions for user and theater events
  • Loading branch information
mrjosh authored Oct 24, 2020
2 parents eacc0ed + 8e18d34 commit 3ce7c71
Show file tree
Hide file tree
Showing 26 changed files with 443 additions and 212 deletions.
5 changes: 3 additions & 2 deletions config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ secrets:
user: "service"
pass: "super-secure-password"
redis:
host: "localhost"
port: 6379
masterName: "mymaster"
sentinels:
- "localhost:6379"
pass: "super-secure-password"
oauth:
google: "./config/oauth/google_client_secret.json"
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ type ConfMap struct {
Spotify string `yaml:"spotify"`
} `yaml:"oauth"`
Redis struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Pass string `yaml:"pass"`
MasterName string `yaml:"masterName"`
Sentinels []string `yaml:"sentinels"`
Port int `yaml:"port"`
Pass string `yaml:"pass"`
} `yaml:"redis"`
JWT struct {
ExpireTime int `yaml:"expire_time"`
Expand Down
6 changes: 6 additions & 0 deletions db/models/theater.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ type Follow struct {

CreatedAt time.Time `bson:"created_at, omitempty" json:"created_at, omitempty"`
UpdatedAt time.Time `bson:"updated_at, omitempty" json:"updated_at, omitempty"`
}

type TheaterMember struct {
ID *primitive.ObjectID `bson:"_id, omitempty" json:"id, omitempty"`
TheaterId *primitive.ObjectID `bson:"theater_id, omitempty" json:"theater_id, omitempty"`
UserId *primitive.ObjectID `bson:"user_id, omitempty" json:"user_id, omitempty"`
}
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ module github.com/CastyLab/grpc.server
go 1.14

require (
github.com/CastyLab/grpc.proto v0.0.0-20200829020729-8a1f43db6b04
github.com/CastyLab/grpc.proto v0.0.0-20201024203352-28756769b0b7
github.com/asticode/go-astisub v0.2.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/getsentry/sentry-go v0.5.1
github.com/golang/protobuf v1.4.1
github.com/pingcap/errors v0.11.4
github.com/go-redis/redis/v8 v8.3.2
github.com/golang/protobuf v1.4.2
github.com/pkg/errors v0.8.1
go.mongodb.org/mongo-driver v1.3.1
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.28.0
gopkg.in/yaml.v2 v2.2.4
gopkg.in/yaml.v2 v2.3.0
)
59 changes: 55 additions & 4 deletions go.sum

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions helpers/member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package helpers

import (
"context"
"fmt"
"github.com/CastyLab/grpc.proto/proto"
"github.com/CastyLab/grpc.server/db"
"github.com/CastyLab/grpc.server/db/models"
"go.mongodb.org/mongo-driver/bson"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func NewMemberProto(member *models.TheaterMember) (*proto.User, error) {
var (
dbmember = new(models.User)
decoder = db.Connection.Collection("users").
FindOne(context.Background(), bson.M{"_id": member.UserId})
)
if err := decoder.Decode(dbmember); err != nil {
return nil, fmt.Errorf("could not decode theater member: %v", err)
}
return NewProtoUser(dbmember), nil
}

func GetTheaterMembers(ctx context.Context, theater *models.Theater) ([]*proto.User, error) {

members := make([]*proto.User, 0)
cursor, err := db.Connection.Collection("theater_members").Find(ctx, bson.M{"theater_id": theater.ID})
if err != nil {
return nil, status.Error(codes.Internal, "Could not get theater members")
}

for cursor.Next(ctx) {
member := new(models.TheaterMember)
if err := cursor.Decode(member); err != nil {
continue
}
protoMember, err := NewMemberProto(member)
if err != nil {
continue
}
members = append(members, protoMember)
}

return members, nil
}
7 changes: 2 additions & 5 deletions helpers/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
func NewProtoMessage(ctx context.Context, message *models.Message) (*proto.Message, error) {

var (
err error
dbSender = new(models.User)
collection = db.Connection.Collection("users")
)
Expand All @@ -20,11 +21,7 @@ func NewProtoMessage(ctx context.Context, message *models.Message) (*proto.Messa
return nil, err
}

sender, err := NewProtoUser(dbSender)
if err != nil {
return nil, err
}

sender := NewProtoUser(dbSender)
createdAt, _ := ptypes.TimestampProto(message.CreatedAt)
updatedAt, _ := ptypes.TimestampProto(message.UpdatedAt)

Expand Down
9 changes: 2 additions & 7 deletions helpers/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,12 @@ func NewNotificationProto(notif *models.Notification) (*proto.Notification, erro
return nil, err
}

protoUser, err := NewProtoUser(fromUser)
if err != nil {
return nil, err
}

protoMSG := &proto.Notification{
protoMSG := &proto.Notification{
Id: notif.ID.Hex(),
Type: notif.Type,
Read: notif.Read,
ReadAt: readAt,
FromUser: protoUser,
FromUser: NewProtoUser(fromUser),
CreatedAt: createdAt,
UpdatedAt: updatedAt,
}
Expand Down
8 changes: 1 addition & 7 deletions helpers/theater.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
func NewTheaterProto(ctx context.Context, theater *models.Theater) (*proto.Theater, error) {

var (
err error
database = db.Connection
thUser = new(models.User)
mediaSourceProtoMessage = new(proto.MediaSource)
Expand All @@ -33,15 +32,10 @@ func NewTheaterProto(ctx context.Context, theater *models.Theater) (*proto.Theat
return nil, err
}

thProtoMessageUser, err := NewProtoUser(thUser)
if err != nil {
return nil, err
}

return &proto.Theater{
Id: theater.ID.Hex(),
Description: theater.Description,
User: thProtoMessageUser,
User: NewProtoUser(thUser),
MediaSource: mediaSourceProtoMessage,
Privacy: theater.Privacy,
VideoPlayerAccess: theater.VideoPlayerAccess,
Expand Down
80 changes: 75 additions & 5 deletions helpers/user.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,90 @@
package helpers

import (
"context"
"fmt"
"github.com/CastyLab/grpc.proto/proto"
"github.com/CastyLab/grpc.server/db"
"github.com/CastyLab/grpc.server/db/models"
"github.com/CastyLab/grpc.server/redis"
"github.com/golang/protobuf/ptypes"
"go.mongodb.org/mongo-driver/bson"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func NewProtoUser(user *models.User) (*proto.User, error) {
func GetFriendsFromDatabase(ctx context.Context, user *models.User) ([]*proto.User, error) {
var (
friends = make([]*proto.User, 0)
userCollection = db.Connection.Collection("users")
friendsCollection = db.Connection.Collection("friends")
)

filter := bson.M{
"accepted": true,
"$or": []interface{}{
bson.M{"friend_id": user.ID},
bson.M{"user_id": user.ID},
},
}

cursor, err := friendsCollection.Find(ctx, filter)
if err != nil {
return nil, status.Error(codes.NotFound, "Could not find friends!")
}

for cursor.Next(ctx) {

var friend = new(models.Friend)
if err := cursor.Decode(friend); err != nil {
continue
}

var filter = bson.M{"_id": friend.FriendId}
if user.ID.Hex() == friend.FriendId.Hex() {
filter = bson.M{"_id": friend.UserId}
}

friendUserObject := new(models.User)
if err := userCollection.FindOne(ctx, filter).Decode(friendUserObject); err != nil {
continue
}

friends = append(friends, NewProtoUser(friendUserObject))
}

return friends, nil
}

// update friends with new event of user
func SendEventToFriends(ctx context.Context, event []byte, user *models.User) error {
friends, err := GetFriendsFromDatabase(ctx, user)
if err != nil {
return status.Error(codes.Internal, "Could not get friends!")
}
SendEventToUsers(ctx, event, friends)
return nil
}

func SendEventToUser(ctx context.Context, event []byte, user *proto.User) {
redis.Client.Publish(ctx, fmt.Sprintf("user:events:%s", user.Id), event)
}

func SendEventToUsers(ctx context.Context, event []byte, users []*proto.User) {
for _, user := range users {
redis.Client.Publish(ctx, fmt.Sprintf("user:events:%s", user.Id), event)
}
}

func SendEventToTheaterMembers(ctx context.Context, event []byte, theater *models.Theater) {
redis.Client.Publish(ctx, fmt.Sprintf("theater:events:%s", theater.ID.Hex()), event)
}

func NewProtoUser(user *models.User) *proto.User {
lastLogin, _ := ptypes.TimestampProto(user.LastLogin)
joinedAt, _ := ptypes.TimestampProto(user.JoinedAt)
updatedAt, _ := ptypes.TimestampProto(user.UpdatedAt)

protoUser := &proto.User{
return &proto.User{
Id: user.ID.Hex(),
Fullname: user.Fullname,
Username: user.Username,
Expand All @@ -29,6 +101,4 @@ func NewProtoUser(user *models.User) (*proto.User, error) {
JoinedAt: joinedAt,
UpdatedAt: updatedAt,
}

return protoUser, nil
}
3 changes: 2 additions & 1 deletion oauth/google/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func Configure() error {
}

func Authenticate(code string) (*oauth2.Token, error) {
mCtx, _ := context.WithTimeout(context.Background(), 10 * time.Second)
mCtx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()
return oauthClient.Exchange(mCtx, code, oauth2.AccessTypeOffline)
}
31 changes: 31 additions & 0 deletions redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package redis

import (
"context"
"github.com/CastyLab/grpc.server/config"
"github.com/go-redis/redis/v8"
"log"
)

var (
Client *redis.Client
)

func Configure() error {
Client = redis.NewFailoverClient(&redis.FailoverOptions{
SentinelAddrs: config.Map.Secrets.Redis.Sentinels,
Password: config.Map.Secrets.Redis.Pass,
MasterName: config.Map.Secrets.Redis.MasterName,
DB: 0,
})
cmd := Client.Ping(context.Background())
if res := cmd.Val(); res != "PONG" {
log.Println("SentinelAddrs: ", config.Map.Secrets.Redis.Sentinels)
log.Fatalf("Could not ping the redis server: %v", cmd.Err())
}
return nil
}

func Close() error {
return Client.Close()
}
19 changes: 16 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/CastyLab/grpc.server/db"
"github.com/CastyLab/grpc.server/jwt"
"github.com/CastyLab/grpc.server/oauth"
"github.com/CastyLab/grpc.server/redis"
"github.com/CastyLab/grpc.server/services/auth"
"github.com/CastyLab/grpc.server/services/message"
"github.com/CastyLab/grpc.server/services/theater"
Expand Down Expand Up @@ -42,6 +43,10 @@ func init() {
log.Fatal(fmt.Errorf("could not load config: %v", err))
}

if err := redis.Configure(); err != nil {
log.Fatal(fmt.Errorf("could not configure redis : %v", err))
}

if err := sentry.Init(sentry.ClientOptions{ Dsn: config.Map.Secrets.SentryDsn }); err != nil {
log.Fatal(fmt.Errorf("could not initilize sentry: %v", err))
}
Expand All @@ -68,9 +73,17 @@ func init() {

func main() {

// Since sentry emits events in the background we need to make sure
// they are sent before we shut down
defer sentry.Flush(time.Second * 5)
defer func() {

// Since sentry emits events in the background we need to make sure
// they are sent before we shut down
sentry.Flush(time.Second * 5)

if err := redis.Close(); err != nil {
log.Println(fmt.Errorf("could not close redis connection: %v", err))
}

}()

listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", *host, *port))
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions services/auth/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import (
"github.com/CastyLab/grpc.server/jwt"
"github.com/getsentry/sentry-go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"golang.org/x/crypto/bcrypt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net/http"
"regexp"
)

type Service struct {
db *mongo.Database
}
type Service struct {}

func (s *Service) isEmail(user string) bool {

Expand Down
Loading

0 comments on commit 3ce7c71

Please sign in to comment.