Skip to content

Commit

Permalink
Send worker lib
Browse files Browse the repository at this point in the history
  • Loading branch information
sergystepanov committed Nov 17, 2024
1 parent 45cc9e8 commit 7b57f73
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 17 deletions.
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
IceCandidate = WebrtcIce
TerminateSession PT = 204
AppVideoChange PT = 150
LibNewGameList PT = 205
)

func (p PT) String() string {
Expand Down Expand Up @@ -125,6 +126,8 @@ func (p PT) String() string {
return "TerminateSession"
case AppVideoChange:
return "AppVideoChange"
case LibNewGameList:
return "LibNewGameList"
default:
return "Unknown"
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ type (
S int `json:"s"`
A float32 `json:"a"`
}

LibGameListInfo struct {
T int
List []GameInfo
}
)
4 changes: 4 additions & 0 deletions pkg/com/com.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (c *SocketClient[T, P, _, _]) ProcessPackets(fn func(in P) error) chan stru
return c.sock.conn.Listen()
}

func (c *SocketClient[T, P, X, P2]) SetErrorHandler(h func(error)) { c.sock.conn.SetErrorHandler(h) }

func (c *SocketClient[T, P, X, P2]) SetMaxMessageSize(s int64) { c.sock.conn.SetMaxMessageSize(s) }

func (c *SocketClient[_, _, _, _]) handleMessage(message []byte, err error) {
if err != nil {
c.log.Error().Err(err).Send()
Expand Down
6 changes: 4 additions & 2 deletions pkg/com/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type request struct {
response []byte
}

const DefaultCallTimeout = 7 * time.Second
const DefaultCallTimeout = 10 * time.Second

var errCanceled = errors.New("canceled")
var errTimeout = errors.New("timeout")
Expand All @@ -97,7 +97,9 @@ func (s *Server) Connect(w http.ResponseWriter, r *http.Request) (*Connection, e
return connect(s.Server.Connect(w, r, nil))
}

func (c Connection) IsServer() bool { return c.conn.IsServer() }
func (c *Connection) IsServer() bool { return c.conn.IsServer() }

func (c *Connection) SetMaxReadSize(s int64) { c.conn.SetMaxMessageSize(s) }

func connect(conn *websocket.Connection, err error) (*Connection, error) {
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ coordinator:
origin:
userWs:
workerWs:
# max websocket message size in bytes
maxWsSize: 32000000
# HTTP(S) server config
server:
address: :8000
Expand Down
1 change: 1 addition & 0 deletions pkg/config/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Coordinator struct {
Analytics Analytics
Debug bool
Library Library
MaxWsSize int64
Monitoring Monitoring
Origin struct {
UserWs string
Expand Down
3 changes: 3 additions & 0 deletions pkg/coordinator/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func (h *Hub) handleWorkerConnection() http.HandlerFunc {
Str(logger.DirectionField, logger.MarkIn),
)

h.log.Debug().Msgf("WS max message size: %vb", h.conf.Coordinator.MaxWsSize)

return func(w http.ResponseWriter, r *http.Request) {
h.log.Debug().Msgf("Handshake %v", r.Host)

Expand Down Expand Up @@ -131,6 +133,7 @@ func (h *Hub) handleWorkerConnection() http.HandlerFunc {
log.Error().Err(err).Msg("worker connection fail")
return
}
conn.SetMaxReadSize(h.conf.Coordinator.MaxWsSize)

worker := NewWorker(conn, *handshake, log)
defer h.workers.RemoveDisconnect(worker)
Expand Down
9 changes: 9 additions & 0 deletions pkg/coordinator/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} {
w.log.Error().Err(err).Send()
return api.ErrMalformed
}
case api.LibNewGameList:
inf := api.Unwrap[api.LibGameListInfo](payload)
if inf == nil {
return api.ErrMalformed
}
if err := w.HandleLibGameList(*inf); err != nil {
w.log.Error().Err(err).Send()
return api.ErrMalformed
}
default:
w.log.Warn().Msgf("Unknown packet: %+v", p)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/coordinator/workerhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ func (w *Worker) HandleIceCandidate(rq api.WebrtcIceCandidateRequest[com.Uid], u
}
return nil
}

func (w *Worker) HandleLibGameList(inf api.LibGameListInfo) error {
w.log.Info().Msgf("Oh, lib: %v", inf)
return nil
}
26 changes: 26 additions & 0 deletions pkg/network/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package network

import "time"

const retry = 10 * time.Second

type Retry struct {
t time.Duration
fail bool
}

func NewRetry() Retry {
return Retry{t: retry}
}

func (r *Retry) Fail() *Retry { r.fail = true; time.Sleep(r.t); return r }
func (r *Retry) Failed() bool { return r.fail }
func (r *Retry) Multiply(x int) { r.t *= time.Duration(x) }
func (r *Retry) SuccessCheck() {
if r.fail {
return
}
r.t = retry
r.fail = false
}
func (r *Retry) Time() time.Duration { return r.t }
30 changes: 22 additions & 8 deletions pkg/network/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ type Server struct {
}

type Connection struct {
alive bool
callback MessageHandler
conn deadlineConn
done chan struct{}
once sync.Once
pingPong bool
send chan []byte
alive bool
callback MessageHandler
conn deadlineConn
done chan struct{}
errorHandler ErrorHandler
once sync.Once
pingPong bool
send chan []byte
messSize int64
}

type deadlineConn struct {
Expand All @@ -43,6 +45,7 @@ type deadlineConn struct {
}

type MessageHandler func([]byte, error)
type ErrorHandler func(err error)

type Upgrader struct {
websocket.Upgrader
Expand Down Expand Up @@ -125,7 +128,12 @@ func (c *Connection) reader() {
c.close()
}()

c.conn.SetReadLimit(maxMessageSize)
var s int64 = maxMessageSize
if c.messSize > 0 {
s = c.messSize
}
c.conn.SetReadLimit(s)

_ = c.conn.SetReadDeadline(time.Now().Add(pongTime))
if c.pingPong {
c.conn.SetPongHandler(func(string) error { _ = c.conn.SetReadDeadline(time.Now().Add(pongTime)); return nil })
Expand All @@ -145,6 +153,8 @@ func (c *Connection) reader() {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
c.errorHandler(err)
} else {
c.callback(message, err)
}
break
Expand Down Expand Up @@ -219,6 +229,10 @@ func (c *Connection) IsServer() bool { return c.pingPong }

func (c *Connection) SetMessageHandler(fn MessageHandler) { c.callback = fn }

func (c *Connection) SetErrorHandler(fn ErrorHandler) { c.errorHandler = fn }

func (c *Connection) SetMaxMessageSize(s int64) { c.messSize = s }

func (c *Connection) Listen() chan struct{} {
if c.alive {
return c.done
Expand Down
12 changes: 12 additions & 0 deletions pkg/worker/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Connection interface {
Disconnect()
Id() com.Uid
ProcessPackets(func(api.In[com.Uid]) error) chan struct{}
SetErrorHandler(func(error))

Send(api.PT, any) ([]byte, error)
Notify(api.PT, any)
Expand Down Expand Up @@ -148,3 +149,14 @@ func (c *coordinator) CloseRoom(id string) { c.Notify(api.CloseRoom, id) }
func (c *coordinator) IceCandidate(candidate string, sessionId com.Uid) {
c.Notify(api.WebrtcIce, api.WebrtcIceCandidateRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: sessionId}, Candidate: candidate})
}

func (c *coordinator) SendLibrary(w *Worker) {
games := w.lib.GetAll()

var gg = make([]api.GameInfo, len(games))
for i, g := range games {
gg[i] = api.GameInfo(g)
}

c.Notify(api.LibNewGameList, api.LibGameListInfo{T: 1, List: gg})
}
34 changes: 27 additions & 7 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"time"

"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/games"
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
"github.com/giongto35/cloud-game/v3/pkg/worker/caged"
"github.com/giongto35/cloud-game/v3/pkg/worker/cloud"
Expand All @@ -18,6 +20,7 @@ type Worker struct {
address string
conf config.WorkerConfig
cord *coordinator
lib games.GameLibrary
log *logger.Logger
mana *caged.Manager
router *room.GameRouter
Expand All @@ -28,14 +31,22 @@ type Worker struct {
storage cloud.Storage
}

const retry = 10 * time.Second

func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) {
manager := caged.NewManager(log)
if err := manager.Load(caged.Libretro, conf); err != nil {
return nil, fmt.Errorf("couldn't cage libretro: %v", err)
}
worker := &Worker{conf: conf, log: log, mana: manager, router: room.NewGameRouter()}

library := games.NewLib(conf.Library, conf.Emulator, log)
library.Scan()

worker := &Worker{
conf: conf,
lib: library,
log: log,
mana: manager,
router: room.NewGameRouter(),
}

h, err := httpx.NewServer(
conf.Worker.GetAddr(),
Expand Down Expand Up @@ -79,6 +90,12 @@ func (w *Worker) Start(done chan struct{}) {
}

// !to restore alive worker info when coordinator connection was lost
retry := network.NewRetry()

onRetryFail := func(err error) {
w.log.Warn().Err(err).Msgf("socket fail. Retrying in %v", retry.Time())
retry.Fail().Multiply(2)
}

go func() {
remoteAddr := w.conf.Worker.Network.CoordinatorAddress
Expand All @@ -96,14 +113,17 @@ func (w *Worker) Start(done chan struct{}) {
default:
cord, err := newCoordinatorConnection(remoteAddr, w.conf.Worker, w.address, w.log)
if err != nil {
w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry)
time.Sleep(retry)
w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry.Time())
retry.Fail()
continue
}
cord.SetErrorHandler(onRetryFail)
w.cord = cord
w.cord.log.Info().Msgf("Connected to the coordinator %v", remoteAddr)
<-w.cord.HandleRequests(w)
w.Reset()
wait := w.cord.HandleRequests(w)
w.cord.SendLibrary(w)
<-wait
retry.SuccessCheck()
}
}
}()
Expand Down

0 comments on commit 7b57f73

Please sign in to comment.