Skip to content

Commit

Permalink
Some general cleanups, refactoring websocket connection hub
Browse files Browse the repository at this point in the history
* Removed some unused variables
* Working websocket connection hub in a separate package
* Broadcast hub info on client changes
  • Loading branch information
tcriess committed Jan 16, 2021
1 parent 2bbe5d4 commit b240b95
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 114 deletions.
151 changes: 37 additions & 114 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,42 @@ package main

import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"strconv"
"strings"
"sync"

"flag"

"github.com/GRVYDEV/lightspeed-webrtc/ws"
"github.com/gorilla/websocket"

"github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/samplebuilder"
)

var (
videoBuilder *samplebuilder.SampleBuilder
addr = flag.String("addr", "localhost", "http service address")
ip = flag.String("ip", "none", "IP address for webrtc")
wsPort = flag.Int("ws-port", 8080, "Port for websocket")
rtpPort = flag.Int("rtp-port", 65535, "Port for RTP")
ports = flag.String("ports", "20000-20500", "Port range for webrtc")
upgrader = websocket.Upgrader{
addr = flag.String("addr", "localhost", "http service address")
ip = flag.String("ip", "none", "IP address for webrtc")
wsPort = flag.Int("ws-port", 8080, "Port for websocket")
rtpPort = flag.Int("rtp-port", 65535, "Port for RTP")
ports = flag.String("ports", "20000-20500", "Port range for webrtc")
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

videoTrack *webrtc.TrackLocalStaticRTP

audioTrack *webrtc.TrackLocalStaticRTP

// lock for peerConnections and trackLocals
listLock sync.RWMutex
peerConnections []peerConnectionState
trackLocals map[string]*webrtc.TrackLocalStaticRTP
hub *ws.Hub
)

type websocketMessage struct {
Event string `json:"event"`
Data string `json:"data"`
}

type peerConnectionState struct {
peerConnection *webrtc.PeerConnection
websocket *threadSafeWriter
}

func main() {
flag.Parse()
log.SetFlags(0)
trackLocals = map[string]*webrtc.TrackLocalStaticRTP{}

// Open a UDP Listener for RTP Packets on port 65535
listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(*addr), Port: *rtpPort})
Expand All @@ -83,6 +66,9 @@ func main() {
panic(err)
}

hub = ws.NewHub()
go hub.Run()

// start HTTP server
go func() {
http.HandleFunc("/websocket", websocketHandler)
Expand Down Expand Up @@ -162,34 +148,18 @@ func createWebrtcApi() *webrtc.API {
return webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(s))
}

func cleanConnection(peerConnection *webrtc.PeerConnection) {
listLock.Lock()
defer listLock.Unlock()

for i := range peerConnections {
if peerConnection == peerConnections[i].peerConnection {
peerConnections[i] = peerConnections[len(peerConnections)-1]
peerConnections[len(peerConnections)-1] = peerConnectionState{}
peerConnections = peerConnections[:len(peerConnections)-1]
return
}
}
}

// Handle incoming websockets
func websocketHandler(w http.ResponseWriter, r *http.Request) {

// Upgrade HTTP request to Websocket
unsafeConn, err := upgrader.Upgrade(w, r, nil)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}

c := &threadSafeWriter{unsafeConn, sync.Mutex{}}

// When this frame returns close the Websocket
defer c.Close() //nolint
defer conn.Close() //nolint

// Create API that takes IP and port range into account
api := createWebrtcApi()
Expand Down Expand Up @@ -231,17 +201,12 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
}
}()

// Add our new PeerConnection to global list
listLock.Lock()
peerConnections = append(peerConnections, peerConnectionState{peerConnection, c})
noConnections := len(peerConnections)
for _, conn := range peerConnections {
if msg, err := json.Marshal(noConnections); err == nil {
conn.websocket.WriteJSON(&websocketMessage{Event: "connections", Data: string(msg)})
}
}
fmt.Printf("Connections: %d\n", len(peerConnections))
listLock.Unlock()
c := ws.NewClient(hub, conn, peerConnection)

go c.WriteLoop()

// Add to the hub
hub.Register <- c

// Trickle ICE. Emit server candidate to client
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
Expand All @@ -255,11 +220,13 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
return
}

if writeErr := c.WriteJSON(&websocketMessage{
Event: "candidate",
if msg, err := json.Marshal(ws.WebsocketMessage{
Event: ws.MessageTypeCandidate,
Data: string(candidateString),
}); writeErr != nil {
log.Println(writeErr)
}); err == nil {
c.Send <- msg
} else {
log.Println(err)
}
})

Expand All @@ -270,9 +237,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
if err := peerConnection.Close(); err != nil {
log.Print(err)
}
case webrtc.PeerConnectionStateClosed:
cleanConnection(peerConnection)
hub.Unregister <- c

case webrtc.PeerConnectionStateClosed:
hub.Unregister <- c
}
})

Expand All @@ -290,61 +258,16 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
log.Print(err)
}

if err = c.WriteJSON(&websocketMessage{
Event: "offer",
if msg, err := json.Marshal(ws.WebsocketMessage{
Event: ws.MessageTypeOffer,
Data: string(offerString),
}); err != nil {
log.Print(err)
}); err == nil {
c.Send <- msg
} else {
log.Printf("could not marshal ws message: %s", err)
}

message := &websocketMessage{}
for {
_, raw, err := c.ReadMessage()
if err != nil {
log.Println(err)
return
} else if err := json.Unmarshal(raw, &message); err != nil {
log.Println(err)
return
}

switch message.Event {
case "candidate":

candidate := webrtc.ICECandidateInit{}
if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
log.Println(err)
return
}

if err := peerConnection.AddICECandidate(candidate); err != nil {
log.Println(err)
return
}
case "answer":

answer := webrtc.SessionDescription{}
if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
log.Println(err)
return
}

if err := peerConnection.SetRemoteDescription(answer); err != nil {
log.Println(err)
return
}
}
}
}

type threadSafeWriter struct {
*websocket.Conn
sync.Mutex
}

func (t *threadSafeWriter) WriteJSON(v interface{}) error {
t.Lock()
defer t.Unlock()
go hub.SendInfo(hub.GetInfo()) // non-blocking broadcast, required as the read loop is not started yet.

return t.Conn.WriteJSON(v)
c.ReadLoop()
}
144 changes: 144 additions & 0 deletions ws/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package ws

import (
"encoding/json"
"log"
"time"

"github.com/gorilla/websocket"
"github.com/pion/webrtc/v3"
)

// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *Hub

// The websocket connection.
conn *websocket.Conn

// Buffered channel of outbound messages.
Send chan []byte

// webRTC peer connection
PeerConnection *webrtc.PeerConnection
}

func NewClient(hub *Hub, conn *websocket.Conn, webrtcConn *webrtc.PeerConnection) *Client {
return &Client{
hub: hub,
conn: conn,
Send: make(chan []byte),
PeerConnection: webrtcConn,
}
}

// ReadLoop pumps messages from the websocket connection to the hub.
//
// The application runs ReadLoop in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) ReadLoop() {
defer func() {
c.hub.Unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
message := &WebsocketMessage{}
for {
// _, message, err := c.conn.ReadMessage()
_, raw, err := c.conn.ReadMessage()
if err != nil {
log.Printf("could not read message: %s", err)
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Println("ws closed unexpected")
}
return
}

err = json.Unmarshal(raw, &message)
if err != nil {
log.Printf("could not unmarshal ws message: %s", err)
return
}

switch message.Event {
case MessageTypeCandidate:
candidate := webrtc.ICECandidateInit{}
if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
log.Printf("could not unmarshal candidate msg: %s", err)
return
}

if err := c.PeerConnection.AddICECandidate(candidate); err != nil {
log.Printf("could not add ice candidate: %s", err)
return
}

case MessageTypeAnswer:
answer := webrtc.SessionDescription{}
if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
log.Printf("could not unmarshal answer msg: %s", err)
return
}

if err := c.PeerConnection.SetRemoteDescription(answer); err != nil {
log.Printf("could not set remote description: %s", err)
return
}
}

// we do not send anything to the other clients!
//message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
//c.hub.Broadcast <- message
}
}

// WriteLoop pumps messages from the hub to the websocket connection.
//
// A goroutine running WriteLoop is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) WriteLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
_, _ = w.Write(message)

// Add queued messages to the current websocket message.
n := len(c.Send)
for i := 0; i < n; i++ {
_, _ = w.Write([]byte{'\n'})
message = <-c.Send
_, _ = w.Write(message)
}

if err := w.Close(); err != nil {
return
}

case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
Loading

0 comments on commit b240b95

Please sign in to comment.