Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat-cleanup #3

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c43e2ea
cleanup and testing init
0xDebabrata Nov 16, 2023
21bcdec
test
antinish Nov 16, 2023
938168d
debug
0xDebabrata Nov 16, 2023
bf6f132
debug
0xDebabrata Nov 16, 2023
85abe8c
test
0xDebabrata Nov 16, 2023
95eb2b8
test
0xDebabrata Nov 16, 2023
45ec879
test
0xDebabrata Nov 16, 2023
c4c9f74
test
0xDebabrata Nov 16, 2023
59f013a
test
0xDebabrata Nov 16, 2023
637facf
test
0xDebabrata Nov 16, 2023
1cf4019
test
0xDebabrata Nov 16, 2023
fa418c0
test
0xDebabrata Nov 16, 2023
fb4d95a
client test
0xDebabrata Nov 16, 2023
082c809
update import
0xDebabrata Nov 16, 2023
f2c2fa6
debug
0xDebabrata Nov 16, 2023
007bc92
update imports
0xDebabrata Nov 16, 2023
9b311bc
test
0xDebabrata Nov 16, 2023
5f9cc9c
update url
0xDebabrata Nov 16, 2023
a29463b
debug
0xDebabrata Nov 16, 2023
e781c53
test
0xDebabrata Nov 16, 2023
30dcbf9
threads
0xDebabrata Nov 16, 2023
e395d6f
remove os interrupts
0xDebabrata Nov 16, 2023
f41e143
debug
0xDebabrata Nov 16, 2023
c00d274
logs
0xDebabrata Nov 16, 2023
874a484
check disconnection
0xDebabrata Nov 16, 2023
bc0a692
add log
0xDebabrata Nov 16, 2023
ef9d075
debug
0xDebabrata Nov 16, 2023
1c5ac41
change sleep times
0xDebabrata Nov 16, 2023
f52a2f3
remove watchparty from map
0xDebabrata Nov 16, 2023
f96e40e
add party removal test
0xDebabrata Nov 16, 2023
d7ae2d3
test
0xDebabrata Nov 16, 2023
c987744
debug
0xDebabrata Nov 16, 2023
fa13ca9
rename
0xDebabrata Nov 16, 2023
2adbec2
log
0xDebabrata Nov 16, 2023
125aea5
debug
0xDebabrata Nov 16, 2023
303e729
rearrange logs
0xDebabrata Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func initClient(id string, conn *websocket.Conn, party *Party) *Client {

func (c *Client) readPump() {
defer func() {
log.Println("Read pump closed.")
c.party.leave <- c
c.conn.Close()
}()
Expand Down Expand Up @@ -87,6 +88,7 @@ func (c *Client) readPump() {
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
log.Println("Write pump closed.")
ticker.Stop()
c.conn.Close()
}()
Expand Down
75 changes: 75 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"log"
"testing"
"time"
"net/http"
"github.com/gorilla/websocket"
)

func TestClient(t *testing.T) {
go main()
time.Sleep(1 * time.Second)
initWatchparty()
time.Sleep(35*time.Second)
}

func initWatchparty() {
url := "http://localhost:6969/create?ownerId=user-123&partyId=party-123&src=https://example.com"

// create watchparty
resp, err := http.Get(url)
log.Println("HTTP Response", resp.Body)
if err != nil {
log.Println("error making request", err)
}

// join as client
go wsClient()
}

func wsClient() {
wsUrl := "ws://localhost:6969/ws?userId=user-234&partyId=party-123"

log.Println("Connecting to", wsUrl)
c, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()

go func() {
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
return
}
log.Printf("recv: %s", message)
}
}()

// Close WS connection after 4s
time.Sleep(2 * time.Second)
log.Println("Client closing connection.")
err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
}

// ticker := time.NewTicker(time.Second)
// defer ticker.Stop()

// for {
// select {
// case t := <-ticker.C:
// err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
// if err != nil {
// log.Println("write:", err)
// return
// }
// }
// }
}
169 changes: 91 additions & 78 deletions server/party.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,98 +8,111 @@ import (
)

type PartyInfo struct {
Id string `json:"id"`
OwnerId string `json:"ownerId"`
Src string `json:"src"`
Playhead float32 `json:"playhead"`
IsPlaying bool `json:"isPlaying"`
Id string `json:"id"`
OwnerId string `json:"ownerId"`
Src string `json:"src"`
Playhead float32 `json:"playhead"`
IsPlaying bool `json:"isPlaying"`
}

type Party struct {
party PartyInfo
//clients map[*Client]bool
clients sync.Map
broadcast chan []byte
join chan *Client
leave chan *Client
party PartyInfo
//clients map[*Client]bool
clients sync.Map
broadcast chan []byte
join chan *Client
leave chan *Client
}

func createParty(id string, ownerId string, src string) *Party {
return &Party{
party: PartyInfo{
Id: id,
OwnerId: ownerId,
Src: src,
Playhead: 0,
IsPlaying: false,
},
//clients: make(map[*Client]bool),
clients: sync.Map{},
broadcast: make(chan []byte),
join: make(chan *Client),
leave: make(chan *Client),
}
return &Party{
party: PartyInfo{
Id: id,
OwnerId: ownerId,
Src: src,
Playhead: 0,
IsPlaying: false,
},
//clients: make(map[*Client]bool),
clients: sync.Map{},
broadcast: make(chan []byte),
join: make(chan *Client),
leave: make(chan *Client),
}
}

func (p *Party) run() {
log.Println("watchparty " + p.party.Id + " live.")
go p.broadcastStatus()
log.Println("watchparty " + p.party.Id + " live.")

for {
select {
case client := <-p.join:
log.Printf("%s joined.\n", client.nickname)
//p.clients[client] = true
p.clients.Store(client, true)
handleJoin(p.party, client.send) // Send party details to newly joined client
broadcastJoinOrLeave("new", &p.clients, client) // Send join notification to peers
done := make(chan bool)
go p.broadcastStatus(done)

case client := <-p.leave:
//_, ok := p.clients[client]
_, ok := p.clients.Load(client)
if ok {
//delete(p.clients, client)
p.clients.Delete(client)
close(client.send)
broadcastJoinOrLeave("leave", &p.clients, client) // Send join notification to peers
}
// watchparty will be terminated after this ticker ticks
ticker := time.NewTicker(24 * time.Hour)
defer func() {
log.Printf("Party %s terminated", p.party.Id)
watchparties.Delete(p.party.Id) // Remove watchparty from map
done <- true // close broadcastStatus goroutine
ticker.Stop() // close watchparty
}()

case message := <-p.broadcast:
msg := wsMessage{}
json.Unmarshal([]byte(message), &msg)
p.clients.Range(func(key, value any) bool {
// Do not broadcast message back to the sender
if msg.ClientId != key.(*Client).id {
select {
case key.(*Client).send <- message:
default:
close(key.(*Client).send)
//delete(p.clients, client)
p.clients.Delete(key)
}
}
return true
})
}
}
for {
select {
case client := <-p.join:
log.Printf("%s joined.\n", client.nickname)
//p.clients[client] = true
p.clients.Store(client, true)
handleJoin(p.party, client.send) // Send party details to newly joined client
broadcastJoinOrLeave("new", &p.clients, client) // Send join notification to peers

case client := <-p.leave:
//_, ok := p.clients[client]
_, ok := p.clients.Load(client)
if ok {
//delete(p.clients, client)
p.clients.Delete(client)
close(client.send)
broadcastJoinOrLeave("leave", &p.clients, client) // Send join notification to peers
}

case message := <-p.broadcast:
msg := wsMessage{}
json.Unmarshal([]byte(message), &msg)
p.clients.Range(func(key, value any) bool {
// Do not broadcast message back to the sender
if msg.ClientId != key.(*Client).id {
select {
case key.(*Client).send <- message:
default:
close(key.(*Client).send)
//delete(p.clients, client)
p.clients.Delete(key)
}
}
return true
})

case <-ticker.C:
// close the party after 24 hours
return
}

}
}

func (p *Party) broadcastStatus() {
ticker := time.NewTicker(2000 * time.Millisecond)
done := make(chan bool)
func (p *Party) broadcastStatus(done chan bool) {
ticker := time.NewTicker(2000 * time.Millisecond)

defer func() {
ticker.Stop()
done <- true
log.Printf("%s party ticker stopped\n", p.party.Id)
}()
defer func() {
ticker.Stop()
}()

for {
select {
case <- done:
return
case <-ticker.C:
broadcastWatchpartyStatus(p)
}
}
for {
select {
case <-done:
return
case <-ticker.C:
broadcastWatchpartyStatus(p)
}
}
}
28 changes: 28 additions & 0 deletions server/party_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"testing"
"time"
"net/http"
"log"
)

func TestRun(t *testing.T) {
party := createParty("party-123", "user-123", "https://youtube.com")
go party.run()
time.Sleep(8 * time.Second)
}

func TestWatchpartyRemoval(t *testing.T) {
go main()
url := "http://localhost:6969/create?ownerId=user-123&partyId=party-123&src=https://example.com"

// create watchparty
resp, err := http.Get(url)
log.Println("HTTP Response", resp.Body)
if err != nil {
log.Println("error making request", err)
}

time.Sleep(15 * time.Second)
}