Skip to content

Commit

Permalink
Add the initial caged apps abstraction
Browse files Browse the repository at this point in the history
In the current version of the application, we have strictly hardcoded the captured runtime application (FFI Libretro frontend) as well as the streaming transport (WebRTC). This commit makes it possible to choose these components at runtime.

In this commit, we no longer manage initially connected users separately from the rooms, and instead, we treat all users as abstract app sessions, rather than hardcoded WebRTC connections. These sessions may contain all the transport specifics, such as WebRTC and so on.

Rooms, instead of having the hardcoded emulator app and WebRTC media encoders, now have these components decoupled. In theory, it is possible to add new transports (e.g., WebTransport) and streaming apps (e.g., wrapped into an ffmpeg desktop app).
  • Loading branch information
sergystepanov committed Sep 16, 2023
1 parent 878d7fe commit 1969302
Show file tree
Hide file tree
Showing 70 changed files with 2,537 additions and 2,412 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ build.worker:
build: build.coordinator build.worker

verify-cores:
go test -run TestAllEmulatorRooms ./pkg/worker -v -renderFrames $(GL_CTX) -outputPath "../../_rendered"
go test -run TestAll ./pkg/worker/room -v -renderFrames $(GL_CTX) -outputPath "../../../_rendered"

dev.build: compile build

Expand Down
17 changes: 10 additions & 7 deletions pkg/com/com.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package com

import "github.com/giongto35/cloud-game/v3/pkg/logger"

type NetClient interface {
type NetClient[K comparable] interface {
Disconnect()
Id() Uid
Id() K
}

type NetMap[T NetClient] struct{ Map[Uid, T] }
type NetMap[K comparable, T NetClient[K]] struct{ Map[K, T] }

func NewNetMap[T NetClient]() NetMap[T] { return NetMap[T]{Map: Map[Uid, T]{m: make(map[Uid]T, 10)}} }
func NewNetMap[K comparable, T NetClient[K]]() NetMap[K, T] {
return NetMap[K, T]{Map: Map[K, T]{m: make(map[K]T, 10)}}
}

func (m *NetMap[T]) Add(client T) { m.Put(client.Id(), client) }
func (m *NetMap[T]) Remove(client T) { m.Map.Remove(client.Id()) }
func (m *NetMap[T]) RemoveDisconnect(client T) { client.Disconnect(); m.Remove(client) }
func (m *NetMap[K, T]) Add(client T) bool { return m.Put(client.Id(), client) }
func (m *NetMap[K, T]) Remove(client T) { m.Map.Remove(client.Id()) }
func (m *NetMap[K, T]) Reset() { m.Map = Map[K, T]{m: make(map[K]T, 10)} }
func (m *NetMap[K, T]) RemoveDisconnect(client T) { client.Disconnect(); m.Remove(client) }

type SocketClient[T ~uint8, P Packet[T], X any, P2 Packet2[X]] struct {
id Uid
Expand Down
10 changes: 8 additions & 2 deletions pkg/com/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ func (m *Map[K, V]) Pop(key K) V {
m.mu.Unlock()
return v
}
func (m *Map[K, V]) Put(key K, v V) { m.mu.Lock(); m.m[key] = v; m.mu.Unlock() }
func (m *Map[K, _]) Remove(key K) { m.mu.Lock(); delete(m.m, key); m.mu.Unlock() }
func (m *Map[K, V]) Put(key K, v V) bool {
m.mu.Lock()
_, ok := m.m[key]
m.m[key] = v
m.mu.Unlock()
return ok
}
func (m *Map[K, _]) Remove(key K) { m.mu.Lock(); delete(m.m, key); m.mu.Unlock() }

// Find returns the first value found and a boolean flag if its found or not.
func (m *Map[K, V]) Find(key K) (v V, ok bool) {
Expand Down
1 change: 1 addition & 0 deletions pkg/com/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func UidFromString(id string) (Uid, error) {
}

func (u Uid) Short() string { return u.String()[:3] + "." + u.String()[len(u.String())-3:] }
func (u Uid) Id() string { return u.String() }

type HasCallId interface {
SetGetId(fmt.Stringer)
Expand Down
24 changes: 5 additions & 19 deletions pkg/config/emulator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"math"
"path"
"path/filepath"
"strings"
Expand All @@ -10,30 +9,17 @@ import (
type Emulator struct {
Scale int
Threads int
AspectRatio AspectRatio
AspectRatio struct {
Keep bool
Width int
Height int
}
Storage string
LocalPath string
Libretro LibretroConfig
AutosaveSec int
}

type AspectRatio struct {
Keep bool
Width int
Height int
}

func (a AspectRatio) ResizeToAspect(ratio float64, sw int, sh int) (dw int, dh int) {
// ratio is always > 0
dw = int(math.Round(float64(sh)*ratio/2) * 2)
dh = sh
if dw > sw {
dw = sw
dh = int(math.Round(float64(sw)/ratio/2) * 2)
}
return
}

type LibretroConfig struct {
Cores struct {
Paths struct {
Expand Down
10 changes: 4 additions & 6 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ import (

type Coordinator struct {
hub *Hub
services [2]runnable
}

type runnable interface {
Run()
Stop() error
services [2]interface {
Run()
Stop() error
}
}

func New(conf config.CoordinatorConfig, log *logger.Logger) (*Coordinator, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/coordinator/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ type Hub struct {
conf config.CoordinatorConfig
launcher games.Launcher
log *logger.Logger
users com.NetMap[*User]
workers com.NetMap[*Worker]
users com.NetMap[com.Uid, *User]
workers com.NetMap[com.Uid, *Worker]
}

func NewHub(conf config.CoordinatorConfig, lib games.GameLibrary, log *logger.Logger) *Hub {
return &Hub{
conf: conf,
users: com.NewNetMap[*User](),
workers: com.NewNetMap[*Worker](),
users: com.NewNetMap[com.Uid, *User](),
workers: com.NewNetMap[com.Uid, *Worker](),
launcher: games.NewGameLauncher(lib),
log: log,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type HasServerInfo interface {
}

func NewUser(sock *com.Connection, log *logger.Logger) *User {
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out](sock, com.NewUid(), log)
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out, *api.Out](sock, com.NewUid(), log)
return &User{
Connection: conn,
log: log.Extend(log.With().
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type HasUserRegistry interface {
}

func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker {
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out](sock, handshake.Id, log)
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out, *api.Out](sock, handshake.Id, log)
return &Worker{
Connection: conn,
Addr: handshake.Addr,
Expand Down
63 changes: 38 additions & 25 deletions pkg/network/webrtc/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package webrtc
import (
"fmt"
"strings"
"sync"
"time"

"github.com/giongto35/cloud-game/v3/pkg/logger"
Expand All @@ -16,27 +17,19 @@ type Peer struct {
log *logger.Logger
OnMessage func(data []byte)

aTrack *webrtc.TrackLocalStaticSample
vTrack *webrtc.TrackLocalStaticSample
dTrack *webrtc.DataChannel
a *webrtc.TrackLocalStaticSample
v *webrtc.TrackLocalStaticSample
d *webrtc.DataChannel
}

// A Sample contains encoded media and timing information
type Sample struct {
Data []byte
Timestamp time.Time
Duration time.Duration
PacketTimestamp uint32
PrevDroppedPackets uint16
Metadata interface{}
}
var samplePool sync.Pool

type Decoder func(data string, obj any) error

func New(log *logger.Logger, api *ApiFactory) *Peer { return &Peer{api: api, log: log} }

func (p *Peer) NewCall(vCodec, aCodec string, onICECandidate func(ice any)) (sdp any, err error) {
if p.IsConnected() {
if p.conn != nil && p.conn.ConnectionState() == webrtc.PeerConnectionStateConnected {
return
}
p.log.Info().Msg("WebRTC start")
Expand All @@ -52,7 +45,7 @@ func (p *Peer) NewCall(vCodec, aCodec string, onICECandidate func(ice any)) (sdp
if _, err = p.conn.AddTrack(video); err != nil {
return "", err
}
p.vTrack = video
p.v = video
p.log.Debug().Msgf("Added [%s] track", video.Codec().MimeType)

// plug in the [audio] track (out)
Expand All @@ -64,7 +57,7 @@ func (p *Peer) NewCall(vCodec, aCodec string, onICECandidate func(ice any)) (sdp
return "", err
}
p.log.Debug().Msgf("Added [%s] track", audio.Codec().MimeType)
p.aTrack = audio
p.a = audio

// plug in the [input] data channel (in)
if err = p.addInputChannel("game-input"); err != nil {
Expand All @@ -90,6 +83,35 @@ func (p *Peer) NewCall(vCodec, aCodec string, onICECandidate func(ice any)) (sdp
return offer, nil
}

func (p *Peer) SendAudio(dat []byte, dur int32) {
if err := p.send(dat, int64(dur), p.a.WriteSample); err != nil {
p.log.Error().Err(err).Send()
}
}

func (p *Peer) SendVideo(data []byte, dur int32) {
if err := p.send(data, int64(dur), p.v.WriteSample); err != nil {
p.log.Error().Err(err).Send()
}
}

func (p *Peer) SendData(data []byte) { _ = p.d.Send(data) }

func (p *Peer) send(data []byte, duration int64, fn func(media.Sample) error) error {
sample, _ := samplePool.Get().(*media.Sample)
if sample == nil {
sample = new(media.Sample)
}
sample.Data = data
sample.Duration = time.Duration(duration)
err := fn(*sample)
if err != nil {
return err
}
samplePool.Put(sample)
return nil
}

func (p *Peer) SetRemoteSDP(sdp string, decoder Decoder) error {
var answer webrtc.SessionDescription
if err := decoder(sdp, &answer); err != nil {
Expand All @@ -103,9 +125,6 @@ func (p *Peer) SetRemoteSDP(sdp string, decoder Decoder) error {
return nil
}

func (p *Peer) WriteVideo(s Sample) error { return p.vTrack.WriteSample(media.Sample(s)) }
func (p *Peer) WriteAudio(s Sample) error { return p.aTrack.WriteSample(media.Sample(s)) }

func newTrack(id string, label string, codec string) (*webrtc.TrackLocalStaticSample, error) {
codec = strings.ToLower(codec)
var mime string
Expand Down Expand Up @@ -189,12 +208,6 @@ func (p *Peer) Disconnect() {
p.log.Debug().Msg("WebRTC stop")
}

func (p *Peer) IsConnected() bool {
return p.conn != nil && p.conn.ConnectionState() == webrtc.PeerConnectionStateConnected
}

func (p *Peer) SendMessage(data []byte) { _ = p.dTrack.Send(data) }

// addInputChannel creates a new WebRTC data channel for user input.
// Default params -- ordered: true, negotiated: false.
func (p *Peer) addInputChannel(label string) error {
Expand All @@ -214,7 +227,7 @@ func (p *Peer) addInputChannel(label string) error {
p.OnMessage(m.Data)
}
})
p.dTrack = ch
p.d = ch
ch.OnClose(func() { p.log.Debug().Msg("Data channel [input] has been closed") })
return nil
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/worker/caged/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package app

import "image"

type App interface {
AudioSampleRate() int
Init() error
ViewportSize() (int, int)
Start()
Close()

SetAudioCb(func(Audio))
SetVideoCb(func(Video))
SendControl(port int, data []byte)
}

type Audio struct {
Data []int16
Duration int32 // up to 6y nanosecond-wise
}

type Video struct {
Frame image.RGBA
Duration int32
}
61 changes: 61 additions & 0 deletions pkg/worker/caged/caged.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package caged

import (
"errors"
"reflect"

"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
"github.com/giongto35/cloud-game/v3/pkg/worker/caged/libretro"
)

type Manager struct {
list map[ModName]app.App
log *logger.Logger
}

type ModName string

const Libretro ModName = "libretro"

func NewManager(log *logger.Logger) *Manager {
return &Manager{log: log, list: make(map[ModName]app.App)}
}

func (m *Manager) Get(name ModName) app.App { return m.list[name] }

func (m *Manager) Load(name ModName, conf any) error {
if name == Libretro {
caged, err := m.loadLibretro(conf)
if err != nil {
return err
}
m.list[name] = caged
}
return nil
}

func (m *Manager) loadLibretro(conf any) (*libretro.Caged, error) {
s := reflect.ValueOf(conf)

e := s.FieldByName("Emulator")
if !e.IsValid() {
return nil, errors.New("no emulator conf")
}
r := s.FieldByName("Recording")
if !r.IsValid() {
return nil, errors.New("no recording conf")
}

c := libretro.CagedConf{
Emulator: e.Interface().(config.Emulator),
Recording: r.Interface().(config.Recording),
}

caged := libretro.Cage(c, m.log)
if err := caged.Init(); err != nil {
return nil, err
}
return &caged, nil
}
Loading

0 comments on commit 1969302

Please sign in to comment.