Skip to content

Commit

Permalink
Support deadletter entities with InMemory transport (#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell authored Jul 6, 2023
1 parent 025d144 commit bb0423b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 33 deletions.
13 changes: 12 additions & 1 deletion src/Tingle.EventBus.Transports.InMemory/Client/InMemoryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ public InMemoryClient(SequenceNumberGenerator sng)
public virtual InMemoryProcessor CreateProcessor(string queueName, InMemoryProcessorOptions options)
{
var channel = GetChannel(queueName);
return new InMemoryProcessor(entityPath: queueName, reader: channel.Reader);
var entityPath = queueName;
entityPath += options.SubQueue switch
{
InMemoryProcessorSubQueue.DeadLetter => "/$DeadLetter",
_ => "",
};
return new InMemoryProcessor(entityPath: entityPath, reader: channel.Reader);
}

/// <summary>
Expand All @@ -44,6 +50,11 @@ public virtual InMemoryProcessor CreateProcessor(string topicName, string subscr
{
var parent = GetChannel(entityPath: topicName, broadcast: true);
var entityPath = $"{topicName}/Subscriptions/{subscriptionName}";
entityPath += options.SubQueue switch
{
InMemoryProcessorSubQueue.DeadLetter => "/$DeadLetter",
_ => "",
};
var channel = GetChannel(entityPath: entityPath);
((BroadcastChannelWriter<InMemoryMessage>)parent.Writer).Children.Add(channel.Writer);
return new InMemoryProcessor(entityPath: entityPath, reader: channel.Reader);
Expand Down
10 changes: 10 additions & 0 deletions src/Tingle.EventBus.Transports.InMemory/Client/InMemoryMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ public InMemoryMessage(BinaryData body)
Body = body ?? throw new ArgumentNullException(nameof(body));
}

internal InMemoryMessage(InMemoryReceivedMessage message) : this(message?.Body ?? throw new ArgumentNullException(nameof(message)))
{
ContentType = message.ContentType;
CorrelationId = message.CorrelationId;
MessageId = message.MessageId;
SequenceNumber = message.SequenceNumber;
Properties = message.Properties;
Scheduled = message.Scheduled;
}

/// <summary>
/// Gets or sets the content type descriptor.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@

internal class InMemoryProcessorOptions
{
public InMemoryProcessorSubQueue SubQueue { get; set; } = InMemoryProcessorSubQueue.None;
}

internal enum InMemoryProcessorSubQueue
{
None,
DeadLetter,
}
45 changes: 24 additions & 21 deletions src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ namespace Tingle.EventBus.Transports.InMemory;
/// </summary>
public class InMemoryTransport : EventBusTransport<InMemoryTransportOptions>
{
private readonly EventBusConcurrentDictionary<Type, InMemorySender> sendersCache = new();
private readonly EventBusConcurrentDictionary<(Type, bool), InMemorySender> sendersCache = new();
private readonly EventBusConcurrentDictionary<string, InMemoryProcessor> processorsCache = new();
private readonly InMemoryClient inMemoryClient;
private readonly InMemoryClient client;

private readonly ConcurrentBag<EventContext> published = new();
private readonly ConcurrentBag<long> cancelled = new();
Expand All @@ -41,7 +41,7 @@ public InMemoryTransport(IServiceScopeFactory serviceScopeFactory,
SequenceNumberGenerator sng)
: base(serviceScopeFactory, busOptionsAccessor, optionsMonitor, loggerFactory)
{
inMemoryClient = new InMemoryClient(sng);
client = new InMemoryClient(sng);
}

/// <summary>
Expand Down Expand Up @@ -155,7 +155,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
published.Add(@event);

// Get the queue and send the message accordingly
var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.SendingMessage(eventBusId: @event.Id, entityPath: sender.EntityPath, scheduled: scheduled);
if (scheduled != null)
{
Expand Down Expand Up @@ -217,7 +217,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
AddBatch(published, events);

// Get the queue and send the message accordingly
var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.SendingMessages(events: events, entityPath: sender.EntityPath, scheduled: scheduled);
if (scheduled != null)
{
Expand Down Expand Up @@ -247,7 +247,7 @@ protected override async Task CancelCoreAsync<TEvent>(string id,
}

// get the entity and cancel the message accordingly
var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.CancelingMessage(sequenceNumber: seqNum, entityPath: sender.EntityPath);
await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken).ConfigureAwait(false);

Expand All @@ -272,22 +272,22 @@ protected override async Task CancelCoreAsync<TEvent>(IList<string> ids,
}).ToList();

// get the entity and cancel the message accordingly
var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.CancelingMessages(sequenceNumbers: seqNums, entityPath: sender.EntityPath);
await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken).ConfigureAwait(false);

// Add to cancelled list
AddBatch(cancelled, seqNums);
}

private Task<InMemorySender> GetSenderAsync(EventRegistration reg, CancellationToken cancellationToken)
private Task<InMemorySender> GetSenderAsync(EventRegistration reg, bool deadletter, CancellationToken cancellationToken)
{
Task<InMemorySender> creator(Type _, CancellationToken ct)
Task<InMemorySender> creator((Type, bool) _, CancellationToken ct)
{
var sender = inMemoryClient.CreateSender(name: reg.EventName!, broadcast: reg.EntityKind == EntityKind.Broadcast);
var sender = client.CreateSender(name: reg.EventName!, broadcast: reg.EntityKind == EntityKind.Broadcast);
return Task.FromResult(sender);
}
return sendersCache.GetOrAddAsync(reg.EventType, creator, cancellationToken);
return sendersCache.GetOrAddAsync((reg.EventType, deadletter), creator, cancellationToken);
}

private Task<InMemoryProcessor> GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken)
Expand All @@ -308,13 +308,13 @@ Task<InMemoryProcessor> creator(string _, CancellationToken ct)
{
// Create the processor for the Queue
Logger.CreatingQueueProcessor(queueName: topicName);
processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo); // TODO: support deadletter
processor = client.CreateProcessor(queueName: topicName, options: inpo);
}
else
{
// Create the processor for the Subscription
Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName);
processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo); // TODO: support deadletter
processor = client.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo);
}

return Task.FromResult(processor);
Expand Down Expand Up @@ -376,15 +376,18 @@ private async Task OnMessageReceivedAsync<TEvent, TConsumer>(EventRegistration r
scope: scope,
cancellationToken: cancellationToken).ConfigureAwait(false);

if (successful)
{
// Add to Consumed list
consumed.Add(context);
}
else
// Add to Consumed/Failed list
if (successful) consumed.Add(context);
else failed.Add(context);

// dead-letter cannot be dead-lettered again, what else can we do?
if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails

if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter)
{
// Add to failed list
failed.Add(context);
// get the client for the dead letter queue and send the message there
var sender = await GetSenderAsync(reg, deadletter: true, cancellationToken).ConfigureAwait(false);
await sender.SendMessageAsync(new(message), cancellationToken).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,6 @@ public override void PostConfigure(string? name, InMemoryTransportOptions option

// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);

// This does not support dead-letter yet
foreach (var ecr in reg.Consumers)
{
if (ecr.Deadletter)
{
throw new InvalidOperationException($"ConsumerName '{ecr.ConsumerName}' is setup for dead-letter but the InMemory "
+ "implementation doesn't yet support it.");
}
}

}
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SimpleConsumer;
using Tingle.EventBus.Transports.InMemory;
using Xunit.Abstractions;

namespace Tingle.EventBus.Tests.InMemory;

public class SampleEventConsumerTests
{
private readonly ITestOutputHelper outputHelper;

public SampleEventConsumerTests(ITestOutputHelper outputHelper)
{
this.outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper));
}

[Fact]
public async Task ConsumerWorksAsync()
{
var counter = new EventCounter();
var host = Host.CreateDefaultBuilder()
.ConfigureLogging((context, builder) => builder.AddXUnit(outputHelper))
.ConfigureServices((context, services) =>
{
services.AddSingleton(counter);
Expand Down

0 comments on commit bb0423b

Please sign in to comment.