Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ZiggyCreatures.FusionCache.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<Folder Name="/src/">
<Project Path="src/ZiggyCreatures.FusionCache.AspNetCore.OutputCaching/ZiggyCreatures.FusionCache.AspNetCore.OutputCaching.csproj" />
<Project Path="src/ZiggyCreatures.FusionCache.Backplane.Memory/ZiggyCreatures.FusionCache.Backplane.Memory.csproj" />
<Project Path="src/ZiggyCreatures.FusionCache.Backplane.NATS/ZiggyCreatures.FusionCache.Backplane.NATS.csproj" />
<Project Path="src/ZiggyCreatures.FusionCache.Backplane.StackExchangeRedis/ZiggyCreatures.FusionCache.Backplane.StackExchangeRedis.csproj" />
<Project Path="src/ZiggyCreatures.FusionCache.Chaos/ZiggyCreatures.FusionCache.Chaos.csproj" />
<Project Path="src/ZiggyCreatures.FusionCache.Locking.AsyncKeyed/ZiggyCreatures.FusionCache.Locking.AsyncKeyed.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
{
EnsureConnection();

if (message is null)
throw new ArgumentNullException(nameof(message));

if (message.IsValid() == false)
throw new InvalidOperationException("The message is invalid");

Expand All @@ -60,8 +57,6 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] about to send a backplane notification to {BackplanesCount} backplanes (including self)", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, _subscribers.Count);

var payload = BackplaneMessage.ToByteArray(message);

foreach (var backplane in _subscribers)
{
token.ThrowIfCancellationRequested();
Expand All @@ -74,7 +69,7 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] before sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);

await backplane.OnMessageAsync(payload).ConfigureAwait(false);
await backplane.OnMessageAsync(message).ConfigureAwait(false);

if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] after sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);
Expand All @@ -87,10 +82,8 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
}
}

internal async ValueTask OnMessageAsync(byte[] payload)
internal async ValueTask OnMessageAsync(BackplaneMessage message)
{
var message = BackplaneMessage.FromByteArray(payload);

var handler = _incomingMessageHandlerAsync;

if (handler is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,10 @@ public void Unsubscribe()
}

/// <inheritdoc/>
public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
public void Publish(in BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
EnsureConnection();

if (message is null)
throw new ArgumentNullException(nameof(message));

if (message.IsValid() == false)
throw new InvalidOperationException("The message is invalid");

Expand All @@ -102,8 +99,6 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, C
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] about to send a backplane notification to {BackplanesCount} backplanes (including self)", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, _subscribers.Count);

var payload = BackplaneMessage.ToByteArray(message);

foreach (var backplane in _subscribers)
{
token.ThrowIfCancellationRequested();
Expand All @@ -116,7 +111,7 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, C
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] before sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);

backplane.OnMessage(payload);
backplane.OnMessage(message);

if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] after sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);
Expand All @@ -129,10 +124,8 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, C
}
}

internal void OnMessage(byte[] payload)
internal void OnMessage(in BackplaneMessage message)
{
var message = BackplaneMessage.FromByteArray(payload);

var handler = _incomingMessageHandler;

if (handler is null)
Expand Down
142 changes: 142 additions & 0 deletions src/ZiggyCreatures.FusionCache.Backplane.NATS/NatsBackplane.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

using NATS.Client.Core;

namespace ZiggyCreatures.Caching.Fusion.Backplane.NATS;

/// <summary>
/// A NATS based implementation of a FusionCache backplane.
/// </summary>
public partial class NatsBackplane
: IFusionCacheBackplane
{
private BackplaneSubscriptionOptions? _subscriptionOptions;
private readonly ILogger? _logger;
private INatsConnection _connection;
private string _channelName = "";
private Func<BackplaneMessage, ValueTask>? _incomingMessageHandlerAsync;
private INatsSub<NatsMemoryOwner<byte>>? _subscription;

/// <summary>
/// Initializes a new instance of the NatsBackplane class.
/// </summary>
/// <param name="natsConnection">The NATS connection instance to use.</param>
/// <param name="logger">The <see cref="ILogger{TCategoryName}"/> instance to use. If null, logging will be completely disabled.</param>
public NatsBackplane(INatsConnection? natsConnection, ILogger<NatsBackplane>? logger = null)
{
_connection = natsConnection ?? throw new ArgumentNullException(nameof(natsConnection));

// LOGGING
if (logger is NullLogger<NatsBackplane>)
{
// IGNORE NULL LOGGER (FOR BETTER PERF)
_logger = null;
}
else
{
_logger = logger;
}
}

/// <inheritdoc/>
public async ValueTask SubscribeAsync(BackplaneSubscriptionOptions subscriptionOptions)
{
if (subscriptionOptions is null)
throw new ArgumentNullException(nameof(subscriptionOptions));

if (subscriptionOptions.ChannelName is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ChannelName cannot be null");

if (subscriptionOptions.IncomingMessageHandler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.IncomingMessageHandler cannot be null");

if (subscriptionOptions.ConnectHandler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ConnectHandler cannot be null");

if (subscriptionOptions.IncomingMessageHandlerAsync is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.IncomingMessageHandlerAsync cannot be null");

if (subscriptionOptions.ConnectHandlerAsync is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ConnectHandlerAsync cannot be null");

_subscriptionOptions = subscriptionOptions;

_channelName = _subscriptionOptions.ChannelName;
if (string.IsNullOrEmpty(_channelName))
throw new NullReferenceException("The backplane channel name must have a value");

_incomingMessageHandlerAsync = _subscriptionOptions.IncomingMessageHandlerAsync;
_subscription = await _connection.SubscribeCoreAsync<NatsMemoryOwner<byte>>(_channelName);
_ = Task.Run(async () =>
{
while (await _subscription.Msgs.WaitToReadAsync().ConfigureAwait(false))
{
while (_subscription.Msgs.TryRead(out var msg))
{
using (msg.Data)
{
if (BackplaneMessage.TryParse(msg.Data.Span, out BackplaneMessage message))
{
await OnMessageAsync(message).ConfigureAwait(false);
}
}
}
}
});
}


/// <inheritdoc/>
public void Subscribe(BackplaneSubscriptionOptions options)
{
// TODO: IS THERE A BETTER WAY INSTEAD OF SYNC OVER ASYNC ?
SubscribeAsync(options).GetAwaiter().GetResult();
}

/// <inheritdoc/>
public async ValueTask UnsubscribeAsync()
{
if (_subscription is not null)
{
await _subscription.UnsubscribeAsync().ConfigureAwait(false);
await _subscription.Msgs.Completion;
}
}

/// <inheritdoc/>
public void Unsubscribe()
{
// TODO: IS THERE A BETTER WAY INSTEAD OF SYNC OVER ASYNC ?
UnsubscribeAsync().GetAwaiter().GetResult();
}

/// <inheritdoc/>
public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
// TH TYPE NatsBufferWriter SEEMS TO BE DISPOSABLE: SHOULD IT BE DISPOSED?
var writer = new NatsBufferWriter<byte>();
message.WriteTo(writer);
await _connection.PublishAsync(_channelName, writer).ConfigureAwait(false);
}

/// <inheritdoc/>
public void Publish(in BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
// TODO: IS THERE A BETTER WAY INSTEAD OF SYNC OVER ASYNC ?
PublishAsync(message, options, token).GetAwaiter().GetResult();
}

internal async ValueTask OnMessageAsync(BackplaneMessage message)
{
var tmp = _incomingMessageHandlerAsync;
if (tmp is null)
{
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}]: [BP] incoming message handler was null", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId);
return;
}

await tmp(message).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Version>2.1.0</Version>
<PackageId>ZiggyCreatures.FusionCache.Backplane.NATS</PackageId>
<Description>FusionCache backplane for NATS based on the NATS.Net library</Description>
<PackageTags>backplane;nats;synadia;caching;cache;hybrid;hybrid-cache;hybridcache;multi-level;multilevel;fusion;fusioncache;fusion-cache;performance;async;ziggy</PackageTags>
<RootNamespace>ZiggyCreatures.Caching.Fusion.Backplane.NATS</RootNamespace>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageValidationBaselineVersion>1.0.0</PackageValidationBaselineVersion>
<TreatWarningsAsErrors>false</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<None Include="artwork\logo-128x128.png" Pack="true" PackagePath="\" />
<None Include="docs\README.md" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Client.Core" Version="2.6.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ZiggyCreatures.FusionCache\ZiggyCreatures.FusionCache.csproj" />
</ItemGroup>
</Project>
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
13 changes: 13 additions & 0 deletions src/ZiggyCreatures.FusionCache.Backplane.NATS/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# FusionCache

![FusionCache logo](https://raw.githubusercontent.com/ZiggyCreatures/FusionCache/main/docs/logo-256x256.png)

### FusionCache is an easy to use, fast and robust hybrid cache with advanced resiliency features.

It was born after years of dealing with all sorts of different types of caches: memory caching, distributed caching, http caching, CDNs, browser cache, offline cache, you name it. So I've tried to put together these experiences and came up with FusionCache.

Find out [more](https://github.com/ZiggyCreatures/FusionCache).

## 📦 This package

This package is a backplane implementation on [NATS](https://nats.io/) based on the awesome [StackExchange.Redis](https://github.com/StackExchange/StackExchange.Redis) library.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using Microsoft.Extensions.Options;
using StackExchange.Redis;

using ZiggyCreatures.Caching.Fusion.Internals;

namespace ZiggyCreatures.Caching.Fusion.Backplane.StackExchangeRedis;

/// <summary>
Expand Down Expand Up @@ -93,26 +95,33 @@ private void Disconnect()
_connection = null;
}

private static BackplaneMessage? GetMessageFromRedisValue(RedisValue value, ILogger? logger, BackplaneSubscriptionOptions? subscriptionOptions)
private static bool TryGetMessageFromRedisValue(RedisValue value, ILogger? logger, BackplaneSubscriptionOptions? subscriptionOptions, out BackplaneMessage backplaneMessage)
{
try
{
return BackplaneMessage.FromByteArray(value);
byte[]? byteValue = value;
if (byteValue is not null && BackplaneMessage.TryParse(byteValue, out backplaneMessage))
{
return true;
}
}
catch (Exception exc)
{
if (logger?.IsEnabled(LogLevel.Warning) ?? false)
logger.Log(LogLevel.Warning, exc, "FUSION [N={CacheName} I={CacheInstanceId}]: [BP] an error occurred while converting a RedisValue into a BackplaneMessage", subscriptionOptions?.CacheName, subscriptionOptions?.CacheInstanceId);
}

return null;
backplaneMessage = default;
return false;
}

private static RedisValue GetRedisValueFromMessage(BackplaneMessage message, ILogger? logger, BackplaneSubscriptionOptions? subscriptionOptions)
{
try
{
return BackplaneMessage.ToByteArray(message);
using var arrayPoolBufferWriter = new ArrayPoolBufferWriter();
message.WriteTo(arrayPoolBufferWriter);
return arrayPoolBufferWriter.ToArray();
}
catch (Exception exc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ public async ValueTask SubscribeAsync(BackplaneSubscriptionOptions subscriptionO

await _subscriber.SubscribeAsync(_channel, (rc, value) =>
{
var message = GetMessageFromRedisValue(value, _logger, _subscriptionOptions);
if (message is null)
return;

_ = Task.Run(async () =>
if (TryGetMessageFromRedisValue(value, _logger, _subscriptionOptions, out var message))
{
await OnMessageAsync(message).ConfigureAwait(false);
});
_ = Task.Run(async () =>
{
await OnMessageAsync(message).ConfigureAwait(false);
});
}

return;
}).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ public void Subscribe(BackplaneSubscriptionOptions subscriptionOptions)

_subscriber.Subscribe(_channel, (rc, value) =>
{
var message = GetMessageFromRedisValue(value, _logger, _subscriptionOptions);
if (message is null)
return;

_ = Task.Run(async () =>
if (TryGetMessageFromRedisValue(value, _logger, _subscriptionOptions, out var message))
{
await OnMessageAsync(message).ConfigureAwait(false);
});
_ = Task.Run(async () =>
{
await OnMessageAsync(message).ConfigureAwait(false);
});
}

return;
});
}

Expand All @@ -116,7 +117,7 @@ public void Unsubscribe()
}

/// <inheritdoc/>
public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
public void Publish(in BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
// CONNECTION
EnsureConnection(token);
Expand Down
4 changes: 2 additions & 2 deletions src/ZiggyCreatures.FusionCache.Chaos/ChaosBackplane.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ public async ValueTask UnsubscribeAsync()
}

/// <inheritdoc/>
public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
public void Publish(in BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
MaybeChaos(token);
_innerBackplane.Publish(message, options, token);
_innerBackplane.Publish(in message, options, token);
}

/// <inheritdoc/>
Expand Down
Loading
Loading