Skip to content

Commit

Permalink
v7.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
KSemenenko committed May 31, 2023
1 parent f36345a commit 9e63685
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 130 deletions.
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<RepositoryUrl>https://github.com/managedcode/Orleans.SignalR</RepositoryUrl>
<PackageProjectUrl>https://github.com/managedcode/Orleans.SignalR</PackageProjectUrl>
<Product>Managed Code - Orleans SignalR</Product>
<Version>7.1.0</Version>
<PackageVersion>7.1.0</PackageVersion>
<Version>7.1.1</Version>
<PackageVersion>7.1.1</PackageVersion>

</PropertyGroup>
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
Expand Down
13 changes: 11 additions & 2 deletions ManagedCode.Orleans.SignalR.Core/Config/OrleansSignalROptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@ public class OrleansSignalROptions
public const string OrleansSignalRStorage = "OrleansSignalRStorage";

/// <summary>
/// Gets or sets the time window clients have to send a message before the server closes the connection. The default timeout is 30 seconds.
/// Gets or sets the time window clients have to send a message before the server closes the connection. The default
/// timeout is 30 seconds.
/// </summary>
public TimeSpan? ClientTimeoutInterval { get; set; } = TimeSpan.FromSeconds(30);
public TimeSpan ClientTimeoutInterval { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// If true each connection should be kept alive by sending a message to the orleans every
/// <see cref="HubOptions.KeepAliveInterval" />.
/// The default value is true.
/// Set to false only if you don't want to send messages to the specific connectionId.
/// </summary>
public bool KeepEachConnectionAlive { get; set; } = true;
}
64 changes: 64 additions & 0 deletions ManagedCode.Orleans.SignalR.Core/Helpers/TimeIntervalHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using ManagedCode.Orleans.SignalR.Core.Config;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Options;

namespace ManagedCode.Orleans.SignalR.Core.Helpers;

public static class TimeIntervalHelper
{
public static TimeSpan GetClientTimeoutInterval(IOptions<OrleansSignalROptions> orleansSignalOptions,
IOptions<HubOptions> hubOptions)
{
var timeSpan = orleansSignalOptions.Value.ClientTimeoutInterval;

if (hubOptions.Value.ClientTimeoutInterval.HasValue && timeSpan > hubOptions.Value.ClientTimeoutInterval)
timeSpan = hubOptions.Value.ClientTimeoutInterval.Value;

return timeSpan;
}

public static TimeSpan GetClientTimeoutInterval(IOptions<OrleansSignalROptions> orleansSignalOptions,
IOptions<HubOptions> globalHubOptions, IOptions<HubOptions> hubOptions)
{
var timeSpan = orleansSignalOptions.Value.ClientTimeoutInterval;

if (globalHubOptions.Value.ClientTimeoutInterval.HasValue &&
timeSpan > globalHubOptions.Value.ClientTimeoutInterval)
timeSpan = globalHubOptions.Value.ClientTimeoutInterval.Value;

if (hubOptions.Value.ClientTimeoutInterval.HasValue && timeSpan > hubOptions.Value.ClientTimeoutInterval)
timeSpan = hubOptions.Value.ClientTimeoutInterval.Value;

return timeSpan;
}


public static TimeSpan GetClientTimeoutInterval(IOptions<HubOptions> globalHubOptions,
IOptions<HubOptions> hubOptions)
{
var timeSpan = TimeSpan.FromSeconds(15);

if (globalHubOptions.Value.ClientTimeoutInterval.HasValue &&
timeSpan > globalHubOptions.Value.ClientTimeoutInterval)
timeSpan = globalHubOptions.Value.ClientTimeoutInterval.Value;

if (hubOptions.Value.ClientTimeoutInterval.HasValue && timeSpan > hubOptions.Value.ClientTimeoutInterval)
timeSpan = hubOptions.Value.ClientTimeoutInterval.Value;

return timeSpan;
}

public static TimeSpan GetKeepAliveInterval(IOptions<HubOptions> globalHubOptions, IOptions<HubOptions> hubOptions)
{
var timeSpan = TimeSpan.FromSeconds(15);

if (globalHubOptions.Value.ClientTimeoutInterval.HasValue)
timeSpan = globalHubOptions.Value.ClientTimeoutInterval.Value;

if (hubOptions.Value.ClientTimeoutInterval.HasValue)
timeSpan = hubOptions.Value.ClientTimeoutInterval.Value;

return timeSpan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public interface IObserverConnectionManager
Task RemoveConnection(string connectionId, ISignalRObserver observer);

[OneWay]
ValueTask Ping(ISignalRObserver observer);
Task Ping(ISignalRObserver observer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public interface ISignalRInvocationGrain : IGrainWithStringKey, IObserverConnect
Task<ReturnType> TryGetReturnType();

[OneWay]
ValueTask AddInvocation(ISignalRObserver observer, InvocationInfo invocationInfo);
Task AddInvocation(ISignalRObserver observer, InvocationInfo invocationInfo);

[OneWay]
ValueTask<InvocationInfo?> RemoveInvocation();
Task<InvocationInfo?> RemoveInvocation();
}
19 changes: 17 additions & 2 deletions ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace ManagedCode.Orleans.SignalR.Core.SignalR.Observers;
public class Subscription : IDisposable
{
private readonly CancellationTokenSource _cts = new();
private readonly HashSet<IObserverConnectionManager> _grains = new();
private readonly SignalRObserver _observer;
private readonly IDisposable _timer;

Expand All @@ -20,14 +21,25 @@ public Subscription(SignalRObserver observer, TimeSpan pingTime)

public ISignalRObserver Reference { get; private set; }

public HashSet<IObserverConnectionManager> Grains { get; } = new();
public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains;

public void Dispose()
{
_cts.Cancel();
_timer.Dispose();
_observer.Dispose();
Reference = null!;
_grains.Clear();
}

public void AddGrain(IObserverConnectionManager grain)
{
_grains.Add(grain);
}

public void RemoveGrain(IObserverConnectionManager grain)
{
_grains.Remove(grain);
}

private void Callback(object? state)
Expand All @@ -38,12 +50,15 @@ private void Callback(object? state)
if (token.IsCancellationRequested)
return;
foreach (var grain in Grains)
foreach (var grain in _grains)
{
if (token.IsCancellationRequested)
return;
await grain.Ping(Reference).ConfigureAwait(false);
if (token.IsCancellationRequested)
return;
}
}, _cts.Token).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using ManagedCode.Orleans.SignalR.Core.Config;
using ManagedCode.Orleans.SignalR.Core.Helpers;
using ManagedCode.Orleans.SignalR.Core.Interfaces;
using ManagedCode.Orleans.SignalR.Core.Models;
using ManagedCode.Orleans.SignalR.Core.SignalR.Observers;
Expand All @@ -20,40 +21,41 @@ namespace ManagedCode.Orleans.SignalR.Core.SignalR;

public class OrleansHubLifetimeManager<THub> : HubLifetimeManager<THub> where THub : Hub
{
private readonly ILogger _logger;
private readonly IClusterClient _clusterClient;
private readonly HubConnectionStore _connections = new();
private readonly IOptions<HubOptions>? _globalHubOptions;
private readonly IOptions<HubOptions<THub>>? _hubOptions;
private readonly IOptions<OrleansSignalROptions> _options;
private readonly IOptions<HubOptions> _globalHubOptions;
private readonly IOptions<HubOptions<THub>> _hubOptions;
private readonly ILogger _logger;
private readonly IOptions<OrleansSignalROptions> _orleansSignalOptions;

public OrleansHubLifetimeManager(ILogger<OrleansHubLifetimeManager<THub>> logger,
IOptions<OrleansSignalROptions> options, IClusterClient clusterClient, IHubProtocolResolver hubProtocolResolver,
IOptions<HubOptions>? globalHubOptions, IOptions<HubOptions<THub>>? hubOptions)
public OrleansHubLifetimeManager(ILogger<OrleansHubLifetimeManager<THub>> logger, IClusterClient clusterClient,
IHubProtocolResolver hubProtocolResolver, IOptions<OrleansSignalROptions> orleansSignalOptions,
IOptions<HubOptions> globalHubOptions, IOptions<HubOptions<THub>> hubOptions)
{
_logger = logger;
_options = options;
_clusterClient = clusterClient;
_orleansSignalOptions = orleansSignalOptions;
_globalHubOptions = globalHubOptions;
_hubOptions = hubOptions;
_clusterClient = clusterClient;
}

public override async Task OnConnectedAsync(HubConnectionContext connection)
{
var connectionHolderGrain = NameHelperGenerator.GetConnectionHolderGrain<THub>(_clusterClient);

var subscription = CreateConnectionObserver(connection);

//Subscribe the instance to receive messages.
await connectionHolderGrain.AddConnection(connection.ConnectionId, subscription.Reference)
.ConfigureAwait(false);
subscription.Grains.Add(connectionHolderGrain);
if (_orleansSignalOptions.Value.KeepEachConnectionAlive)
{
var connectionHolderGrain = NameHelperGenerator.GetConnectionHolderGrain<THub>(_clusterClient);
await connectionHolderGrain.AddConnection(connection.ConnectionId, subscription.Reference)
.ConfigureAwait(false);
subscription.AddGrain(connectionHolderGrain);
}

if (!string.IsNullOrEmpty(connection.UserIdentifier))
{
var userGrain = NameHelperGenerator.GetSignalRUserGrain<THub>(_clusterClient, connection.UserIdentifier!);
await userGrain.AddConnection(connection.ConnectionId, subscription.Reference).ConfigureAwait(false);
subscription.Grains.Add(userGrain);
subscription.AddGrain(userGrain);
}

_connections.Add(connection);
Expand Down Expand Up @@ -162,7 +164,7 @@ public override async Task AddToGroupAsync(string connectionId, string groupName
var groupGrain = NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName);
await groupGrain.AddConnection(connectionId, subscription.Reference).ConfigureAwait(false);

subscription.Grains.Add(groupGrain);
subscription.AddGrain(groupGrain);
}

public override async Task RemoveFromGroupAsync(string connectionId, string groupName,
Expand All @@ -175,7 +177,7 @@ public override async Task RemoveFromGroupAsync(string connectionId, string grou
var groupGrain = NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName);
await groupGrain.RemoveConnection(connectionId, subscription.Reference).ConfigureAwait(false);

subscription.Grains.Remove(groupGrain);
subscription.RemoveGrain(groupGrain);
}

public override async Task<T> InvokeConnectionAsync<T>(string connectionId, string methodName, object?[] args,
Expand Down Expand Up @@ -206,7 +208,7 @@ public override async Task<T> InvokeConnectionAsync<T>(string connectionId, stri
});

var invocationGrain = NameHelperGenerator.GetInvocationGrain<THub>(_clusterClient, invocationId);
subscription.Grains.Add(invocationGrain);
subscription.AddGrain(invocationGrain);
await invocationGrain.AddInvocation(subscription.Reference,
new InvocationInfo(connectionId, invocationId, typeof(T)));

Expand Down Expand Up @@ -268,11 +270,20 @@ await NameHelperGenerator.GetInvocationGrain<THub>(_clusterClient, result.Invoca

public override bool TryGetReturnType(string invocationId, [NotNullWhen(true)] out Type? type)
{
var result = NameHelperGenerator.GetInvocationGrain<THub>(_clusterClient, invocationId).TryGetReturnType()
.Result;
var returnType = NameHelperGenerator.GetInvocationGrain<THub>(_clusterClient, invocationId).TryGetReturnType();

var timeSpan =
TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
Task.WaitAny(returnType, Task.Delay(timeSpan * 0.8));

type = result.GetReturnType();
return result.Result;
if (returnType.IsCompleted)
{
type = returnType.Result.GetReturnType();
return returnType.Result.Result;
}

type = null;
return false;
}

private Subscription CreateConnectionObserver(HubConnectionContext connection)
Expand All @@ -282,15 +293,12 @@ private Subscription CreateConnectionObserver(HubConnectionContext connection)
return subscription;
}


private Subscription CreateSubscription(Func<HubMessage, Task>? onNextAction)
{
TimeSpan timeSpan = _globalHubOptions.Value.KeepAliveInterval.Value;
if (timeSpan > _hubOptions.Value.KeepAliveInterval)
{
timeSpan = _hubOptions.Value.KeepAliveInterval.Value;
}

var subscription = new Subscription(new SignalRObserver(onNextAction),timeSpan);
var timeSpan =
TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
var subscription = new Subscription(new SignalRObserver(onNextAction), timeSpan * 0.8);
var reference = _clusterClient.CreateObjectReference<ISignalRObserver>(subscription.GetObserver());
subscription.SetReference(reference);
return subscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static ISignalRServerBuilder AddOrleans(this ISignalRServerBuilder signal
signalrBuilder.Services.AddOptions<OrleansSignalROptions>().Configure(options);
signalrBuilder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(OrleansHubLifetimeManager<>));
signalrBuilder.Services.AddSingleton(typeof(IOrleansHubContext<,>), typeof(OrleansHubContext<,>));

return signalrBuilder;
}
}
Loading

0 comments on commit 9e63685

Please sign in to comment.