Skip to content
This repository has been archived by the owner on Aug 30, 2023. It is now read-only.

Commit

Permalink
More abstraction, and create DeviceWebSocket 👀
Browse files Browse the repository at this point in the history
  • Loading branch information
hhvrc committed Jul 9, 2023
1 parent ec92167 commit 23938f6
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 152 deletions.
90 changes: 0 additions & 90 deletions Common/BusinessLogic/WebSocketHandler.cs

This file was deleted.

24 changes: 8 additions & 16 deletions Common/Websocket/FlatbufferWebSocketBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public abstract class FlatbufferWebSocketBase<TRX, TTX>
private readonly WebSocket _webSocket;
private readonly ISerializer<TRX> _rxSerializer;
private readonly ISerializer<TTX> _txSerializer;
private readonly Channel<TTX> _txChannel;

protected Channel<TTX> TxChannel { get; init; }

private const int _CLIENT_STATE_INITIAL = 0;
private const int _CLIENT_STATE_CONNECTED = 1;
Expand All @@ -28,16 +29,7 @@ public FlatbufferWebSocketBase(WebSocket webSocket, ISerializer<TRX> rxSerialize
_webSocket = webSocket;
_rxSerializer = rxSerializer;
_txSerializer = txSerializer;
_txChannel = Channel.CreateBounded<TTX>(new BoundedChannelOptions(WebsocketConstants.ClientTxChannelCapacity) { FullMode = BoundedChannelFullMode.Wait });
}

protected async Task<bool> SendMessageAsync(TTX message, CancellationToken cancellationToken)
{
if (!IsConnnected) return false;

await _txChannel.Writer.WriteAsync(message, cancellationToken);

return true;
TxChannel = Channel.CreateBounded<TTX>(new BoundedChannelOptions(WebsocketConstants.ClientTxChannelCapacity) { FullMode = BoundedChannelFullMode.Wait });
}

public async Task RunWebSocketAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -65,7 +57,7 @@ public async Task CloseAsync(WebSocketCloseStatus closeStatus = WebSocketCloseSt
{
try
{
_txChannel.Writer.TryComplete();
TxChannel.Writer.TryComplete();
await _webSocket.CloseAsync(closeStatus, reason, cs);
}
catch
Expand Down Expand Up @@ -104,7 +96,7 @@ private async Task RxTask(CancellationToken cancellationToken)
break;
}

var data = new ArraySegment<byte>(bytes, 0, msg.Count);
ArraySegment<byte> data = new ArraySegment<byte>(bytes, 0, msg.Count);

if (!await ValidateMessage(data, cancellationToken))
{
Expand All @@ -131,15 +123,15 @@ private async Task RxTask(CancellationToken cancellationToken)

private async Task TxTask(CancellationToken cancellationToken)
{
while (IsConnnected && await _txChannel.Reader.WaitToReadAsync(cancellationToken))
while (IsConnnected && await TxChannel.Reader.WaitToReadAsync(cancellationToken))
{
// We start with 512 bytes, but the buffer will be resized if needed
byte[] bytes = ArrayPool<byte>.Shared.Rent(512);

try
{
// Read all messages from the channel and send them
while (_txChannel.Reader.TryRead(out TTX? message) && message is not null)
while (TxChannel.Reader.TryRead(out TTX? message) && message is not null)
{
// Get a max estimate of the buffer size needed
int bufferSize = _txSerializer.GetMaxSize(message);
Expand All @@ -155,7 +147,7 @@ private async Task TxTask(CancellationToken cancellationToken)
int messageSize = _txSerializer.Write(new Span<byte>(bytes), message);

// Send the message
var segment = new ArraySegment<byte>(bytes, 0, messageSize);
ArraySegment<byte> segment = new ArraySegment<byte>(bytes, 0, messageSize);
await _webSocket.SendAsync(segment, WebSocketMessageType.Binary, true, cancellationToken);
}
}
Expand Down
8 changes: 8 additions & 0 deletions Common/Websocket/IWebSocketHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using System.Net.WebSockets;

namespace ZapMe.Websocket;

public interface IWebSocketHandler
{
Task RunAsync(Func<string?, Task<WebSocket>> webSocketAcceptFunc, IList<string> requestedSubProtocols, CancellationToken cancellationToken = default);
}
29 changes: 10 additions & 19 deletions Common/Websocket/User/DeviceWebSocket.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using fbs.client;
using fbs.common;
using fbs.server;
using System.Net.WebSockets;
using ZapMe.Constants;
using ZapMe.Helpers;
using PayloadType = fbs.client.ClientPayload.ItemKind;
using PayloadType = fbs.client.ClientDevicePayload.ItemKind;

namespace ZapMe.Websocket;

public sealed class DeviceWebSocket : FlatbufferWebSocketBase<ClientMessage, ServerMessage>, IDisposable
public sealed class DeviceWebSocket : FlatbufferWebSocketBase<ClientDeviceMessage, ServerMessage>, IDisposable
{
public Guid UserId { get; init; }
public Guid SessionId { get; init; }
public Guid DeviceId { get; init; }

private readonly SlidingWindow _msgsSecondWindow;
private readonly SlidingWindow _msgsMinuteWindow;
Expand All @@ -23,20 +24,20 @@ public sealed class DeviceWebSocket : FlatbufferWebSocketBase<ClientMessage, Ser
private long _lastHeartbeatTicks = DateTimeOffset.UtcNow.Ticks;
public long MsUntilTimeout => _heartbeatIntervalMs + _heartbeatAllowableSkewMs - ((DateTimeOffset.UtcNow.Ticks - Interlocked.Read(ref _lastHeartbeatTicks)) / TimeSpan.TicksPerMillisecond);

public DeviceWebSocket(Guid userId, Guid sessionId, WebSocket webSocket) : base(webSocket, ClientMessage.Serializer, ServerMessage.Serializer)
public DeviceWebSocket(Guid userId, Guid deviceId, WebSocket webSocket) : base(webSocket, ClientDeviceMessage.Serializer, ServerMessage.Serializer)
{
UserId = userId;
SessionId = sessionId;
DeviceId = deviceId;
_msgsSecondWindow = new SlidingWindow(1000, WebsocketConstants.ClientRateLimitMessagesPerSecond);
_msgsMinuteWindow = new SlidingWindow(60 * 1000, WebsocketConstants.ClientRateLimitMessagesPerMinute);
_bytesSecondWindow = new SlidingWindow(1000, WebsocketConstants.ClientRateLimitBytesPerSecond);
_bytesMinuteWindow = new SlidingWindow(60 * 1000, WebsocketConstants.ClientRateLimitBytesPerMinute);
_heartbeatTimer = new Timer(HeartbeatTimerCallback, this, _heartbeatAllowableSkewMs, _heartbeatAllowableSkewMs); // TODO: this is probably not the best way to do this
}

public Task<bool> SendPayloadAsync(ServerPayload payload, CancellationToken cancellationToken)
public ValueTask SendPayloadAsync(ServerPayload payload, CancellationToken cancellationToken)
{
return SendMessageAsync(new ServerMessage
return TxChannel.Writer.WriteAsync(new ServerMessage
{
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
Payload = payload
Expand All @@ -60,32 +61,22 @@ protected override async Task<bool> ValidateMessage(ArraySegment<byte> data, Can
return true;
}

protected override async Task HandleMessageAsync(ClientMessage message, CancellationToken cancellationToken)
protected override async Task HandleMessageAsync(ClientDeviceMessage message, CancellationToken cancellationToken)
{
if (!message.Payload.HasValue)
{
await CloseAsync(WebSocketCloseStatus.InvalidPayloadData, "Payload invalid!", cancellationToken);
return;
}

ClientPayload payload = message.Payload.Value;
ClientDevicePayload payload = message.Payload.Value;

// If this switch is not returned from, the connection will be closed
switch (payload.Kind)
{
case PayloadType.heartbeat:
await HandleHeartbeatAsync(payload.heartbeat, cancellationToken);
return;
case PayloadType.session_join:
throw new NotImplementedException();
case PayloadType.session_leave:
throw new NotImplementedException();
case PayloadType.session_rejoin:
throw new NotImplementedException();
case PayloadType.session_invite:
throw new NotImplementedException();
case PayloadType.session_ice_candidate_discovered:
throw new NotImplementedException();
case PayloadType.NONE:
default:
break;
Expand Down
15 changes: 8 additions & 7 deletions Common/Websocket/User/UserWebSocket.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
using fbs.client;
using fbs.common;
using fbs.server;
using System.Net.WebSockets;
using ZapMe.Constants;
using ZapMe.Helpers;
using PayloadType = fbs.client.ClientPayload.ItemKind;
using PayloadType = fbs.client.ClientUserPayload.ItemKind;

namespace ZapMe.Websocket;

public sealed class UserWebSocket : FlatbufferWebSocketBase<ClientMessage, ServerMessage>, IDisposable
public sealed class UserWebSocket : FlatbufferWebSocketBase<ClientUserMessage, ServerMessage>, IDisposable
{
public Guid UserId { get; init; }
public Guid SessionId { get; init; }
Expand All @@ -23,7 +24,7 @@ public sealed class UserWebSocket : FlatbufferWebSocketBase<ClientMessage, Serve
private long _lastHeartbeatTicks = DateTimeOffset.UtcNow.Ticks;
public long MsUntilTimeout => _heartbeatIntervalMs + _heartbeatAllowableSkewMs - ((DateTimeOffset.UtcNow.Ticks - Interlocked.Read(ref _lastHeartbeatTicks)) / TimeSpan.TicksPerMillisecond);

public UserWebSocket(Guid userId, Guid sessionId, WebSocket webSocket) : base(webSocket, ClientMessage.Serializer, ServerMessage.Serializer)
public UserWebSocket(Guid userId, Guid sessionId, WebSocket webSocket) : base(webSocket, ClientUserMessage.Serializer, ServerMessage.Serializer)
{
UserId = userId;
SessionId = sessionId;
Expand All @@ -34,9 +35,9 @@ public UserWebSocket(Guid userId, Guid sessionId, WebSocket webSocket) : base(we
_heartbeatTimer = new Timer(HeartbeatTimerCallback, this, _heartbeatAllowableSkewMs, _heartbeatAllowableSkewMs); // TODO: this is probably not the best way to do this
}

public Task<bool> SendPayloadAsync(ServerPayload payload, CancellationToken cancellationToken)
public ValueTask SendPayloadAsync(ServerPayload payload, CancellationToken cancellationToken)
{
return SendMessageAsync(new ServerMessage
return TxChannel.Writer.WriteAsync(new ServerMessage
{
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
Payload = payload
Expand All @@ -60,15 +61,15 @@ protected override async Task<bool> ValidateMessage(ArraySegment<byte> data, Can
return true;
}

protected override async Task HandleMessageAsync(ClientMessage message, CancellationToken cancellationToken)
protected override async Task HandleMessageAsync(ClientUserMessage message, CancellationToken cancellationToken)
{
if (!message.Payload.HasValue)
{
await CloseAsync(WebSocketCloseStatus.InvalidPayloadData, "Payload invalid!", cancellationToken);
return;
}

ClientPayload payload = message.Payload.Value;
ClientUserPayload payload = message.Payload.Value;

// If this switch is not returned from, the connection will be closed
switch (payload.Kind)
Expand Down
Loading

0 comments on commit 23938f6

Please sign in to comment.