Skip to content

Commit

Permalink
Update Orleans Packages
Browse files Browse the repository at this point in the history
  • Loading branch information
galvesribeiro committed Oct 5, 2021
1 parent 43e2f31 commit 7f03597
Show file tree
Hide file tree
Showing 29 changed files with 302 additions and 349 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
**/bin
.vs
.DS_Store
.user
.idea
13 changes: 0 additions & 13 deletions .idea/.idea.SignalR.Orleans/.idea/.gitignore

This file was deleted.

4 changes: 0 additions & 4 deletions .idea/.idea.SignalR.Orleans/.idea/encodings.xml

This file was deleted.

8 changes: 0 additions & 8 deletions .idea/.idea.SignalR.Orleans/.idea/indexLayout.xml

This file was deleted.

12 changes: 0 additions & 12 deletions .idea/.idea.SignalR.Orleans/.idea/riderModule.iml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/.idea.SignalR.Orleans/.idea/vcs.xml

This file was deleted.

10 changes: 5 additions & 5 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
<!-- Shared Package Versions -->
<PropertyGroup>
<!-- vendors -->
<OrleansCoreVersion>3.4.1</OrleansCoreVersion>
<OrleansProvidersVersion>3.4.1</OrleansProvidersVersion>
<OrleansCodeGeneratorVersion>3.4.1</OrleansCodeGeneratorVersion>
<OrleansRuntimeAbstractionsVersion>3.4.1</OrleansRuntimeAbstractionsVersion>
<OrleansRuntimeVersion>3.4.1</OrleansRuntimeVersion>
<OrleansCoreVersion>3.5.0</OrleansCoreVersion>
<OrleansProvidersVersion>3.5.0</OrleansProvidersVersion>
<OrleansCodeGeneratorVersion>3.5.0</OrleansCodeGeneratorVersion>
<OrleansRuntimeAbstractionsVersion>3.5.0</OrleansRuntimeAbstractionsVersion>
<OrleansRuntimeVersion>3.5.0</OrleansRuntimeVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
6 changes: 2 additions & 4 deletions SignalR.Orleans.sln.DotSettings.user
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=7f151a2f_002Da6cc_002D4fe6_002Db834_002D643c53f755d2/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="OrleansHubLifetimeManagerTests" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::C047C695-9F3C-4DDE-9D0C-06608E191B67::net5.0::SignalR.Orleans.Tests.OrleansHubLifetimeManagerTests&lt;/TestId&gt;
&lt;/TestAncestor&gt;
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=00abcb22_002D60d9_002D4faa_002D9256_002Df083aec196a4/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="All tests from Solution" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Solution /&gt;
&lt;/SessionState&gt;</s:String></wpf:ResourceDictionary>
65 changes: 36 additions & 29 deletions src/SignalR.Orleans/Clients/ClientGrain.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using System;
using System.Collections.Generic;
using System.Buffers;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.SignalR.Protocol;
using Orleans;
using Orleans.Concurrency;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Streams;
using Orleans.Concurrency;
using SignalR.Orleans.Core;

namespace SignalR.Orleans.Clients
Expand All @@ -20,23 +20,27 @@ internal class ClientState
public Guid ServerId { get; set; }
}

[StorageProvider(ProviderName = Constants.STORAGE_PROVIDER)]
[Reentrant]
internal class ClientGrain : Grain<ClientState>, IClientGrain
internal class ClientGrain : Grain, IClientGrain
{
private const string CLIENT_STORAGE = "ClientState";
private readonly ILogger<ClientGrain> _logger;
private IStreamProvider _streamProvider;
private IAsyncStream<ClientMessage> _serverStream;
private IAsyncStream<Guid> _serverDisconnectedStream;
private IAsyncStream<string> _clientDisconnectStream;
private ConnectionGrainKey _keyData;
private StreamSubscriptionHandle<Guid> _serverDisconnectedSubscription;
private readonly IPersistentState<ClientState> _clientState;
private IStreamProvider _streamProvider = default!;
private IAsyncStream<ClientMessage> _serverStream = default!;
private IAsyncStream<Guid> _serverDisconnectedStream = default!;
private IAsyncStream<string> _clientDisconnectStream = default!;
private ConnectionGrainKey _keyData = default!;
private StreamSubscriptionHandle<Guid>? _serverDisconnectedSubscription;
private const int _maxFailAttempts = 3;
private int _failAttempts;

public ClientGrain(ILogger<ClientGrain> logger)
public ClientGrain(
ILogger<ClientGrain> logger,
[PersistentState(CLIENT_STORAGE, Constants.STORAGE_PROVIDER)] IPersistentState<ClientState> clientState)
{
_logger = logger;
_clientState = clientState;
}

public override async Task OnActivateAsync()
Expand All @@ -45,27 +49,30 @@ public override async Task OnActivateAsync()
_streamProvider = GetStreamProvider(Constants.STREAM_PROVIDER);
_clientDisconnectStream = _streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, _keyData.Id);

if (State.ServerId == Guid.Empty)
if (_clientState.State.ServerId == Guid.Empty)
return;

_serverStream = _streamProvider.GetStream<ClientMessage>(State.ServerId, Constants.SERVERS_STREAM);
_serverDisconnectedStream = _streamProvider.GetStream<Guid>(State.ServerId, Constants.SERVER_DISCONNECTED);
_serverStream = _streamProvider.GetStream<ClientMessage>(_clientState.State.ServerId, Constants.SERVERS_STREAM);
_serverDisconnectedStream = _streamProvider.GetStream<Guid>(_clientState.State.ServerId, Constants.SERVER_DISCONNECTED);
var subscriptions = await _serverDisconnectedStream.GetAllSubscriptionHandles();
var subscriptionTasks = new List<Task>();
foreach (var subscription in subscriptions)
var subscriptionTasks = ArrayPool<Task>.Shared.Rent(subscriptions.Count);
for (int i = 0; i < subscriptions.Count; i++)
{
subscriptionTasks.Add(subscription.ResumeAsync(async (serverId, _) => await OnDisconnect("server-disconnected")));
var subscription = subscriptions[i];
subscriptionTasks[i] = subscription.ResumeAsync((serverId, _) => OnDisconnect("server-disconnected"));
}

await Task.WhenAll(subscriptionTasks);
ArrayPool<Task>.Shared.Return(subscriptionTasks);
}

public async Task Send(Immutable<InvocationMessage> message)
{
if (State.ServerId != Guid.Empty)
if (_clientState.State.ServerId != Guid.Empty)
{
_logger.LogDebug("Sending message on {hubName}.{targetMethod} to connection {connectionId}", _keyData.HubName, message.Value.Target, _keyData.Id);
_failAttempts = 0;
await _serverStream.OnNextAsync(new ClientMessage { ConnectionId = _keyData.Id, Payload = message.Value, HubName = _keyData.HubName });
await _serverStream.OnNextAsync(new ClientMessage(_keyData.HubName, _keyData.Id, message.Value));
return;
}

Expand All @@ -82,23 +89,23 @@ public async Task Send(Immutable<InvocationMessage> message)

public async Task OnConnect(Guid serverId)
{
State.ServerId = serverId;
_serverStream = _streamProvider.GetStream<ClientMessage>(State.ServerId, Constants.SERVERS_STREAM);
_serverDisconnectedStream = _streamProvider.GetStream<Guid>(State.ServerId, Constants.SERVER_DISCONNECTED);
_serverDisconnectedSubscription = await _serverDisconnectedStream.SubscribeAsync(async _ => await OnDisconnect("server-disconnected"));
await WriteStateAsync();
_clientState.State.ServerId = serverId;
_serverStream = _streamProvider.GetStream<ClientMessage>(_clientState.State.ServerId, Constants.SERVERS_STREAM);
_serverDisconnectedStream = _streamProvider.GetStream<Guid>(_clientState.State.ServerId, Constants.SERVER_DISCONNECTED);
_serverDisconnectedSubscription = await _serverDisconnectedStream.SubscribeAsync(_ => OnDisconnect("server-disconnected"));
await _clientState.WriteStateAsync();
}

public async Task OnDisconnect(string reason = null)
public async Task OnDisconnect(string? reason = null)
{
_logger.LogDebug("Disconnecting connection on {hubName} for connection {connectionId} from server {serverId} via {reason}",
_keyData.HubName, _keyData.Id, State.ServerId, reason);
_keyData.HubName, _keyData.Id, _clientState.State.ServerId, reason);

if (_keyData.Id != null)
{
await _clientDisconnectStream.OnNextAsync(_keyData.Id);
}
await ClearStateAsync();
await _clientState.ClearStateAsync();

if (_serverDisconnectedSubscription != null)
await _serverDisconnectedSubscription.UnsubscribeAsync();
Expand Down
7 changes: 1 addition & 6 deletions src/SignalR.Orleans/Clients/ClientMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,5 @@

namespace SignalR.Orleans.Clients
{
public class ClientMessage
{
public string HubName { get; set; }
public string ConnectionId { get; set; }
public InvocationMessage Payload { get; set; }
}
public record ClientMessage(string HubName, string ConnectionId, InvocationMessage Payload);
}
2 changes: 1 addition & 1 deletion src/SignalR.Orleans/Clients/IClientGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ namespace SignalR.Orleans.Clients
public interface IClientGrain : IHubMessageInvoker, IGrainWithStringKey
{
Task OnConnect(Guid serverId);
Task OnDisconnect(string reason = null);
Task OnDisconnect(string? reason = null);
}
}
64 changes: 31 additions & 33 deletions src/SignalR.Orleans/Core/ConnectionGrain.cs
Original file line number Diff line number Diff line change
@@ -1,96 +1,94 @@
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Buffers;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.SignalR.Protocol;
using Orleans;
using Orleans.Concurrency;
using Orleans.Streams;
using Orleans.Runtime;
using Orleans.Concurrency;

namespace SignalR.Orleans.Core
{
internal abstract class ConnectionGrain<TGrainState> : Grain<TGrainState>, IConnectionGrain
internal abstract class ConnectionGrain<TGrainState> : Grain, IConnectionGrain
where TGrainState : ConnectionState, new()
{
private readonly ILogger _logger;
private IStreamProvider _streamProvider;
private Dictionary<string, StreamSubscriptionHandle<string>> _connectionStreamHandles;
private readonly IPersistentState<TGrainState> _connectionState;
private readonly Dictionary<string, StreamSubscriptionHandle<string>> _connectionStreamHandles = new();
private IStreamProvider _streamProvider = default!;

protected ConnectionGrainKey KeyData;

internal ConnectionGrain(ILogger logger)
internal ConnectionGrain(
ILogger logger,
IPersistentState<TGrainState> connectionState)
{
_logger = logger;
_connectionState = connectionState;
}

public override async Task OnActivateAsync()
{
KeyData = new ConnectionGrainKey(this.GetPrimaryKeyString());
_connectionStreamHandles = new Dictionary<string, StreamSubscriptionHandle<string>>();
_streamProvider = GetStreamProvider(Constants.STREAM_PROVIDER);
var subscriptionTasks = new List<Task>();
foreach (var connection in State.Connections)
foreach (var connection in _connectionState.State.Connections)
{
var clientDisconnectStream = _streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, connection);
var subscriptions = await clientDisconnectStream.GetAllSubscriptionHandles();
foreach (var subscription in subscriptions)
{
subscriptionTasks.Add(subscription.ResumeAsync(async (connectionId, _) => await Remove(connectionId)));
subscriptionTasks.Add(subscription.ResumeAsync((connectionId, _) => Remove(connectionId)));
}
}
await Task.WhenAll(subscriptionTasks);
}

public virtual async Task Add(string connectionId)
{
var shouldWriteState = State.Connections.Add(connectionId);
var shouldWriteState = _connectionState.State.Connections.Add(connectionId);
if (!_connectionStreamHandles.ContainsKey(connectionId))
{
var clientDisconnectStream = _streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, connectionId);
var subscription = await clientDisconnectStream.SubscribeAsync(async (connId, _) => await Remove(connId));
var subscription = await clientDisconnectStream.SubscribeAsync((connId, _) => Remove(connId));
_connectionStreamHandles[connectionId] = subscription;
}

if (shouldWriteState)
await WriteStateAsync();
await _connectionState.WriteStateAsync();
}

public virtual async Task Remove(string connectionId)
{
var shouldWriteState = State.Connections.Remove(connectionId);
var shouldWriteState = _connectionState.State.Connections.Remove(connectionId);
if (_connectionStreamHandles.TryGetValue(connectionId, out var stream))
{
await stream.UnsubscribeAsync();
_connectionStreamHandles.Remove(connectionId);
}

if (State.Connections.Count == 0)
if (_connectionState.State.Connections.Count == 0)
{
await ClearStateAsync();
await _connectionState.ClearStateAsync();
DeactivateOnIdle();
}
else if (shouldWriteState)
{
await WriteStateAsync();
await _connectionState.WriteStateAsync();
}
}

public virtual Task Send(Immutable<InvocationMessage> message)
{
return SendAll(message, State.Connections);
}
public virtual Task Send(Immutable<InvocationMessage> message) => SendAll(message, _connectionState.State.Connections);

public Task SendExcept(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
public Task SendExcept(string methodName, object?[]? args, IReadOnlyList<string> excludedConnectionIds)
{
var message = new Immutable<InvocationMessage>(new InvocationMessage(methodName, args));
return SendAll(message, State.Connections.Where(x => !excludedConnectionIds.Contains(x)).ToList());
return SendAll(message, _connectionState.State.Connections.Where(x => !excludedConnectionIds.Contains(x)).ToList());
}

public Task<int> Count()
{
return Task.FromResult(State.Connections.Count);
}
public Task<int> Count() => Task.FromResult(_connectionState.State.Connections.Count);

protected Task SendAll(Immutable<InvocationMessage> message, IReadOnlyCollection<string> connections)
{
Expand All @@ -100,11 +98,11 @@ protected Task SendAll(Immutable<InvocationMessage> message, IReadOnlyCollection
var tasks = ArrayPool<Task>.Shared.Rent(connections.Count);
try
{
int index = 0;
foreach (var connection in connections)
for (int i = 0; i < connections.Count; i++)
{
var connection = connections.ElementAt(i);
var client = GrainFactory.GetClientGrain(KeyData.HubName, connection);
tasks[index++] = client.Send(message);
tasks[i] = client.Send(message);
}

return Task.WhenAll(tasks.Where(x => x != null).ToArray());
Expand All @@ -118,6 +116,6 @@ protected Task SendAll(Immutable<InvocationMessage> message, IReadOnlyCollection

internal abstract class ConnectionState
{
public HashSet<string> Connections { get; set; } = new HashSet<string>();
public HashSet<string> Connections { get; set; } = new();
}
}
Loading

0 comments on commit 7f03597

Please sign in to comment.