diff --git a/internal/cloudflare/credentials.go b/internal/cloudflare/credentials.go index fad556a..504054a 100644 --- a/internal/cloudflare/credentials.go +++ b/internal/cloudflare/credentials.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "os" "strings" "sync" "time" @@ -37,6 +38,10 @@ func NewCredentialsClient(appID, key string, lifetime time.Duration) *Credential } func (c *CredentialsClient) Run(ctx context.Context) { + if os.Getenv("ENV") != "production" && c.appID == "" { + return + } + logger := logging.GetLogger(ctx) for ctx.Err() == nil { diff --git a/internal/signaling/handler.go b/internal/signaling/handler.go index 8497f99..2ff5291 100644 --- a/internal/signaling/handler.go +++ b/internal/signaling/handler.go @@ -96,6 +96,12 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre } else { logger.Error("failed to send ping packet", zap.String("peer", peer.ID), zap.Error(err)) } + } else { + // If we can send a ping packet, and the peer has an ID, we update the peer as being active. + // If the peer doesn't have an ID yet, it's still in the process of connecting, so we don't update it. + if peer.ID != "" { + manager.MarkPeerAsActive(ctx, peer.ID) + } } case <-ctx.Done(): return diff --git a/internal/signaling/peer.go b/internal/signaling/peer.go index 81d3dd1..f87f878 100644 --- a/internal/signaling/peer.go +++ b/internal/signaling/peer.go @@ -217,6 +217,10 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error p.ID = util.GeneratePeerID(ctx) p.Secret = util.GenerateSecret(ctx) logger.Info("peer connecting", zap.String("game", p.Game), zap.String("peer", p.ID)) + + if err := p.store.CreatePeer(ctx, p.ID, p.Secret, p.Game); err != nil { + return fmt.Errorf("unable to create peer: %w", err) + } } err := p.Send(ctx, WelcomePacket{ @@ -309,11 +313,9 @@ func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error } func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error { - logger := logging.GetLogger(ctx) if p.ID == "" { return fmt.Errorf("peer not connected") } - logger.Debug("listing lobbies", zap.String("game", p.Game), zap.String("peer", p.ID)) lobbies, err := p.store.ListLobbies(ctx, p.Game, packet.Filter) if err != nil { return err diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index 8d4a81a..607ff79 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -369,120 +369,182 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game, filter string) ([ return lobbies, nil } -func (s *PostgresStore) TimeoutPeer(ctx context.Context, peerID, secret, gameID string, lobbies []string) error { +func (s *PostgresStore) CreatePeer(ctx context.Context, peerID, secret, gameID string) error { if len(peerID) > 20 { logger := logging.GetLogger(ctx) logger.Warn("peer id too long", zap.String("peerID", peerID)) return ErrInvalidPeerID } - for _, lobby := range lobbies { - if len(lobby) > 20 { - logger := logging.GetLogger(ctx) - logger.Warn("lobby code too long", zap.String("lobbyCode", lobby)) - return ErrInvalidLobbyCode - } - } now := util.NowUTC(ctx) _, err := s.DB.Exec(ctx, ` - INSERT INTO timeouts (peer, secret, game, lobbies, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $5) - ON CONFLICT (peer) DO UPDATE - SET - secret = $2, - game = $3, - lobbies = $4, - last_seen = $5, - updated_at = $5 - `, peerID, secret, gameID, lobbies, now) + INSERT INTO peers (peer, secret, game, last_seen, updated_at) + VALUES ($1, $2, $3, $4, $4) + `, peerID, secret, gameID, now) if err != nil { return err } return nil } -func (s *PostgresStore) ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) { +func (s *PostgresStore) MarkPeerAsActive(ctx context.Context, peerID string) error { + now := util.NowUTC(ctx) + + _, err := s.DB.Exec(ctx, ` + UPDATE peers + SET + disconnected = FALSE, + last_seen = $1, + updated_at = $1 + WHERE peer = $2 + `, now, peerID) + return err +} + +func (s *PostgresStore) MarkPeerAsDisconnected(ctx context.Context, peerID string) error { + now := util.NowUTC(ctx) + + _, err := s.DB.Exec(ctx, ` + UPDATE peers + SET + disconnected = TRUE, + updated_at = $1 + WHERE peer = $2 + `, now, peerID) + return err +} + +func (s *PostgresStore) MarkPeerAsReconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) { + now := util.NowUTC(ctx) + + _, err := s.DB.Exec(ctx, ` + UPDATE peers + SET + disconnected = FALSE, + last_seen = $1, + updated_at = $1 + WHERE peer = $2 + AND secret = $3 + AND game = $4 + `, now, peerID, secret, gameID) + if err != nil { + return false, nil, err + } + var lobbies []string - err := s.DB.QueryRow(ctx, ` - DELETE FROM timeouts - WHERE peer = $1 - AND secret = $2 - AND game = $3 - RETURNING lobbies - `, peerID, secret, gameID).Scan(&lobbies) + rows, err := s.DB.Query(ctx, ` + SELECT + code + FROM lobbies + WHERE $1 = ANY(peers) + `, peerID) if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - return false, nil, nil - } return false, nil, err } - if len(lobbies) == 0 { - lobbies = nil + + for rows.Next() { + var lobby string + + if err := rows.Scan(&lobby); err != nil { + return false, nil, err + } + + lobbies = append(lobbies, lobby) + } + + if err = rows.Err(); err != nil { + return false, nil, err } + return true, lobbies, nil } -func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration, callback func(peerID, gameID string, lobbies []string) error) (more bool, err error) { +func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration) (string, bool, map[string][]string, error) { now := util.NowUTC(ctx) tx, err := s.DB.Begin(ctx) if err != nil { - return false, err + return "", false, nil, err } defer tx.Rollback(context.Background()) //nolint:errcheck - // DELETE FROM timeouts will lock the row for this peer in this transaction. + // DELETE FROM peers will lock the row for this peer in this transaction. // This means we can safely remove the peer from lobbies without getting a // race condition with DoLeaderElection. // It is important that both ClaimNextTimedOutPeer and DoLeaderElection always - // lock timeouts first to avoid deadlocks. + // lock peers first to avoid deadlocks. var peerID string - var gameID string - var lobbies []string + var disconnected bool err = tx.QueryRow(ctx, ` WITH d AS ( - SELECT peer, game, lobbies - FROM timeouts + SELECT peer, disconnected + FROM peers WHERE last_seen < $1 LIMIT 1 ) - DELETE FROM timeouts + DELETE FROM peers USING d - WHERE timeouts.peer = d.peer - AND timeouts.game = d.game - RETURNING d.peer, d.game, d.lobbies - `, now.Add(-threshold)).Scan(&peerID, &gameID, &lobbies) + WHERE peers.peer = d.peer + RETURNING d.peer, d.disconnected + `, now.Add(-threshold)).Scan(&peerID, &disconnected) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - if err := tx.Commit(ctx); err != nil && !errors.Is(err, pgx.ErrNoRows) { - return false, err - } - return false, nil + return "", false, nil, nil } - return false, err - } - - for _, lobby := range lobbies { - _, err := tx.Exec(ctx, ` - UPDATE lobbies - SET - peers = array_remove(peers, $1), - updated_at = $2 - WHERE code = $3 - AND game = $4 - `, peerID, now, lobby, gameID) + return "", false, nil, err + } + + gameLobbies := make(map[string][]string) + + rows, err := tx.Query(ctx, ` + UPDATE lobbies + SET + peers = array_remove(peers, $1), + updated_at = $2 + WHERE $1 = ANY(peers) + RETURNING game, code + `, peerID, now) + if err != nil { + return "", false, nil, err + } + + for rows.Next() { + var game string + var lobby string + + err = rows.Scan(&game, &lobby) if err != nil { - return false, err + return "", false, nil, err } + + gameLobbies[game] = append(gameLobbies[game], lobby) } - err = callback(peerID, gameID, lobbies) - if err != nil { - return false, err + if err = rows.Err(); err != nil { + return "", false, nil, err + } + + if err = tx.Commit(ctx); err != nil { + return "", false, nil, err } - return true, tx.Commit(ctx) + return peerID, disconnected, gameLobbies, nil +} + +// ResetAllPeerLastSeen will reset all last_seen. +// This is being called when the process restarts so it doesn't matter +// how long the process was down. +func (s *PostgresStore) ResetAllPeerLastSeen(ctx context.Context) error { + now := util.NowUTC(ctx) + + _, err := s.DB.Exec(ctx, ` + UPDATE peers + SET + last_seen = $1, + updated_at = $1 + `, now) + return err } func (s *PostgresStore) CleanEmptyLobbies(ctx context.Context, olderThan time.Time) error { @@ -506,7 +568,7 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode // We need to lock the whole table as SELECT FOR UPDATE does not lock rows that do not exist yet // And we can't have timed out peers being added during the election. _, err = tx.Exec(ctx, ` - LOCK TABLE timeouts IN EXCLUSIVE MODE + LOCK TABLE peers IN EXCLUSIVE MODE `) if err != nil { return nil, err @@ -515,10 +577,9 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode var timedOutPeers []string rows, err := tx.Query(ctx, ` SELECT peer - FROM timeouts - WHERE game = $1 - AND $2 = ANY(lobbies) - `, gameID, lobbyCode) + FROM peers + WHERE disconnected = TRUE + `) if err != nil { if !errors.Is(err, pgx.ErrNoRows) { return nil, err diff --git a/internal/signaling/stores/shared.go b/internal/signaling/stores/shared.go index 95c15e7..fc946f6 100644 --- a/internal/signaling/stores/shared.go +++ b/internal/signaling/stores/shared.go @@ -35,9 +35,12 @@ type Store interface { Subscribe(ctx context.Context, callback SubscriptionCallback, game, lobby, peerID string) Publish(ctx context.Context, topic string, data []byte) error - TimeoutPeer(ctx context.Context, peerID, secret, gameID string, lobbies []string) error - ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) - ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration, callback func(peerID, gameID string, lobbies []string) error) (bool, error) + CreatePeer(ctx context.Context, peerID, secret, gameID string) error + MarkPeerAsActive(ctx context.Context, peerID string) error + MarkPeerAsDisconnected(ctx context.Context, peerID string) error + MarkPeerAsReconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) + ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration) (string, bool, map[string][]string, error) + ResetAllPeerLastSeen(ctx context.Context) error CleanEmptyLobbies(ctx context.Context, olderThan time.Time) error diff --git a/internal/signaling/timeout_manager.go b/internal/signaling/timeout_manager.go index e64ee42..cdc9bbe 100644 --- a/internal/signaling/timeout_manager.go +++ b/internal/signaling/timeout_manager.go @@ -16,41 +16,58 @@ type TimeoutManager struct { Store stores.Store } -func (i *TimeoutManager) Run(ctx context.Context) { - if i.DisconnectThreshold == 0 { - i.DisconnectThreshold = time.Minute +func (manager *TimeoutManager) Run(ctx context.Context) { + logger := logging.GetLogger(ctx) + + if err := manager.Store.ResetAllPeerLastSeen(ctx); err != nil { + logger.Error("failed to reset all peer last seen", zap.Error(err)) + } + + if manager.DisconnectThreshold == 0 { + // We update peer activity every 30 seconds. Make it possible to somehow miss + // two updates before we consider a peer timed out. + manager.DisconnectThreshold = time.Second * 90 } for ctx.Err() == nil { time.Sleep(time.Second) - i.RunOnce(ctx) + manager.RunOnce(ctx) } } -func (i *TimeoutManager) RunOnce(ctx context.Context) { +func (manager *TimeoutManager) RunOnce(ctx context.Context) { logger := logging.GetLogger(ctx) for ctx.Err() == nil { - hasNext, err := i.Store.ClaimNextTimedOutPeer(ctx, i.DisconnectThreshold, func(peerID, gameID string, lobbies []string) error { - logger.Info("peer timed out closing peer", zap.String("id", peerID)) - - for _, lobby := range lobbies { - if err := i.disconnectPeerInLobby(ctx, peerID, gameID, lobby, logger); err != nil { - return err - } - } - return nil - }) + peerID, disconnected, gameLobbies, err := manager.Store.ClaimNextTimedOutPeer(ctx, manager.DisconnectThreshold) if err != nil { - logger.Error("failed to claim next timed out peer", zap.Error(err)) + logger.Error("failed to claim next timedout peer", zap.Error(err)) } - if !hasNext { + if peerID == "" { break } + + for gameID, lobbies := range gameLobbies { + for _, lobbyCode := range lobbies { + logger.Info("peer timeout", zap.String("peer", peerID), zap.String("game", gameID), zap.String("lobby", lobbyCode)) + + if err := manager.disconnectPeerInLobby(ctx, peerID, gameID, lobbyCode); err != nil { + logger.Error("failed to disconnect peer", zap.Error(err), zap.String("peer", peerID), zap.String("game", gameID), zap.String("lobby", lobbyCode)) + } + + // If the peer wasn't disconnected normally, they might still be the leader of a lobby. + // Just to be sure, do a leader election. + if !disconnected { + if err := manager.doLeaderElectionAndPublish(ctx, gameID, lobbyCode); err != nil { + logger.Error("failed to do leader election", zap.Error(err), zap.String("peer", peerID), zap.String("game", gameID), zap.String("lobby", lobbyCode)) + } + } + } + } } } -func (i *TimeoutManager) disconnectPeerInLobby(ctx context.Context, peerID string, gameID string, lobby string, logger *zap.Logger) error { +func (manager *TimeoutManager) disconnectPeerInLobby(ctx context.Context, peerID string, gameID string, lobby string) error { ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() @@ -60,18 +77,17 @@ func (i *TimeoutManager) disconnectPeerInLobby(ctx context.Context, peerID strin } data, err := json.Marshal(packet) if err != nil { - logger.Error("failed to marshal disconnect packet", zap.Error(err)) return err } - err = i.Store.Publish(ctx, gameID+lobby, data) + err = manager.Store.Publish(ctx, gameID+lobby, data) if err != nil { - logger.Error("failed to publish disconnect packet", zap.Error(err)) + return err } return nil } -func (i *TimeoutManager) Disconnected(ctx context.Context, p *Peer) { +func (manager *TimeoutManager) Disconnected(ctx context.Context, p *Peer) { logger := logging.GetLogger(ctx) if p.ID == "" { @@ -79,47 +95,57 @@ func (i *TimeoutManager) Disconnected(ctx context.Context, p *Peer) { } logger.Debug("peer marked as disconnected", zap.String("id", p.ID), zap.String("lobby", p.Lobby)) - lobbies := []string{} - if p.Lobby != "" { - lobbies = []string{p.Lobby} - } - err := i.Store.TimeoutPeer(ctx, p.ID, p.Secret, p.Game, lobbies) + err := manager.Store.MarkPeerAsDisconnected(ctx, p.ID) if err != nil { logger.Error("failed to record timeout peer", zap.Error(err)) } else { - for _, lobby := range lobbies { - result, err := i.Store.DoLeaderElection(ctx, p.Game, lobby) - if err != nil { - logger.Error("failed to do leader election", zap.Error(err)) - continue - } + err := manager.doLeaderElectionAndPublish(ctx, p.Game, p.Lobby) + if err != nil { + logger.Error("failed to do leader election", zap.Error(err), zap.String("game", p.Game), zap.String("lobby", p.Lobby)) + } + } +} - if result == nil { - continue - } +func (manager *TimeoutManager) Reconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) { + logger := logging.GetLogger(ctx) - packet := LeaderPacket{ - Type: "leader", - Leader: result.Leader, - Term: result.Term, - } - data, err := json.Marshal(packet) - if err != nil { - logger.Error("failed to marshal leader packet", zap.Error(err)) - continue - } + logger.Debug("peer marked as reconnected", zap.String("peer", peerID)) + return manager.Store.MarkPeerAsReconnected(ctx, peerID, secret, gameID) +} - err = i.Store.Publish(ctx, p.Game+lobby, data) - if err != nil { - logger.Error("failed to publish leader packet", zap.Error(err)) - } - } +func (manager *TimeoutManager) MarkPeerAsActive(ctx context.Context, peerID string) { + logger := logging.GetLogger(ctx) + + err := manager.Store.MarkPeerAsActive(ctx, peerID) + if err != nil { + logger.Error("failed to mark peer as active", zap.Error(err), zap.String("peer", peerID)) } } -func (i *TimeoutManager) Reconnected(ctx context.Context, id, secret, game string) (bool, []string, error) { - logger := logging.GetLogger(ctx) +func (manager *TimeoutManager) doLeaderElectionAndPublish(ctx context.Context, gameID, lobbyCode string) error { + result, err := manager.Store.DoLeaderElection(ctx, gameID, lobbyCode) + if err != nil { + return err + } + + if result == nil { + return nil + } + + packet := LeaderPacket{ + Type: "leader", + Leader: result.Leader, + Term: result.Term, + } + data, err := json.Marshal(packet) + if err != nil { + return err + } + + err = manager.Store.Publish(ctx, gameID+lobbyCode, data) + if err != nil { + return err + } - logger.Debug("peer marked as reconnected", zap.String("id", id)) - return i.Store.ReconnectPeer(ctx, id, secret, game) + return nil } diff --git a/migrations/1738466797_peers.down.sql b/migrations/1738466797_peers.down.sql new file mode 100644 index 0000000..5b92490 --- /dev/null +++ b/migrations/1738466797_peers.down.sql @@ -0,0 +1,17 @@ +BEGIN; + +DROP TABLE "peers"; + +CREATE TABLE "timeouts" ( + "peer" VARCHAR(20) NOT NULL PRIMARY KEY, + "secret" VARCHAR(24) NOT NULL, + "game" uuid NOT NULL, + "lobbies" VARCHAR(20)[] NOT NULL, + "last_seen" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX "timeouts_last_seen" ON "timeouts" ("last_seen"); + +COMMIT; diff --git a/migrations/1738466797_peers.up.sql b/migrations/1738466797_peers.up.sql new file mode 100644 index 0000000..bd03242 --- /dev/null +++ b/migrations/1738466797_peers.up.sql @@ -0,0 +1,17 @@ +BEGIN; + +DROP TABLE "timeouts"; + +CREATE TABLE "peers" ( + "peer" VARCHAR(20) NOT NULL PRIMARY KEY, + "secret" VARCHAR(24) DEFAULT NULL, + "game" uuid NOT NULL, + "disconnected" BOOLEAN NOT NULL DEFAULT FALSE, + "last_seen" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX "peers_last_seen" ON "peers" ("last_seen"); + +COMMIT; diff --git a/migrations/latest.lock b/migrations/latest.lock index 1838207..56be596 100644 --- a/migrations/latest.lock +++ b/migrations/latest.lock @@ -1 +1 @@ -1722609073_max_players +1738466797_peers