From f875d3fc423b169253e562f36ed36690f5160ef3 Mon Sep 17 00:00:00 2001 From: Sergey Stepanov Date: Fri, 6 Oct 2023 13:30:10 +0300 Subject: [PATCH] Guard room creation with a mutex Possible fix for the concurrent room creation while other is not destroyed properly. --- pkg/worker/coordinatorhandlers.go | 33 ++++++++++++++++++------------- pkg/worker/room/room.go | 16 ++++++++++----- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/pkg/worker/coordinatorhandlers.go b/pkg/worker/coordinatorhandlers.go index 4d3548dba..aa32e8a1b 100644 --- a/pkg/worker/coordinatorhandlers.go +++ b/pkg/worker/coordinatorhandlers.go @@ -50,9 +50,7 @@ func (c *coordinator) HandleWebrtcInit(rq api.WebrtcInitRequest[com.Uid], w *Wor user := room.NewGameSession(rq.Id, peer) // use user uid from the coordinator c.log.Info().Msgf("Peer connection: %s", user.Id()) - c.log.Debug().Msgf("Users before add: %v", w.router.Users()) w.router.AddUser(user) - c.log.Debug().Msgf("Users after add: %v", w.router.Users()) return api.Out{Payload: sdp} } @@ -83,11 +81,23 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke r := w.router.FindRoom(rq.Rid) - if r == nil { + if r == nil { // new room uid := rq.Room.Rid if uid == "" { uid = games.GenerateRoomID(rq.Game.Name) } + game := games.GameMetadata(rq.Game) + + r = room.NewRoom[*room.GameSession](uid, nil, w.router.Users(), nil) + r.HandleClose = func() { c.CloseRoom(uid) } + + if other := w.router.Room(); other != nil { + c.log.Error().Msgf("concurrent room creation: %v", uid) + return api.EmptyPacket + } + + w.router.SetRoom(r) + c.log.Info().Str("room", r.Id()).Str("game", game.Name).Msg("New room") // start the emulator app := room.WithEmulator(w.mana.Get(caged.Libretro)) @@ -97,13 +107,14 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke app.EnableCloudStorage(uid, w.storage) app.EnableRecording(rq.Record, rq.RecordUser, rq.Game.Name) - w.log.Info().Msgf("Starting game: %v", rq.Game.Name) - game := games.GameMetadata(rq.Game) + w.log.Info().Msgf("Starting the game: %v", rq.Game.Name) if err := app.Load(game, w.conf.Worker.Library.BasePath); err != nil { c.log.Error().Err(err).Msgf("couldn't load the game %v", game) app.Close() + w.router.SetRoom(nil) return api.EmptyPacket } + r.SetApp(app) m := media.NewWebRtcMediaPipe(w.conf.Encoder.Audio, w.conf.Encoder.Video, w.log) m.AudioSrcHz = app.AudioSampleRate() @@ -112,19 +123,15 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke if err := m.Init(); err != nil { c.log.Error().Err(err).Msgf("couldn't init the media") app.Close() + w.router.SetRoom(nil) return api.EmptyPacket } if app.Flipped() { m.SetVideoFlip(true) } + r.SetMedia(m) - // make the room - r = room.NewRoom[*room.GameSession](uid, app, w.router.Users(), m) - r.HandleClose = func() { c.CloseRoom(uid) } - - w.router.SetRoom(r) - c.log.Info().Str("room", r.Id()).Str("game", game.Name).Msg("New room") - + r.BindAppMedia() r.StartApp() } @@ -146,10 +153,8 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com. // HandleQuitGame handles cases when a user manually exits the game. func (c *coordinator) HandleQuitGame(rq api.GameQuitRequest[com.Uid], w *Worker) { - w.log.Debug().Msgf("Users before remove: %v", w.router.Users()) if user := w.router.FindUser(rq.Id); user != nil { w.router.Remove(user) - w.log.Debug().Msgf("Users after remove: %v", w.router.Users()) } } diff --git a/pkg/worker/room/room.go b/pkg/worker/room/room.go index a7cfe123b..7c2aef789 100644 --- a/pkg/worker/room/room.go +++ b/pkg/worker/room/room.go @@ -53,8 +53,10 @@ type Room[T Session] struct { func NewRoom[T Session](id string, app app.App, um SessionManager[T], media MediaPipe) *Room[T] { room := &Room[T]{id: id, app: app, users: um, media: media} - room.InitVideo() - room.InitAudio() + if app != nil && media != nil { + room.InitVideo() + room.InitAudio() + } return room } @@ -70,9 +72,12 @@ func (r *Room[T]) InitVideo() { }) } -func (r *Room[T]) App() app.App { return r.app } -func (r *Room[T]) Id() string { return r.id } -func (r *Room[T]) StartApp() { r.app.Start() } +func (r *Room[T]) App() app.App { return r.app } +func (r *Room[T]) BindAppMedia() { r.InitAudio(); r.InitVideo() } +func (r *Room[T]) Id() string { return r.id } +func (r *Room[T]) SetApp(app app.App) { r.app = app } +func (r *Room[T]) SetMedia(m MediaPipe) { r.media = m } +func (r *Room[T]) StartApp() { r.app.Start() } func (r *Room[T]) Close() { if r.closed { @@ -124,6 +129,7 @@ func (r *Router[T]) Remove(user T) { r.Close() } } +func (r *Router[T]) Room() *Room[T] { r.mu.Lock(); defer r.mu.Unlock(); return r.room } func (r *Router[T]) FindUser(uid Uid) T { return r.users.Find(uid.Id()) } func (r *Router[T]) SetRoom(room *Room[T]) { r.mu.Lock(); r.room = room; r.mu.Unlock() } func (r *Router[T]) Users() SessionManager[T] { return r.users }