Skip to content

Commit cddf081

Browse files
committed
Use locks in router with rooms
1 parent c1c4731 commit cddf081

File tree

8 files changed

+60
-27
lines changed

8 files changed

+60
-27
lines changed

pkg/com/com.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func NewNetMap[K comparable, T NetClient[K]]() NetMap[K, T] {
1515

1616
func (m *NetMap[K, T]) Add(client T) bool { return m.Put(client.Id(), client) }
1717
func (m *NetMap[K, T]) Remove(client T) { m.Map.Remove(client.Id()) }
18+
func (m *NetMap[K, T]) RemoveL(client T) int { return m.Map.RemoveL(client.Id()) }
1819
func (m *NetMap[K, T]) Reset() { m.Map = Map[K, T]{m: make(map[K]T, 10)} }
1920
func (m *NetMap[K, T]) RemoveDisconnect(client T) { client.Disconnect(); m.Remove(client) }
2021

pkg/com/map.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package com
22

3-
import "sync"
3+
import (
4+
"fmt"
5+
"sync"
6+
)
47

58
// Map defines a concurrent-safe map structure.
69
// Keep in mind that the underlying map structure will grow indefinitely.
@@ -9,7 +12,7 @@ type Map[K comparable, V any] struct {
912
mu sync.Mutex
1013
}
1114

12-
func (m *Map[K, _]) Has(key K) bool { _, ok := m.Find(key); return ok }
15+
func (m *Map[K, _]) Has(key K) bool { _, ok := m.Contains(key); return ok }
1316
func (m *Map[_, _]) Len() int { m.mu.Lock(); defer m.mu.Unlock(); return len(m.m) }
1417
func (m *Map[K, V]) Pop(key K) V {
1518
m.mu.Lock()
@@ -26,9 +29,22 @@ func (m *Map[K, V]) Put(key K, v V) bool {
2629
return ok
2730
}
2831
func (m *Map[K, _]) Remove(key K) { m.mu.Lock(); delete(m.m, key); m.mu.Unlock() }
32+
func (m *Map[K, _]) RemoveL(key K) int {
33+
m.mu.Lock()
34+
delete(m.m, key)
35+
k := len(m.m)
36+
m.mu.Unlock()
37+
return k
38+
}
39+
func (m *Map[K, V]) String() string {
40+
m.mu.Lock()
41+
s := fmt.Sprintf("%v", m.m)
42+
m.mu.Unlock()
43+
return s
44+
}
2945

30-
// Find returns the first value found and a boolean flag if its found or not.
31-
func (m *Map[K, V]) Find(key K) (v V, ok bool) {
46+
// Contains returns the first value found and a boolean flag if its found or not.
47+
func (m *Map[K, V]) Contains(key K) (v V, ok bool) {
3248
m.mu.Lock()
3349
defer m.mu.Unlock()
3450
if vv, ok := m.m[key]; ok {
@@ -37,6 +53,11 @@ func (m *Map[K, V]) Find(key K) (v V, ok bool) {
3753
return v, false
3854
}
3955

56+
func (m *Map[K, V]) Find(key K) V {
57+
v, _ := m.Contains(key)
58+
return v
59+
}
60+
4061
// FindBy searches the first key-value with the provided predicate function.
4162
func (m *Map[K, V]) FindBy(fn func(v V) bool) (v V, ok bool) {
4263
m.mu.Lock()

pkg/com/map_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ func TestMap_Base(t *testing.T) {
1717
if !m.Has(k) {
1818
t.Errorf("should have the key %v, %v", k, m.m)
1919
}
20-
v, ok := m.Find(k)
20+
v, ok := m.Contains(k)
2121
if v != 0 && !ok {
2222
t.Errorf("should have the key %v and ok, %v %v", k, ok, m.m)
2323
}
24-
_, ok = m.Find(k + 1)
24+
_, ok = m.Contains(k + 1)
2525
if ok {
2626
t.Errorf("should not find anything, %v %v", ok, m.m)
2727
}

pkg/coordinator/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type RegionalClient interface {
2929
}
3030

3131
type HasUserRegistry interface {
32-
Find(com.Uid) (*User, bool)
32+
Find(com.Uid) *User
3333
}
3434

3535
func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker {

pkg/coordinator/workerhandlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ func (w *Worker) HandleCloseRoom(rq api.CloseRoomRequest) {
1414
}
1515

1616
func (w *Worker) HandleIceCandidate(rq api.WebrtcIceCandidateRequest[com.Uid], users HasUserRegistry) error {
17-
if usr, ok := users.Find(rq.Id); ok {
17+
if usr := users.Find(rq.Id); usr != nil {
1818
usr.SendWebrtcIceCandidate(rq.Candidate)
1919
} else {
2020
w.log.Warn().Str("id", rq.Id.String()).Msg("unknown session")

pkg/worker/coordinatorhandlers.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ func (c *coordinator) HandleWebrtcInit(rq api.WebrtcInitRequest[com.Uid], w *Wor
4949
}
5050

5151
user := room.NewGameSession(rq.Id, peer) // use user uid from the coordinator
52-
w.router.AddUser(user)
5352
c.log.Info().Msgf("Peer connection: %s", user.Id())
53+
c.log.Debug().Msgf("Users before add: %v", w.router.Users())
54+
w.router.AddUser(user)
55+
c.log.Debug().Msgf("Users after add: %v", w.router.Users())
5456

5557
return api.Out{Payload: sdp}
5658
}
@@ -118,11 +120,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
118120

119121
// make the room
120122
r = room.NewRoom[*room.GameSession](uid, app, w.router.Users(), m)
121-
r.HandleClose = func() {
122-
w.router.Close()
123-
c.CloseRoom(uid)
124-
w.log.Debug().Msgf("Closed room %v", uid)
125-
}
123+
r.HandleClose = func() { c.CloseRoom(uid) }
126124

127125
w.router.SetRoom(r)
128126
c.log.Info().Str("room", r.Id()).Str("game", game.Name).Msg("New room")
@@ -148,8 +146,10 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com.
148146

149147
// HandleQuitGame handles cases when a user manually exits the game.
150148
func (c *coordinator) HandleQuitGame(rq api.GameQuitRequest[com.Uid], w *Worker) {
149+
w.log.Debug().Msgf("Users before remove: %v", w.router.Users())
151150
if user := w.router.FindUser(rq.Id); user != nil {
152151
w.router.Remove(user)
152+
w.log.Debug().Msgf("Users after remove: %v", w.router.Users())
153153
}
154154
}
155155

pkg/worker/room/room.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package room
22

3-
import "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
3+
import (
4+
"sync"
5+
6+
"github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
7+
)
48

59
type MediaPipe interface {
610
// Destroy frees all allocated resources.
@@ -19,10 +23,9 @@ type MediaPipe interface {
1923

2024
type SessionManager[T Session] interface {
2125
Add(T) bool
22-
Find(string) (T, bool)
26+
Find(string) T
2327
ForEach(func(T))
24-
Len() int
25-
Remove(T)
28+
RemoveL(T) int
2629
// Reset used for proper cleanup of the resources if needed.
2730
Reset()
2831
}
@@ -93,32 +96,36 @@ func (r *Room[T]) Close() {
9396
type Router[T Session] struct {
9497
room *Room[T]
9598
users SessionManager[T]
99+
mu sync.Mutex
96100
}
97101

98102
func (r *Router[T]) AddUser(user T) { r.users.Add(user) }
103+
99104
func (r *Router[T]) Close() {
105+
r.mu.Lock()
100106
if r.room != nil {
101107
r.room.Close()
102108
r.room = nil
103109
}
110+
r.mu.Unlock()
104111
}
112+
105113
func (r *Router[T]) FindRoom(id string) *Room[T] {
114+
r.mu.Lock()
115+
defer r.mu.Unlock()
106116
if r.room != nil && r.room.Id() == id {
107117
return r.room
108118
}
109119
return nil
110120
}
111-
func (r *Router[T]) FindUser(uid Uid) T { sess, _ := r.users.Find(uid.Id()); return sess }
121+
112122
func (r *Router[T]) Remove(user T) {
113-
r.users.Remove(user)
114-
if r.users.Len() == 0 {
115-
if r.room != nil {
116-
r.room.Close()
117-
}
118-
r.users.Reset()
123+
if left := r.users.RemoveL(user); left == 0 {
124+
r.Close()
119125
}
120126
}
121-
func (r *Router[T]) SetRoom(room *Room[T]) { r.room = room }
127+
func (r *Router[T]) FindUser(uid Uid) T { return r.users.Find(uid.Id()) }
128+
func (r *Router[T]) SetRoom(room *Room[T]) { r.mu.Lock(); r.room = room; r.mu.Unlock() }
122129
func (r *Router[T]) Users() SessionManager[T] { return r.users }
123130

124131
type AppSession struct {

pkg/worker/worker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ type Worker struct {
2828
storage cloud.Storage
2929
}
3030

31-
func (w *Worker) Reset() { w.router.Close() }
31+
func (w *Worker) Reset() {
32+
w.log.Debug().Msgf("Users before close: %v", w.router.Users())
33+
w.router.Close()
34+
w.log.Debug().Msgf("Users after close: %v", w.router.Users())
35+
}
3236

3337
const retry = 10 * time.Second
3438

0 commit comments

Comments
 (0)