diff --git a/server/client.go b/server/client.go index bbe5c5e..9ce68de 100755 --- a/server/client.go +++ b/server/client.go @@ -41,6 +41,7 @@ func initClient(id string, conn *websocket.Conn, party *Party) *Client { func (c *Client) readPump() { defer func() { + log.Println("Read pump closed.") c.party.leave <- c c.conn.Close() }() @@ -87,6 +88,7 @@ func (c *Client) readPump() { func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { + log.Println("Write pump closed.") ticker.Stop() c.conn.Close() }() diff --git a/server/client_test.go b/server/client_test.go new file mode 100644 index 0000000..6539208 --- /dev/null +++ b/server/client_test.go @@ -0,0 +1,75 @@ +package main + +import ( + "log" + "testing" + "time" + "net/http" + "github.com/gorilla/websocket" +) + +func TestClient(t *testing.T) { + go main() + time.Sleep(1 * time.Second) + initWatchparty() + time.Sleep(35*time.Second) +} + +func initWatchparty() { + url := "http://localhost:6969/create?ownerId=user-123&partyId=party-123&src=https://example.com" + + // create watchparty + resp, err := http.Get(url) + log.Println("HTTP Response", resp.Body) + if err != nil { + log.Println("error making request", err) + } + + // join as client + go wsClient() +} + +func wsClient() { + wsUrl := "ws://localhost:6969/ws?userId=user-234&partyId=party-123" + + log.Println("Connecting to", wsUrl) + c, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + + go func() { + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Println("read:", err) + return + } + log.Printf("recv: %s", message) + } + }() + + // Close WS connection after 4s + time.Sleep(2 * time.Second) + log.Println("Client closing connection.") + err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Println("write close:", err) + return + } + + // ticker := time.NewTicker(time.Second) + // defer ticker.Stop() + + // for { + // select { + // case t := <-ticker.C: + // err := c.WriteMessage(websocket.TextMessage, []byte(t.String())) + // if err != nil { + // log.Println("write:", err) + // return + // } + // } + // } +} \ No newline at end of file diff --git a/server/party.go b/server/party.go index ba7a4b2..696bbe8 100755 --- a/server/party.go +++ b/server/party.go @@ -8,98 +8,111 @@ import ( ) type PartyInfo struct { - Id string `json:"id"` - OwnerId string `json:"ownerId"` - Src string `json:"src"` - Playhead float32 `json:"playhead"` - IsPlaying bool `json:"isPlaying"` + Id string `json:"id"` + OwnerId string `json:"ownerId"` + Src string `json:"src"` + Playhead float32 `json:"playhead"` + IsPlaying bool `json:"isPlaying"` } type Party struct { - party PartyInfo - //clients map[*Client]bool - clients sync.Map - broadcast chan []byte - join chan *Client - leave chan *Client + party PartyInfo + //clients map[*Client]bool + clients sync.Map + broadcast chan []byte + join chan *Client + leave chan *Client } func createParty(id string, ownerId string, src string) *Party { - return &Party{ - party: PartyInfo{ - Id: id, - OwnerId: ownerId, - Src: src, - Playhead: 0, - IsPlaying: false, - }, - //clients: make(map[*Client]bool), - clients: sync.Map{}, - broadcast: make(chan []byte), - join: make(chan *Client), - leave: make(chan *Client), - } + return &Party{ + party: PartyInfo{ + Id: id, + OwnerId: ownerId, + Src: src, + Playhead: 0, + IsPlaying: false, + }, + //clients: make(map[*Client]bool), + clients: sync.Map{}, + broadcast: make(chan []byte), + join: make(chan *Client), + leave: make(chan *Client), + } } func (p *Party) run() { - log.Println("watchparty " + p.party.Id + " live.") - go p.broadcastStatus() + log.Println("watchparty " + p.party.Id + " live.") - for { - select { - case client := <-p.join: - log.Printf("%s joined.\n", client.nickname) - //p.clients[client] = true - p.clients.Store(client, true) - handleJoin(p.party, client.send) // Send party details to newly joined client - broadcastJoinOrLeave("new", &p.clients, client) // Send join notification to peers + done := make(chan bool) + go p.broadcastStatus(done) - case client := <-p.leave: - //_, ok := p.clients[client] - _, ok := p.clients.Load(client) - if ok { - //delete(p.clients, client) - p.clients.Delete(client) - close(client.send) - broadcastJoinOrLeave("leave", &p.clients, client) // Send join notification to peers - } + // watchparty will be terminated after this ticker ticks + ticker := time.NewTicker(24 * time.Hour) + defer func() { + log.Printf("Party %s terminated", p.party.Id) + watchparties.Delete(p.party.Id) // Remove watchparty from map + done <- true // close broadcastStatus goroutine + ticker.Stop() // close watchparty + }() - case message := <-p.broadcast: - msg := wsMessage{} - json.Unmarshal([]byte(message), &msg) - p.clients.Range(func(key, value any) bool { - // Do not broadcast message back to the sender - if msg.ClientId != key.(*Client).id { - select { - case key.(*Client).send <- message: - default: - close(key.(*Client).send) - //delete(p.clients, client) - p.clients.Delete(key) - } - } - return true - }) - } - } + for { + select { + case client := <-p.join: + log.Printf("%s joined.\n", client.nickname) + //p.clients[client] = true + p.clients.Store(client, true) + handleJoin(p.party, client.send) // Send party details to newly joined client + broadcastJoinOrLeave("new", &p.clients, client) // Send join notification to peers + + case client := <-p.leave: + //_, ok := p.clients[client] + _, ok := p.clients.Load(client) + if ok { + //delete(p.clients, client) + p.clients.Delete(client) + close(client.send) + broadcastJoinOrLeave("leave", &p.clients, client) // Send join notification to peers + } + + case message := <-p.broadcast: + msg := wsMessage{} + json.Unmarshal([]byte(message), &msg) + p.clients.Range(func(key, value any) bool { + // Do not broadcast message back to the sender + if msg.ClientId != key.(*Client).id { + select { + case key.(*Client).send <- message: + default: + close(key.(*Client).send) + //delete(p.clients, client) + p.clients.Delete(key) + } + } + return true + }) + + case <-ticker.C: + // close the party after 24 hours + return + } + + } } -func (p *Party) broadcastStatus() { - ticker := time.NewTicker(2000 * time.Millisecond) - done := make(chan bool) +func (p *Party) broadcastStatus(done chan bool) { + ticker := time.NewTicker(2000 * time.Millisecond) - defer func() { - ticker.Stop() - done <- true - log.Printf("%s party ticker stopped\n", p.party.Id) - }() + defer func() { + ticker.Stop() + }() - for { - select { - case <- done: - return - case <-ticker.C: - broadcastWatchpartyStatus(p) - } - } + for { + select { + case <-done: + return + case <-ticker.C: + broadcastWatchpartyStatus(p) + } + } } diff --git a/server/party_test.go b/server/party_test.go new file mode 100644 index 0000000..3935dc9 --- /dev/null +++ b/server/party_test.go @@ -0,0 +1,28 @@ +package main + +import ( + "testing" + "time" + "net/http" + "log" +) + +func TestRun(t *testing.T) { + party := createParty("party-123", "user-123", "https://youtube.com") + go party.run() + time.Sleep(8 * time.Second) +} + +func TestWatchpartyRemoval(t *testing.T) { + go main() + url := "http://localhost:6969/create?ownerId=user-123&partyId=party-123&src=https://example.com" + + // create watchparty + resp, err := http.Get(url) + log.Println("HTTP Response", resp.Body) + if err != nil { + log.Println("error making request", err) + } + + time.Sleep(15 * time.Second) +} \ No newline at end of file