Skip to content

Commit

Permalink
Guard room creation with a mutex
Browse files Browse the repository at this point in the history
Possible fix for the concurrent room creation while other is not destroyed properly.
  • Loading branch information
sergystepanov committed Oct 6, 2023
1 parent cddf081 commit f875d3f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
33 changes: 19 additions & 14 deletions pkg/worker/coordinatorhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ func (c *coordinator) HandleWebrtcInit(rq api.WebrtcInitRequest[com.Uid], w *Wor

user := room.NewGameSession(rq.Id, peer) // use user uid from the coordinator
c.log.Info().Msgf("Peer connection: %s", user.Id())
c.log.Debug().Msgf("Users before add: %v", w.router.Users())
w.router.AddUser(user)
c.log.Debug().Msgf("Users after add: %v", w.router.Users())

return api.Out{Payload: sdp}
}
Expand Down Expand Up @@ -83,11 +81,23 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke

r := w.router.FindRoom(rq.Rid)

if r == nil {
if r == nil { // new room
uid := rq.Room.Rid
if uid == "" {
uid = games.GenerateRoomID(rq.Game.Name)
}
game := games.GameMetadata(rq.Game)

r = room.NewRoom[*room.GameSession](uid, nil, w.router.Users(), nil)
r.HandleClose = func() { c.CloseRoom(uid) }

if other := w.router.Room(); other != nil {
c.log.Error().Msgf("concurrent room creation: %v", uid)
return api.EmptyPacket
}

w.router.SetRoom(r)
c.log.Info().Str("room", r.Id()).Str("game", game.Name).Msg("New room")

// start the emulator
app := room.WithEmulator(w.mana.Get(caged.Libretro))
Expand All @@ -97,13 +107,14 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
app.EnableCloudStorage(uid, w.storage)
app.EnableRecording(rq.Record, rq.RecordUser, rq.Game.Name)

w.log.Info().Msgf("Starting game: %v", rq.Game.Name)
game := games.GameMetadata(rq.Game)
w.log.Info().Msgf("Starting the game: %v", rq.Game.Name)
if err := app.Load(game, w.conf.Worker.Library.BasePath); err != nil {
c.log.Error().Err(err).Msgf("couldn't load the game %v", game)
app.Close()
w.router.SetRoom(nil)
return api.EmptyPacket
}
r.SetApp(app)

m := media.NewWebRtcMediaPipe(w.conf.Encoder.Audio, w.conf.Encoder.Video, w.log)
m.AudioSrcHz = app.AudioSampleRate()
Expand All @@ -112,19 +123,15 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
if err := m.Init(); err != nil {
c.log.Error().Err(err).Msgf("couldn't init the media")
app.Close()
w.router.SetRoom(nil)
return api.EmptyPacket
}
if app.Flipped() {
m.SetVideoFlip(true)
}
r.SetMedia(m)

// make the room
r = room.NewRoom[*room.GameSession](uid, app, w.router.Users(), m)
r.HandleClose = func() { c.CloseRoom(uid) }

w.router.SetRoom(r)
c.log.Info().Str("room", r.Id()).Str("game", game.Name).Msg("New room")

r.BindAppMedia()
r.StartApp()
}

Expand All @@ -146,10 +153,8 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com.

// HandleQuitGame handles cases when a user manually exits the game.
func (c *coordinator) HandleQuitGame(rq api.GameQuitRequest[com.Uid], w *Worker) {
w.log.Debug().Msgf("Users before remove: %v", w.router.Users())
if user := w.router.FindUser(rq.Id); user != nil {
w.router.Remove(user)
w.log.Debug().Msgf("Users after remove: %v", w.router.Users())
}
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/worker/room/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type Room[T Session] struct {

func NewRoom[T Session](id string, app app.App, um SessionManager[T], media MediaPipe) *Room[T] {
room := &Room[T]{id: id, app: app, users: um, media: media}
room.InitVideo()
room.InitAudio()
if app != nil && media != nil {
room.InitVideo()
room.InitAudio()
}
return room
}

Expand All @@ -70,9 +72,12 @@ func (r *Room[T]) InitVideo() {
})
}

func (r *Room[T]) App() app.App { return r.app }
func (r *Room[T]) Id() string { return r.id }
func (r *Room[T]) StartApp() { r.app.Start() }
func (r *Room[T]) App() app.App { return r.app }
func (r *Room[T]) BindAppMedia() { r.InitAudio(); r.InitVideo() }
func (r *Room[T]) Id() string { return r.id }
func (r *Room[T]) SetApp(app app.App) { r.app = app }
func (r *Room[T]) SetMedia(m MediaPipe) { r.media = m }
func (r *Room[T]) StartApp() { r.app.Start() }

func (r *Room[T]) Close() {
if r.closed {
Expand Down Expand Up @@ -124,6 +129,7 @@ func (r *Router[T]) Remove(user T) {
r.Close()
}
}
func (r *Router[T]) Room() *Room[T] { r.mu.Lock(); defer r.mu.Unlock(); return r.room }
func (r *Router[T]) FindUser(uid Uid) T { return r.users.Find(uid.Id()) }
func (r *Router[T]) SetRoom(room *Room[T]) { r.mu.Lock(); r.room = room; r.mu.Unlock() }
func (r *Router[T]) Users() SessionManager[T] { return r.users }
Expand Down

0 comments on commit f875d3f

Please sign in to comment.