Skip to content

Commit

Permalink
DRY out WebSocket handler
Browse files Browse the repository at this point in the history
  • Loading branch information
morrisonbrett committed Jan 8, 2025
1 parent a9b69d2 commit bf410e6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 62 deletions.
67 changes: 5 additions & 62 deletions HockeyPickup.Api/Helpers/SessionSubscriptionHandler.cs
Original file line number Diff line number Diff line change
@@ -1,68 +1,11 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Text.Json;
using System.Text;

namespace HockeyPickup.Api.Helpers;

public class SessionSubscriptionHandler : ISubscriptionHandler
public class SessionSubscriptionHandler : BaseSubscriptionHandler
{
private readonly HashSet<string> _subscribedSockets = new();
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections;
private readonly ConcurrentDictionary<string, string> _subscriptionIds = new();

public SessionSubscriptionHandler(ConcurrentDictionary<string, WebSocketConnection> connections)
{
_connections = connections;
}

public string OperationType => "SessionUpdated";

public Task HandleSubscription(string socketId, string id)
{
_subscriptionIds[socketId] = id;
_subscribedSockets.Add(socketId);
Console.WriteLine($"Added subscription with ID: {id}");
return Task.CompletedTask;
}

public async Task HandleUpdate(object data)
{
foreach (var socketId in _subscribedSockets)
{
if (!_subscriptionIds.TryGetValue(socketId, out var subscriptionId) ||
!_connections.TryGetValue(socketId, out var connection) ||
connection.Socket.State != WebSocketState.Open)
continue;

try
{
var message = JsonSerializer.Serialize(new
{
type = "next",
id = subscriptionId, // Use stored subscription ID
payload = new { data = new { SessionUpdated = data } }
});
public SessionSubscriptionHandler(IWebSocketService webSocketService) : base(webSocketService) { }

var bytes = Encoding.UTF8.GetBytes(message);
await connection.Socket.SendAsync(
new ArraySegment<byte>(bytes),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
catch (Exception ex)
{
Console.WriteLine($"Error sending to socket {socketId}: {ex.Message}");
}
}
}
public override string OperationType => "SessionUpdated";

public Task Cleanup(string socketId)
{
_subscribedSockets.Remove(socketId);
_subscriptionIds.TryRemove(socketId, out _);
return Task.CompletedTask;
}
protected override object WrapData(object data) =>
new { data = new { SessionUpdated = data } };
}
82 changes: 82 additions & 0 deletions HockeyPickup.Api/Helpers/WebSocketMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,46 @@ public interface ISubscriptionHandler
Task Cleanup(string socketId);
}

public abstract class BaseSubscriptionHandler : ISubscriptionHandler
{
private readonly HashSet<string> _subscribedSockets = new();
private readonly ConcurrentDictionary<string, string> _subscriptionIds = new();
private readonly IWebSocketService _webSocketService;

protected BaseSubscriptionHandler(IWebSocketService webSocketService)
{
_webSocketService = webSocketService;
}

public abstract string OperationType { get; }
protected abstract object WrapData(object data);

public Task HandleSubscription(string socketId, string id)
{
_subscriptionIds[socketId] = id;
_subscribedSockets.Add(socketId);
return Task.CompletedTask;
}

public async Task HandleUpdate(object data)
{
foreach (var socketId in _subscribedSockets)
{
if (_subscriptionIds.TryGetValue(socketId, out var subscriptionId))
{
await _webSocketService.SendMessageToSocket(socketId, WrapData(data), subscriptionId);
}
}
}

public Task Cleanup(string socketId)
{
_subscribedSockets.Remove(socketId);
_subscriptionIds.TryRemove(socketId, out _);
return Task.CompletedTask;
}
}

public class WebSocketConnection
{
public WebSocket Socket { get; set; }
Expand Down Expand Up @@ -214,3 +254,45 @@ private async Task CleanupSocketSubscriptions(string socketId)
}
}
}

public interface IWebSocketService
{
Task SendMessageToSocket(string socketId, object payload, string subscriptionId);
bool IsSocketConnected(string socketId);
}

[ExcludeFromCodeCoverage]
public class WebSocketService : IWebSocketService
{
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections;

public WebSocketService(ConcurrentDictionary<string, WebSocketConnection> connections)
{
_connections = connections;
}

public async Task SendMessageToSocket(string socketId, object payload, string subscriptionId)
{
if (_connections.TryGetValue(socketId, out var connection) &&
connection.Socket.State == WebSocketState.Open)
{
var message = JsonSerializer.Serialize(new
{
type = "next",
id = subscriptionId,
payload
});
var bytes = Encoding.UTF8.GetBytes(message);
await connection.Socket.SendAsync(
new ArraySegment<byte>(bytes),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
}

public bool IsSocketConnected(string socketId) =>
_connections.TryGetValue(socketId, out var connection) &&
connection.Socket.State == WebSocketState.Open;
}
1 change: 1 addition & 0 deletions HockeyPickup.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public static void Main(string[] args)

builder.Services.AddSingleton<ConcurrentDictionary<string, WebSocketConnection>>();
builder.Services.AddSingleton<ISubscriptionHandler, SessionSubscriptionHandler>();
builder.Services.AddSingleton<IWebSocketService, WebSocketService>();

builder.Services.AddSingleton<IAuthorizationHandler, GraphQLAuthHandler>();
builder.Services.AddGraphQLServer()
Expand Down

0 comments on commit bf410e6

Please sign in to comment.