Skip to content

Commit

Permalink
Migrate to resilience policies in Polly v8 with a new example (#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell authored Dec 12, 2023
1 parent 7e73742 commit 33be58f
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 101 deletions.
32 changes: 19 additions & 13 deletions src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class RabbitMqTransport : EventBusTransport<RabbitMqTransportOptions>, ID
{
private readonly SemaphoreSlim connectionLock = new(1, 1);
private readonly EventBusConcurrentDictionary<string, IModel> subscriptionChannelsCache = new();
private RetryPolicy? retryPolicy;
private ResiliencePipeline? resiliencePipeline;

private IConnection? connection;
private bool disposed;
Expand Down Expand Up @@ -98,7 +98,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)

// publish message
string? scheduledId = null;
GetRetryPolicy().Execute(() =>
GetResiliencePipeline().Execute(() =>
{
// setup properties
var properties = channel.CreateBasicProperties();
Expand Down Expand Up @@ -173,7 +173,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
serializedEvents.Add((@event, @event.ContentType, body));
}

GetRetryPolicy().Execute(() =>
GetResiliencePipeline().Execute(() =>
{
var batch = channel.CreateBasicPublishBatch();
foreach (var (@event, contentType, body) in serializedEvents)
Expand Down Expand Up @@ -241,16 +241,22 @@ protected override Task CancelCoreAsync<TEvent>(IList<string> ids,
throw new NotSupportedException("RabbitMQ does not support canceling published messages.");
}

private RetryPolicy GetRetryPolicy()
private ResiliencePipeline GetResiliencePipeline()
{
return retryPolicy ??= Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(retryCount: Options.RetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetry: (ex, time) =>
{
Logger.LogError(ex, "RabbitMQ Client could not connect after {Timeout:n1}s ", time.TotalSeconds);
});
return resiliencePipeline ??= new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
ShouldHandle = new PredicateBuilder().Handle<BrokerUnreachableException>()
.Handle<SocketException>(),
DelayGenerator = args => new ValueTask<TimeSpan?>(TimeSpan.FromSeconds(Math.Pow(2, args.AttemptNumber))),
MaxRetryAttempts = Options.RetryCount,
OnRetry = args =>
{
Logger.LogError(args.Outcome.Exception, "RabbitMQ Client could not connect after {Timeout:n1}s", args.RetryDelay.TotalSeconds);
return new ValueTask();
},
})
.Build();
}

private async Task ConnectConsumersAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -397,7 +403,7 @@ private async Task<bool> TryConnectAsync(CancellationToken cancellationToken)
return true;
}

GetRetryPolicy().Execute(() =>
GetResiliencePipeline().Execute(() =>
{
connection = Options.ConnectionFactory!.CreateConnection();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public EventConsumerRegistration(Type consumerType, bool deadletter)
/// Defaults to <see langword="null"/>.
/// When this value is set, it overrides the default value set on the transport or the bus.
/// <br/>
/// When a retry policy is in force, only errors not handled by it will be subject to the value set here.
/// When a resilience pipeline is in force, only errors not handled by it will be subject to the value set here.
/// </summary>
public UnhandledConsumerErrorBehaviour? UnhandledErrorBehaviour { get; set; }

Expand Down
18 changes: 9 additions & 9 deletions src/Tingle.EventBus/Configuration/EventRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ public EventRegistration(Type eventType)
public TimeSpan? DuplicateDetectionDuration { get; set; }

/// <summary>
/// The retry policy to apply specifically for this event.
/// The resiliency pipeline to apply specifically for this event.
/// This is in addition to what may be provided by the SDKs for each transport.
/// When provided alongside policies on the transport and the bus, it is used as the inner most policy.
/// When provided alongside pipelines on the transport and the bus, it is used as the inner most pipeline.
/// </summary>
/// <remarks>
/// When a value is provided, the transport may extend the lock for the
/// message during consumption until the execution with retry policy completes successfully or not.
/// message during consumption until the execution with resiliency pipeline completes successfully or not.
/// In such a case, ensure the execution timeout (sometimes called the visibility timeout
/// or lock duration) is set to accommodate the longest possible duration of the retry policy.
/// or lock duration) is set to accommodate the longest possible duration of the resiliency pipeline.
/// </remarks>
public AsyncPolicy? RetryPolicy { get; set; }
public ResiliencePipeline? ResiliencePipeline { get; set; }

/// <summary>
/// The list of consumers registered for this event.
Expand All @@ -93,11 +93,11 @@ public EventRegistration(Type eventType)
/// </summary>
public IDictionary<string, object> Metadata { get; set; } = new Dictionary<string, object>();

/// <summary>Whether the execution policies have been merged.</summary>
internal bool MergedExecutionPolicies { get; set; } = false;
/// <summary>Whether the execution pipelines have been merged.</summary>
internal bool MergedExecutionPipelines { get; set; } = false;

/// <summary>The final policy used in executions for the event and it's consumers.</summary>
internal IAsyncPolicy ExecutionPolicy { get; set; } = Policy.NoOpAsync();
/// <summary>The final resilience pipeline used in executions for the event and it's consumers.</summary>
internal ResiliencePipeline ExecutionPipeline { get; set; } = ResiliencePipeline.Empty;

#region Equality Overrides

Expand Down
12 changes: 6 additions & 6 deletions src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public class EventBusOptions
public bool DefaultTransportWaitStarted { get; set; } = true;

/// <summary>
/// Optional retry policy to apply to the bus.
/// When provided alongside policies on the transport and the event registration, it is used as the putter most policy.
/// Optional resilience pipeline to apply to the bus.
/// When provided alongside pipelines on the transport and the event registration, it is used as the putter most pipeline.
/// Defaults to <see langword="null"/>.
/// </summary>
/// <remarks>
/// To specify a value on an event registration, use <see cref="EventRegistration.RetryPolicy"/>.
/// To specify a value on a transport, use <see cref="EventBusTransportOptions.RetryPolicy"/> for the specific transport.
/// To specify a value on an event registration, use <see cref="EventRegistration.ResiliencePipeline"/>.
/// To specify a value on a transport, use <see cref="EventBusTransportOptions.ResiliencePipeline"/> for the specific transport.
/// </remarks>
public AsyncPolicy? RetryPolicy { get; set; }
public ResiliencePipeline? ResiliencePipeline { get; set; }

/// <summary>
/// Optional default format to use for generated event identifiers when for events where it is not specified.
Expand Down Expand Up @@ -68,7 +68,7 @@ public class EventBusOptions
/// Optional default behaviour for errors encountered in a consumer but are not handled.
/// To specify a value per consumer, use the <see cref="EventConsumerRegistration.UnhandledErrorBehaviour"/> option.
/// To specify a value per transport, use the <see cref="EventBusTransportOptions.DefaultUnhandledConsumerErrorBehaviour"/> option on the specific transport.
/// When an <see cref="AsyncPolicy"/> is in force, only errors that are not handled by it will be subject to the value set here.
/// When an <see cref="ResiliencePipeline"/> is in force, only errors that are not handled by it will be subject to the value set here.
/// Defaults to <see langword="null"/>.
/// </summary>
public UnhandledConsumerErrorBehaviour? DefaultUnhandledConsumerErrorBehaviour { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task<TValue> GetOrAddAsync(TKey key, Func<TKey, CancellationToken,
tcs.SetException(ex);

// We remove the entry if the factory failed so it's not a permanent failure
// and future gets can retry (this could be a policy)
// and future gets can retry (this could be a resilience pipeline)
TryRemove(key, out _);
throw;
}
Expand Down
38 changes: 0 additions & 38 deletions src/Tingle.EventBus/Internal/PollyHelper.cs

This file was deleted.

53 changes: 53 additions & 0 deletions src/Tingle.EventBus/Internal/ResiliencePipelineHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Microsoft.Extensions.DependencyInjection;
using Polly;
using Tingle.EventBus.Configuration;
using Tingle.EventBus.Transports;

namespace Tingle.EventBus.Internal;

internal static class ResiliencePipelineHelper
{
public static void CombineIfNeeded(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration)
{
if (busOptions is null) throw new ArgumentNullException(nameof(busOptions));
if (transportOptions is null) throw new ArgumentNullException(nameof(transportOptions));
if (registration is null) throw new ArgumentNullException(nameof(registration));

// if the pipelines have been merged, there is no need to repeat the process
if (registration.MergedExecutionPipelines) return;

registration.ExecutionPipeline = Combine(busOptions, transportOptions, registration);
registration.MergedExecutionPipelines = true;
}

private static ResiliencePipeline Combine(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration)
{
var pipelines = new ResiliencePipeline?[] {
busOptions.ResiliencePipeline, // outer
transportOptions.ResiliencePipeline,
registration.ResiliencePipeline, // inner
}.Where(p => p is not null).Select(p => p!).ToArray();

return pipelines.Length switch
{
0 => ResiliencePipeline.Empty, // if there are none, return empty
1 => pipelines[0], // a single pipeline can just be used (no need to combine)
_ => new ResiliencePipelineBuilder().AddPipelines(pipelines).Build(), // more than one needs to be combined
};
}

private static ResiliencePipelineBuilder AddPipelines(this ResiliencePipelineBuilder builder, params ResiliencePipeline?[]? pipelines)
{
if (pipelines is null) return builder;

foreach (var pipeline in pipelines)
{
if (pipeline is not null)
{
builder.AddPipeline(pipeline);
}
}

return builder;
}
}
4 changes: 2 additions & 2 deletions src/Tingle.EventBus/Tingle.EventBus.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>Event-Based framework for distributed applications.</Description>
Expand All @@ -13,8 +13,8 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
<PackageReference Include="Polly" Version="8.2.0" />
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageReference Include="Polly.Core" Version="8.2.0" />
<PackageReference Include="System.Memory.Data" Version="8.0.0" />
</ItemGroup>

Expand Down
Loading

0 comments on commit 33be58f

Please sign in to comment.