diff --git a/pkg/api/api.go b/pkg/api/api.go index 34353bb08..6700ed04c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -85,6 +85,7 @@ const ( IceCandidate = WebrtcIce TerminateSession PT = 204 AppVideoChange PT = 150 + LibNewGameList PT = 205 ) func (p PT) String() string { @@ -125,6 +126,8 @@ func (p PT) String() string { return "TerminateSession" case AppVideoChange: return "AppVideoChange" + case LibNewGameList: + return "LibNewGameList" default: return "Unknown" } diff --git a/pkg/api/worker.go b/pkg/api/worker.go index 0436ee731..2bf963e3b 100644 --- a/pkg/api/worker.go +++ b/pkg/api/worker.go @@ -66,4 +66,9 @@ type ( S int `json:"s"` A float32 `json:"a"` } + + LibGameListInfo struct { + T int + List []GameInfo + } ) diff --git a/pkg/com/com.go b/pkg/com/com.go index 3bb930d2f..bbeaa0d3e 100644 --- a/pkg/com/com.go +++ b/pkg/com/com.go @@ -55,6 +55,10 @@ func (c *SocketClient[T, P, _, _]) ProcessPackets(fn func(in P) error) chan stru return c.sock.conn.Listen() } +func (c *SocketClient[T, P, X, P2]) SetErrorHandler(h func(error)) { c.sock.conn.SetErrorHandler(h) } + +func (c *SocketClient[T, P, X, P2]) SetMaxMessageSize(s int64) { c.sock.conn.SetMaxMessageSize(s) } + func (c *SocketClient[_, _, _, _]) handleMessage(message []byte, err error) { if err != nil { c.log.Error().Err(err).Send() diff --git a/pkg/com/net.go b/pkg/com/net.go index f670266ae..558c81486 100644 --- a/pkg/com/net.go +++ b/pkg/com/net.go @@ -72,7 +72,7 @@ type request struct { response []byte } -const DefaultCallTimeout = 7 * time.Second +const DefaultCallTimeout = 10 * time.Second var errCanceled = errors.New("canceled") var errTimeout = errors.New("timeout") @@ -97,7 +97,9 @@ func (s *Server) Connect(w http.ResponseWriter, r *http.Request) (*Connection, e return connect(s.Server.Connect(w, r, nil)) } -func (c Connection) IsServer() bool { return c.conn.IsServer() } +func (c *Connection) IsServer() bool { return c.conn.IsServer() } + +func (c *Connection) SetMaxReadSize(s int64) { c.conn.SetMaxMessageSize(s) } func connect(conn *websocket.Connection, err error) (*Connection, error) { if err != nil { diff --git a/pkg/config/config.yaml b/pkg/config/config.yaml index d27950f95..e02f8c203 100644 --- a/pkg/config/config.yaml +++ b/pkg/config/config.yaml @@ -59,6 +59,8 @@ coordinator: origin: userWs: workerWs: + # max websocket message size in bytes + maxWsSize: 32000000 # HTTP(S) server config server: address: :8000 diff --git a/pkg/config/coordinator.go b/pkg/config/coordinator.go index daa9b3933..6a41cce05 100644 --- a/pkg/config/coordinator.go +++ b/pkg/config/coordinator.go @@ -15,6 +15,7 @@ type Coordinator struct { Analytics Analytics Debug bool Library Library + MaxWsSize int64 Monitoring Monitoring Origin struct { UserWs string diff --git a/pkg/coordinator/hub.go b/pkg/coordinator/hub.go index 56d5cbc3d..c3c33b918 100644 --- a/pkg/coordinator/hub.go +++ b/pkg/coordinator/hub.go @@ -104,6 +104,8 @@ func (h *Hub) handleWorkerConnection() http.HandlerFunc { Str(logger.DirectionField, logger.MarkIn), ) + h.log.Debug().Msgf("WS max message size: %vb", h.conf.Coordinator.MaxWsSize) + return func(w http.ResponseWriter, r *http.Request) { h.log.Debug().Msgf("Handshake %v", r.Host) @@ -131,6 +133,7 @@ func (h *Hub) handleWorkerConnection() http.HandlerFunc { log.Error().Err(err).Msg("worker connection fail") return } + conn.SetMaxReadSize(h.conf.Coordinator.MaxWsSize) worker := NewWorker(conn, *handshake, log) defer h.workers.RemoveDisconnect(worker) diff --git a/pkg/coordinator/worker.go b/pkg/coordinator/worker.go index 32b8f1473..3ce1087ff 100644 --- a/pkg/coordinator/worker.go +++ b/pkg/coordinator/worker.go @@ -75,6 +75,15 @@ func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} { w.log.Error().Err(err).Send() return api.ErrMalformed } + case api.LibNewGameList: + inf := api.Unwrap[api.LibGameListInfo](payload) + if inf == nil { + return api.ErrMalformed + } + if err := w.HandleLibGameList(*inf); err != nil { + w.log.Error().Err(err).Send() + return api.ErrMalformed + } default: w.log.Warn().Msgf("Unknown packet: %+v", p) } diff --git a/pkg/coordinator/workerhandlers.go b/pkg/coordinator/workerhandlers.go index 6f50d126b..0a1571e6e 100644 --- a/pkg/coordinator/workerhandlers.go +++ b/pkg/coordinator/workerhandlers.go @@ -21,3 +21,8 @@ func (w *Worker) HandleIceCandidate(rq api.WebrtcIceCandidateRequest[com.Uid], u } return nil } + +func (w *Worker) HandleLibGameList(inf api.LibGameListInfo) error { + w.log.Info().Msgf("Oh, lib: %v", inf) + return nil +} diff --git a/pkg/network/retry.go b/pkg/network/retry.go new file mode 100644 index 000000000..0498b7db2 --- /dev/null +++ b/pkg/network/retry.go @@ -0,0 +1,26 @@ +package network + +import "time" + +const retry = 10 * time.Second + +type Retry struct { + t time.Duration + fail bool +} + +func NewRetry() Retry { + return Retry{t: retry} +} + +func (r *Retry) Fail() *Retry { r.fail = true; time.Sleep(r.t); return r } +func (r *Retry) Failed() bool { return r.fail } +func (r *Retry) Multiply(x int) { r.t *= time.Duration(x) } +func (r *Retry) SuccessCheck() { + if r.fail { + return + } + r.t = retry + r.fail = false +} +func (r *Retry) Time() time.Duration { return r.t } diff --git a/pkg/network/websocket/websocket.go b/pkg/network/websocket/websocket.go index 2d8ecfd34..84bd06edf 100644 --- a/pkg/network/websocket/websocket.go +++ b/pkg/network/websocket/websocket.go @@ -27,13 +27,15 @@ type Server struct { } type Connection struct { - alive bool - callback MessageHandler - conn deadlineConn - done chan struct{} - once sync.Once - pingPong bool - send chan []byte + alive bool + callback MessageHandler + conn deadlineConn + done chan struct{} + errorHandler ErrorHandler + once sync.Once + pingPong bool + send chan []byte + messSize int64 } type deadlineConn struct { @@ -43,6 +45,7 @@ type deadlineConn struct { } type MessageHandler func([]byte, error) +type ErrorHandler func(err error) type Upgrader struct { websocket.Upgrader @@ -125,7 +128,12 @@ func (c *Connection) reader() { c.close() }() - c.conn.SetReadLimit(maxMessageSize) + var s int64 = maxMessageSize + if c.messSize > 0 { + s = c.messSize + } + c.conn.SetReadLimit(s) + _ = c.conn.SetReadDeadline(time.Now().Add(pongTime)) if c.pingPong { c.conn.SetPongHandler(func(string) error { _ = c.conn.SetReadDeadline(time.Now().Add(pongTime)); return nil }) @@ -145,6 +153,8 @@ func (c *Connection) reader() { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + c.errorHandler(err) + } else { c.callback(message, err) } break @@ -219,6 +229,10 @@ func (c *Connection) IsServer() bool { return c.pingPong } func (c *Connection) SetMessageHandler(fn MessageHandler) { c.callback = fn } +func (c *Connection) SetErrorHandler(fn ErrorHandler) { c.errorHandler = fn } + +func (c *Connection) SetMaxMessageSize(s int64) { c.messSize = s } + func (c *Connection) Listen() chan struct{} { if c.alive { return c.done diff --git a/pkg/worker/coordinator.go b/pkg/worker/coordinator.go index 4902d753e..734363874 100644 --- a/pkg/worker/coordinator.go +++ b/pkg/worker/coordinator.go @@ -14,6 +14,7 @@ type Connection interface { Disconnect() Id() com.Uid ProcessPackets(func(api.In[com.Uid]) error) chan struct{} + SetErrorHandler(func(error)) Send(api.PT, any) ([]byte, error) Notify(api.PT, any) @@ -148,3 +149,14 @@ func (c *coordinator) CloseRoom(id string) { c.Notify(api.CloseRoom, id) } func (c *coordinator) IceCandidate(candidate string, sessionId com.Uid) { c.Notify(api.WebrtcIce, api.WebrtcIceCandidateRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: sessionId}, Candidate: candidate}) } + +func (c *coordinator) SendLibrary(w *Worker) { + games := w.lib.GetAll() + + var gg = make([]api.GameInfo, len(games)) + for i, g := range games { + gg[i] = api.GameInfo(g) + } + + c.Notify(api.LibNewGameList, api.LibGameListInfo{T: 1, List: gg}) +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 148187287..f4333d29d 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -6,8 +6,10 @@ import ( "time" "github.com/giongto35/cloud-game/v3/pkg/config" + "github.com/giongto35/cloud-game/v3/pkg/games" "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/monitoring" + "github.com/giongto35/cloud-game/v3/pkg/network" "github.com/giongto35/cloud-game/v3/pkg/network/httpx" "github.com/giongto35/cloud-game/v3/pkg/worker/caged" "github.com/giongto35/cloud-game/v3/pkg/worker/cloud" @@ -18,6 +20,7 @@ type Worker struct { address string conf config.WorkerConfig cord *coordinator + lib games.GameLibrary log *logger.Logger mana *caged.Manager router *room.GameRouter @@ -28,14 +31,22 @@ type Worker struct { storage cloud.Storage } -const retry = 10 * time.Second - func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) { manager := caged.NewManager(log) if err := manager.Load(caged.Libretro, conf); err != nil { return nil, fmt.Errorf("couldn't cage libretro: %v", err) } - worker := &Worker{conf: conf, log: log, mana: manager, router: room.NewGameRouter()} + + library := games.NewLib(conf.Library, conf.Emulator, log) + library.Scan() + + worker := &Worker{ + conf: conf, + lib: library, + log: log, + mana: manager, + router: room.NewGameRouter(), + } h, err := httpx.NewServer( conf.Worker.GetAddr(), @@ -79,6 +90,12 @@ func (w *Worker) Start(done chan struct{}) { } // !to restore alive worker info when coordinator connection was lost + retry := network.NewRetry() + + onRetryFail := func(err error) { + w.log.Warn().Err(err).Msgf("socket fail. Retrying in %v", retry.Time()) + retry.Fail().Multiply(2) + } go func() { remoteAddr := w.conf.Worker.Network.CoordinatorAddress @@ -96,14 +113,17 @@ func (w *Worker) Start(done chan struct{}) { default: cord, err := newCoordinatorConnection(remoteAddr, w.conf.Worker, w.address, w.log) if err != nil { - w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry) - time.Sleep(retry) + w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry.Time()) + retry.Fail() continue } + cord.SetErrorHandler(onRetryFail) w.cord = cord w.cord.log.Info().Msgf("Connected to the coordinator %v", remoteAddr) - <-w.cord.HandleRequests(w) - w.Reset() + wait := w.cord.HandleRequests(w) + w.cord.SendLibrary(w) + <-wait + retry.SuccessCheck() } } }()