Skip to content

Commit

Permalink
v7.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
KSemenenko committed Apr 26, 2023
1 parent 0865af5 commit 153230f
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 116 deletions.
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<RepositoryUrl>https://github.com/managedcode/Orleans.SignalR</RepositoryUrl>
<PackageProjectUrl>https://github.com/managedcode/Orleans.SignalR</PackageProjectUrl>
<Product>Managed Code - Orleans SignalR</Product>
<Version>0.0.4</Version>
<PackageVersion>0.0.4</PackageVersion>
<Version>7.0.0</Version>
<PackageVersion>7.0.0</PackageVersion>

</PropertyGroup>
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@
</PropertyGroup>

<ItemGroup>

<PackageReference Include="ManagedCode.Communication" Version="2.0.19"/>
<PackageReference Include="ManagedCode.Communication.Orleans" Version="2.0.19"/>

<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0"/>
<PackageReference Include="Microsoft.Orleans.Client" Version="7.1.2"/>

</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ public class OrleansSignalROptions
public string StreamProvider { get; set; } = DefaultSignalRStreamProvider;

public const string DefaultSignalRStreamProvider = "OrleansSignalRStreamProvider";
public const string OrleansSignalRStorage = "OrleansSignalRStorage";
}
6 changes: 6 additions & 0 deletions ManagedCode.Orleans.SignalR.Core/Models/InvocationInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ namespace ManagedCode.Orleans.SignalR.Core.Models;
[GenerateSerializer]
public class InvocationInfo
{
public InvocationInfo()
{
ConnectionId = null;
InvocationId = null;
}

public InvocationInfo(string connectionId, string invocationId, Type type)
{
ConnectionId = connectionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
using ManagedCode.Orleans.SignalR.Core.SignalR;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Hosting;

namespace ManagedCode.Orleans.SignalR.Server.Extensions;

/// <summary>
/// Extension methods for configuring Redis-based scale-out for a SignalR Server in an
/// Extension methods for configuring Orleans-based scale-out for a SignalR Server in an
/// <see cref="ISignalRServerBuilder" />.
/// </summary>
public static class OrleansDependencyInjectionExtensions
{
public static ISignalRServerBuilder AddOrleans(this ISignalRServerBuilder signalrBuilder)
{
signalrBuilder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(OrleansHubLifetimeManager<>));
return signalrBuilder;
return AddOrleans(signalrBuilder, o => { });
}

public static ISignalRServerBuilder AddOrleans(this ISignalRServerBuilder signalrBuilder, Action<OrleansSignalROptions> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@
</PropertyGroup>

<ItemGroup>

<PackageReference Include="ManagedCode.Communication" Version="2.0.19"/>
<PackageReference Include="ManagedCode.Communication.Orleans" Version="2.0.19"/>

<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0"/>

<PackageReference Include="Microsoft.Orleans.Reminders" Version="7.1.2"/>

<PackageReference Include="Microsoft.Orleans.Serialization" Version="7.1.2"/>
<PackageReference Include="Microsoft.Orleans.Server" Version="7.1.2"/>
<PackageReference Include="Microsoft.Orleans.Streaming" Version="7.1.2"/>
Expand Down
67 changes: 37 additions & 30 deletions ManagedCode.Orleans.SignalR.Server/SignalRConnectionHolderGrain.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ManagedCode.Orleans.SignalR.Core.Config;
using ManagedCode.Orleans.SignalR.Core.Interfaces;
Expand All @@ -9,47 +10,56 @@
using Microsoft.Extensions.Options;
using Orleans;
using Orleans.Concurrency;
using Orleans.Runtime;

namespace ManagedCode.Orleans.SignalR.Server;

[Reentrant]
public class SignalRConnectionHolderGrain<THub> : Grain, ISignalRConnectionHolderGrain<THub>
{
private readonly IGrainFactory _grainFactory;
private readonly IOptions<OrleansSignalROptions> _options;
private readonly ILogger<SignalRConnectionHolderGrain<THub>> _logger;
private readonly IPersistentState<ConnectionState> _stateStorage;
private readonly IOptions<OrleansSignalROptions> _options;

private readonly ConnectionState _state = new();

public SignalRConnectionHolderGrain(ILogger<SignalRConnectionHolderGrain<THub>> logger, IGrainFactory grainFactory,
public SignalRConnectionHolderGrain(ILogger<SignalRConnectionHolderGrain<THub>> logger,
[PersistentState(nameof(SignalRConnectionHolderGrain<THub>), OrleansSignalROptions.OrleansSignalRStorage)] IPersistentState<ConnectionState> stateStorage,
IOptions<OrleansSignalROptions> options)
{
_logger = logger;
_grainFactory = grainFactory;
_stateStorage = stateStorage;
_options = options;
}

public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
if(_stateStorage.State.ConnectionIds.Count == 0)
await _stateStorage.ClearStateAsync();
else
await _stateStorage.WriteStateAsync();
}

public Task AddConnection(string connectionId)
{
_state.ConnectionIds.Add(connectionId);
_stateStorage.State.ConnectionIds.Add(connectionId);
return Task.CompletedTask;
}

public Task RemoveConnection(string connectionId)
{
_state.ConnectionIds.Remove(connectionId);
_stateStorage.State.ConnectionIds.Remove(connectionId);
return Task.CompletedTask;
}

public Task SendToAll(InvocationMessage message)
{
var tasks = new List<Task>();

foreach (var connectionId in _state.ConnectionIds)
tasks.Add(NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider),
connectionId)
.OnNextAsync(message));
foreach (var connectionId in _stateStorage.State.ConnectionIds)
{
var stream = NameHelperGenerator.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider), connectionId);
tasks.Add(stream.OnNextAsync(message));
}


_ = Task.Run(() => Task.WhenAll(tasks));

Expand All @@ -61,15 +71,13 @@ public Task SendToAllExcept(InvocationMessage message, string[] excludedConnecti
var hashSet = new HashSet<string>(excludedConnectionIds);
var tasks = new List<Task>();

foreach (var connectionId in _state.ConnectionIds)
foreach (var connectionId in _stateStorage.State.ConnectionIds)
{
if (hashSet.Contains(connectionId))
continue;

tasks.Add(NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider),
connectionId)
.OnNextAsync(message));

var stream = NameHelperGenerator.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider), connectionId);
tasks.Add(stream.OnNextAsync(message));
}

_ = Task.Run(() => Task.WhenAll(tasks));
Expand All @@ -79,13 +87,12 @@ public Task SendToAllExcept(InvocationMessage message, string[] excludedConnecti

public Task<bool> SendToConnection(InvocationMessage message, string connectionId)
{
if (!_state.ConnectionIds.Contains(connectionId))
if (!_stateStorage.State.ConnectionIds.Contains(connectionId))
return Task.FromResult(false);

_ = Task.Run(() => NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider),
connectionId)
.OnNextAsync(message));

var stream = NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider), connectionId);
_ = Task.Run(() => stream.OnNextAsync(message));

return Task.FromResult(true);
}
Expand All @@ -95,10 +102,10 @@ public Task SendToConnections(InvocationMessage message, string[] connectionIds)
var tasks = new List<Task>();

foreach (var connectionId in connectionIds)
tasks.Add(NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider),
connectionId)
.OnNextAsync(message));
{
var stream = NameHelperGenerator.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider), connectionId);
tasks.Add(stream.OnNextAsync(message));
}

_ = Task.Run(() => Task.WhenAll(tasks));

Expand Down
50 changes: 32 additions & 18 deletions ManagedCode.Orleans.SignalR.Server/SignalRGroupGrain.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ManagedCode.Orleans.SignalR.Core.Config;
using ManagedCode.Orleans.SignalR.Core.Interfaces;
Expand All @@ -9,35 +10,47 @@
using Microsoft.Extensions.Options;
using Orleans;
using Orleans.Concurrency;
using Orleans.Runtime;

namespace ManagedCode.Orleans.SignalR.Server;

[Reentrant]
public class SignalRGroupGrain<THub> : Grain, ISignalRGroupGrain<THub>
{
private readonly IGrainFactory _grainFactory;
private readonly IOptions<OrleansSignalROptions> _options;

private readonly ILogger<SignalRGroupGrain<THub>> _logger;
private readonly ConnectionState _state = new();


public SignalRGroupGrain(ILogger<SignalRGroupGrain<THub>> logger, IGrainFactory grainFactory, IOptions<OrleansSignalROptions> options)
private readonly IPersistentState<ConnectionState> _stateStorage;
private readonly IOptions<OrleansSignalROptions> _options;

public SignalRGroupGrain(ILogger<SignalRGroupGrain<THub>> logger,
[PersistentState(nameof(SignalRGroupGrain<THub>), OrleansSignalROptions.OrleansSignalRStorage)] IPersistentState<ConnectionState> stateStorage,
IOptions<OrleansSignalROptions> options)
{
_logger = logger;
_grainFactory = grainFactory;
_stateStorage = stateStorage;
_options = options;
}

public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
if(_stateStorage.State.ConnectionIds.Count == 0)
await _stateStorage.ClearStateAsync();
else
await _stateStorage.WriteStateAsync();
}

public Task SendToGroup(InvocationMessage message)
{
var tasks = new List<Task>();

foreach (var connectionId in _state.ConnectionIds)
tasks.Add(NameHelperGenerator
foreach (var connectionId in _stateStorage.State.ConnectionIds)
{
var stream = NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider),
connectionId)
.OnNextAsync(message));
connectionId);

tasks.Add(stream.OnNextAsync(message));
}


_ = Task.Run(() => Task.WhenAll(tasks));

Expand All @@ -49,15 +62,16 @@ public Task SendToGroupExcept(InvocationMessage message, string[] excludedConnec
var hashSet = new HashSet<string>(excludedConnectionIds);
var tasks = new List<Task>();

foreach (var connectionId in _state.ConnectionIds)
foreach (var connectionId in _stateStorage.State.ConnectionIds)
{
if (hashSet.Contains(connectionId))
continue;

tasks.Add(NameHelperGenerator
var stream = NameHelperGenerator
.GetStream<THub, InvocationMessage>(this.GetStreamProvider(_options.Value.StreamProvider),
connectionId)
.OnNextAsync(message));
connectionId);

tasks.Add(stream.OnNextAsync(message));
}

_ = Task.Run(() => Task.WhenAll(tasks));
Expand All @@ -67,13 +81,13 @@ public Task SendToGroupExcept(InvocationMessage message, string[] excludedConnec

public Task AddConnection(string connectionId)
{
_state.ConnectionIds.Add(connectionId);
_stateStorage.State.ConnectionIds.Add(connectionId);
return Task.CompletedTask;
}

public Task RemoveConnection(string connectionId)
{
_state.ConnectionIds.Remove(connectionId);
_stateStorage.State.ConnectionIds.Remove(connectionId);
return Task.CompletedTask;
}
}
32 changes: 20 additions & 12 deletions ManagedCode.Orleans.SignalR.Server/SignalRGroupHolderGrain.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ManagedCode.Orleans.SignalR.Core.Config;
using ManagedCode.Orleans.SignalR.Core.Interfaces;
Expand All @@ -7,33 +8,40 @@
using Microsoft.Extensions.Options;
using Orleans;
using Orleans.Concurrency;
using Orleans.Runtime;

namespace ManagedCode.Orleans.SignalR.Server;

[Reentrant]
public class SignalRGroupHolderGrain<THub> : Grain, ISignalRGroupHolderGrain<THub>
{
private readonly IGrainFactory _grainFactory;
private readonly IOptions<OrleansSignalROptions> _options;
private readonly ConnectionGroupState _groups = new();

private readonly ILogger<SignalRGroupHolderGrain<THub>> _logger;


public SignalRGroupHolderGrain(ILogger<SignalRGroupHolderGrain<THub>> logger, IGrainFactory grainFactory, IOptions<OrleansSignalROptions> options)
private readonly IPersistentState<ConnectionGroupState> _stateStorage;
private readonly IOptions<OrleansSignalROptions> _options;

public SignalRGroupHolderGrain(ILogger<SignalRGroupHolderGrain<THub>> logger,
[PersistentState(nameof(SignalRGroupHolderGrain<THub>), OrleansSignalROptions.OrleansSignalRStorage)] IPersistentState<ConnectionGroupState> stateStorage,
IOptions<OrleansSignalROptions> options)
{
_logger = logger;
_grainFactory = grainFactory;
_stateStorage = stateStorage;
_options = options;
}

public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
if(_stateStorage.State.Groups.Count == 0)
await _stateStorage.ClearStateAsync();
else
await _stateStorage.WriteStateAsync();
}

public Task AddConnectionToGroup(string connectionId, string groupName)
{
if (_groups.Groups.TryGetValue(groupName, out var state))
if (_stateStorage.State.Groups.TryGetValue(groupName, out var state))
state.ConnectionIds.Add(connectionId);
else
_groups.Groups.Add(groupName, new ConnectionState
_stateStorage.State.Groups.Add(groupName, new ConnectionState
{
ConnectionIds = new HashSet<string> { connectionId }
});
Expand All @@ -43,15 +51,15 @@ public Task AddConnectionToGroup(string connectionId, string groupName)

public Task RemoveConnectionFromGroup(string connectionId, string groupName)
{
if (_groups.Groups.TryGetValue(groupName, out var state))
if (_stateStorage.State.Groups.TryGetValue(groupName, out var state))
state.ConnectionIds.Remove(connectionId);
return Task.CompletedTask;
}


public Task RemoveConnection(string connectionId)
{
foreach (var connections in _groups.Groups.Values)
foreach (var connections in _stateStorage.State.Groups.Values)
connections.ConnectionIds.Remove(connectionId);
return Task.CompletedTask;
}
Expand Down
Loading

0 comments on commit 153230f

Please sign in to comment.