Skip to content

Commit

Permalink
Detect inactive peers and remove them from lobbies (#187)
Browse files Browse the repository at this point in the history
Inactive peers are peers that haven't had a connection to the signalling
server for a while. We see some of these peers that will never reconnect
still lingering in lobbies that never get cleaned. See:
#181

Before this the TimeoutManager only kept track of which peers were
disconnected and removed the ones that didn't reconnect within a certain
time. After this change it also tracks peers that somehow aren't getting
properly disconnected (for example due to a pod restart) and removes
them.

After this pull gets merged we need to run this query once to fill the
peers table with all peers in current lobbies:
```sql
INSERT INTO peers (peer, last_seen, updated_at)
SELECT UNNEST(peers) AS peer, NOW() AS last_seen, NOW() AS updated_at
FROM lobbies
ON CONFLICT (peer)
DO UPDATE SET
    last_seen = NOW(),
    updated_at = NOW()
```

After this pull we also need to keep track of the performance of
`MarkPeerAsReconnected` and `ClaimNextTimedOutPeer`. These functions do
a `FROM lobbies WHERE $1 = ANY(peers)` without index. They don't do this
very often and with a limited amount of lobbies this shouldn't be an
issue. Maintaining an index on `peers` probably costs more CPU at the
moment, but this will need to be monitored.
  • Loading branch information
erikdubbelboer authored Feb 5, 2025
1 parent e0cc548 commit 985ef5c
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 131 deletions.
5 changes: 5 additions & 0 deletions internal/cloudflare/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -37,6 +38,10 @@ func NewCredentialsClient(appID, key string, lifetime time.Duration) *Credential
}

func (c *CredentialsClient) Run(ctx context.Context) {
if os.Getenv("ENV") != "production" && c.appID == "" {
return
}

logger := logging.GetLogger(ctx)

for ctx.Err() == nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/signaling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
} else {
logger.Error("failed to send ping packet", zap.String("peer", peer.ID), zap.Error(err))
}
} else {
// If we can send a ping packet, and the peer has an ID, we update the peer as being active.
// If the peer doesn't have an ID yet, it's still in the process of connecting, so we don't update it.
if peer.ID != "" {
manager.MarkPeerAsActive(ctx, peer.ID)
}
}
case <-ctx.Done():
return
Expand Down
6 changes: 4 additions & 2 deletions internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error
p.ID = util.GeneratePeerID(ctx)
p.Secret = util.GenerateSecret(ctx)
logger.Info("peer connecting", zap.String("game", p.Game), zap.String("peer", p.ID))

if err := p.store.CreatePeer(ctx, p.ID, p.Secret, p.Game); err != nil {
return fmt.Errorf("unable to create peer: %w", err)
}
}

err := p.Send(ctx, WelcomePacket{
Expand Down Expand Up @@ -309,11 +313,9 @@ func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error
}

func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error {
logger := logging.GetLogger(ctx)
if p.ID == "" {
return fmt.Errorf("peer not connected")
}
logger.Debug("listing lobbies", zap.String("game", p.Game), zap.String("peer", p.ID))
lobbies, err := p.store.ListLobbies(ctx, p.Game, packet.Filter)
if err != nil {
return err
Expand Down
201 changes: 131 additions & 70 deletions internal/signaling/stores/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,120 +369,182 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game, filter string) ([
return lobbies, nil
}

func (s *PostgresStore) TimeoutPeer(ctx context.Context, peerID, secret, gameID string, lobbies []string) error {
func (s *PostgresStore) CreatePeer(ctx context.Context, peerID, secret, gameID string) error {
if len(peerID) > 20 {
logger := logging.GetLogger(ctx)
logger.Warn("peer id too long", zap.String("peerID", peerID))
return ErrInvalidPeerID
}
for _, lobby := range lobbies {
if len(lobby) > 20 {
logger := logging.GetLogger(ctx)
logger.Warn("lobby code too long", zap.String("lobbyCode", lobby))
return ErrInvalidLobbyCode
}
}

now := util.NowUTC(ctx)
_, err := s.DB.Exec(ctx, `
INSERT INTO timeouts (peer, secret, game, lobbies, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $5)
ON CONFLICT (peer) DO UPDATE
SET
secret = $2,
game = $3,
lobbies = $4,
last_seen = $5,
updated_at = $5
`, peerID, secret, gameID, lobbies, now)
INSERT INTO peers (peer, secret, game, last_seen, updated_at)
VALUES ($1, $2, $3, $4, $4)
`, peerID, secret, gameID, now)
if err != nil {
return err
}
return nil
}

func (s *PostgresStore) ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) {
func (s *PostgresStore) MarkPeerAsActive(ctx context.Context, peerID string) error {
now := util.NowUTC(ctx)

_, err := s.DB.Exec(ctx, `
UPDATE peers
SET
disconnected = FALSE,
last_seen = $1,
updated_at = $1
WHERE peer = $2
`, now, peerID)
return err
}

func (s *PostgresStore) MarkPeerAsDisconnected(ctx context.Context, peerID string) error {
now := util.NowUTC(ctx)

_, err := s.DB.Exec(ctx, `
UPDATE peers
SET
disconnected = TRUE,
updated_at = $1
WHERE peer = $2
`, now, peerID)
return err
}

func (s *PostgresStore) MarkPeerAsReconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) {
now := util.NowUTC(ctx)

_, err := s.DB.Exec(ctx, `
UPDATE peers
SET
disconnected = FALSE,
last_seen = $1,
updated_at = $1
WHERE peer = $2
AND secret = $3
AND game = $4
`, now, peerID, secret, gameID)
if err != nil {
return false, nil, err
}

var lobbies []string
err := s.DB.QueryRow(ctx, `
DELETE FROM timeouts
WHERE peer = $1
AND secret = $2
AND game = $3
RETURNING lobbies
`, peerID, secret, gameID).Scan(&lobbies)
rows, err := s.DB.Query(ctx, `
SELECT
code
FROM lobbies
WHERE $1 = ANY(peers)
`, peerID)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return false, nil, nil
}
return false, nil, err
}
if len(lobbies) == 0 {
lobbies = nil

for rows.Next() {
var lobby string

if err := rows.Scan(&lobby); err != nil {
return false, nil, err
}

lobbies = append(lobbies, lobby)
}

if err = rows.Err(); err != nil {
return false, nil, err
}

return true, lobbies, nil
}

func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration, callback func(peerID, gameID string, lobbies []string) error) (more bool, err error) {
func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration) (string, bool, map[string][]string, error) {
now := util.NowUTC(ctx)

tx, err := s.DB.Begin(ctx)
if err != nil {
return false, err
return "", false, nil, err
}
defer tx.Rollback(context.Background()) //nolint:errcheck

// DELETE FROM timeouts will lock the row for this peer in this transaction.
// DELETE FROM peers will lock the row for this peer in this transaction.
// This means we can safely remove the peer from lobbies without getting a
// race condition with DoLeaderElection.
// It is important that both ClaimNextTimedOutPeer and DoLeaderElection always
// lock timeouts first to avoid deadlocks.
// lock peers first to avoid deadlocks.

var peerID string
var gameID string
var lobbies []string
var disconnected bool
err = tx.QueryRow(ctx, `
WITH d AS (
SELECT peer, game, lobbies
FROM timeouts
SELECT peer, disconnected
FROM peers
WHERE last_seen < $1
LIMIT 1
)
DELETE FROM timeouts
DELETE FROM peers
USING d
WHERE timeouts.peer = d.peer
AND timeouts.game = d.game
RETURNING d.peer, d.game, d.lobbies
`, now.Add(-threshold)).Scan(&peerID, &gameID, &lobbies)
WHERE peers.peer = d.peer
RETURNING d.peer, d.disconnected
`, now.Add(-threshold)).Scan(&peerID, &disconnected)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
if err := tx.Commit(ctx); err != nil && !errors.Is(err, pgx.ErrNoRows) {
return false, err
}
return false, nil
return "", false, nil, nil
}
return false, err
}

for _, lobby := range lobbies {
_, err := tx.Exec(ctx, `
UPDATE lobbies
SET
peers = array_remove(peers, $1),
updated_at = $2
WHERE code = $3
AND game = $4
`, peerID, now, lobby, gameID)
return "", false, nil, err
}

gameLobbies := make(map[string][]string)

rows, err := tx.Query(ctx, `
UPDATE lobbies
SET
peers = array_remove(peers, $1),
updated_at = $2
WHERE $1 = ANY(peers)
RETURNING game, code
`, peerID, now)
if err != nil {
return "", false, nil, err
}

for rows.Next() {
var game string
var lobby string

err = rows.Scan(&game, &lobby)
if err != nil {
return false, err
return "", false, nil, err
}

gameLobbies[game] = append(gameLobbies[game], lobby)
}

err = callback(peerID, gameID, lobbies)
if err != nil {
return false, err
if err = rows.Err(); err != nil {
return "", false, nil, err
}

if err = tx.Commit(ctx); err != nil {
return "", false, nil, err
}

return true, tx.Commit(ctx)
return peerID, disconnected, gameLobbies, nil
}

// ResetAllPeerLastSeen will reset all last_seen.
// This is being called when the process restarts so it doesn't matter
// how long the process was down.
func (s *PostgresStore) ResetAllPeerLastSeen(ctx context.Context) error {
now := util.NowUTC(ctx)

_, err := s.DB.Exec(ctx, `
UPDATE peers
SET
last_seen = $1,
updated_at = $1
`, now)
return err
}

func (s *PostgresStore) CleanEmptyLobbies(ctx context.Context, olderThan time.Time) error {
Expand All @@ -506,7 +568,7 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode
// We need to lock the whole table as SELECT FOR UPDATE does not lock rows that do not exist yet
// And we can't have timed out peers being added during the election.
_, err = tx.Exec(ctx, `
LOCK TABLE timeouts IN EXCLUSIVE MODE
LOCK TABLE peers IN EXCLUSIVE MODE
`)
if err != nil {
return nil, err
Expand All @@ -515,10 +577,9 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode
var timedOutPeers []string
rows, err := tx.Query(ctx, `
SELECT peer
FROM timeouts
WHERE game = $1
AND $2 = ANY(lobbies)
`, gameID, lobbyCode)
FROM peers
WHERE disconnected = TRUE
`)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return nil, err
Expand Down
9 changes: 6 additions & 3 deletions internal/signaling/stores/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ type Store interface {
Subscribe(ctx context.Context, callback SubscriptionCallback, game, lobby, peerID string)
Publish(ctx context.Context, topic string, data []byte) error

TimeoutPeer(ctx context.Context, peerID, secret, gameID string, lobbies []string) error
ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, []string, error)
ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration, callback func(peerID, gameID string, lobbies []string) error) (bool, error)
CreatePeer(ctx context.Context, peerID, secret, gameID string) error
MarkPeerAsActive(ctx context.Context, peerID string) error
MarkPeerAsDisconnected(ctx context.Context, peerID string) error
MarkPeerAsReconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error)
ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration) (string, bool, map[string][]string, error)
ResetAllPeerLastSeen(ctx context.Context) error

CleanEmptyLobbies(ctx context.Context, olderThan time.Time) error

Expand Down
Loading

0 comments on commit 985ef5c

Please sign in to comment.