diff --git a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryClient.cs b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryClient.cs
index a193ccf6..f1a3ebb0 100644
--- a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryClient.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryClient.cs
@@ -26,7 +26,13 @@ public InMemoryClient(SequenceNumberGenerator sng)
public virtual InMemoryProcessor CreateProcessor(string queueName, InMemoryProcessorOptions options)
{
var channel = GetChannel(queueName);
- return new InMemoryProcessor(entityPath: queueName, reader: channel.Reader);
+ var entityPath = queueName;
+ entityPath += options.SubQueue switch
+ {
+ InMemoryProcessorSubQueue.DeadLetter => "/$DeadLetter",
+ _ => "",
+ };
+ return new InMemoryProcessor(entityPath: entityPath, reader: channel.Reader);
}
///
@@ -44,6 +50,11 @@ public virtual InMemoryProcessor CreateProcessor(string topicName, string subscr
{
var parent = GetChannel(entityPath: topicName, broadcast: true);
var entityPath = $"{topicName}/Subscriptions/{subscriptionName}";
+ entityPath += options.SubQueue switch
+ {
+ InMemoryProcessorSubQueue.DeadLetter => "/$DeadLetter",
+ _ => "",
+ };
var channel = GetChannel(entityPath: entityPath);
((BroadcastChannelWriter)parent.Writer).Children.Add(channel.Writer);
return new InMemoryProcessor(entityPath: entityPath, reader: channel.Reader);
diff --git a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryMessage.cs b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryMessage.cs
index 268b882e..7253ba58 100644
--- a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryMessage.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryMessage.cs
@@ -27,6 +27,16 @@ public InMemoryMessage(BinaryData body)
Body = body ?? throw new ArgumentNullException(nameof(body));
}
+ internal InMemoryMessage(InMemoryReceivedMessage message) : this(message?.Body ?? throw new ArgumentNullException(nameof(message)))
+ {
+ ContentType = message.ContentType;
+ CorrelationId = message.CorrelationId;
+ MessageId = message.MessageId;
+ SequenceNumber = message.SequenceNumber;
+ Properties = message.Properties;
+ Scheduled = message.Scheduled;
+ }
+
///
/// Gets or sets the content type descriptor.
///
diff --git a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessorOptions.cs b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessorOptions.cs
index 65252396..f8b786ec 100644
--- a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessorOptions.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessorOptions.cs
@@ -2,4 +2,11 @@
internal class InMemoryProcessorOptions
{
+ public InMemoryProcessorSubQueue SubQueue { get; set; } = InMemoryProcessorSubQueue.None;
+}
+
+internal enum InMemoryProcessorSubQueue
+{
+ None,
+ DeadLetter,
}
diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
index 2e1f61a3..12ea2c14 100644
--- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
@@ -17,9 +17,9 @@ namespace Tingle.EventBus.Transports.InMemory;
///
public class InMemoryTransport : EventBusTransport
{
- private readonly EventBusConcurrentDictionary sendersCache = new();
+ private readonly EventBusConcurrentDictionary<(Type, bool), InMemorySender> sendersCache = new();
private readonly EventBusConcurrentDictionary processorsCache = new();
- private readonly InMemoryClient inMemoryClient;
+ private readonly InMemoryClient client;
private readonly ConcurrentBag published = new();
private readonly ConcurrentBag cancelled = new();
@@ -41,7 +41,7 @@ public InMemoryTransport(IServiceScopeFactory serviceScopeFactory,
SequenceNumberGenerator sng)
: base(serviceScopeFactory, busOptionsAccessor, optionsMonitor, loggerFactory)
{
- inMemoryClient = new InMemoryClient(sng);
+ client = new InMemoryClient(sng);
}
///
@@ -155,7 +155,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
published.Add(@event);
// Get the queue and send the message accordingly
- var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
+ var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.SendingMessage(eventBusId: @event.Id, entityPath: sender.EntityPath, scheduled: scheduled);
if (scheduled != null)
{
@@ -217,7 +217,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
AddBatch(published, events);
// Get the queue and send the message accordingly
- var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
+ var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.SendingMessages(events: events, entityPath: sender.EntityPath, scheduled: scheduled);
if (scheduled != null)
{
@@ -247,7 +247,7 @@ protected override async Task CancelCoreAsync(string id,
}
// get the entity and cancel the message accordingly
- var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
+ var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.CancelingMessage(sequenceNumber: seqNum, entityPath: sender.EntityPath);
await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken).ConfigureAwait(false);
@@ -272,7 +272,7 @@ protected override async Task CancelCoreAsync(IList ids,
}).ToList();
// get the entity and cancel the message accordingly
- var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false);
+ var sender = await GetSenderAsync(registration, deadletter: false, cancellationToken).ConfigureAwait(false);
Logger.CancelingMessages(sequenceNumbers: seqNums, entityPath: sender.EntityPath);
await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken).ConfigureAwait(false);
@@ -280,14 +280,14 @@ protected override async Task CancelCoreAsync(IList ids,
AddBatch(cancelled, seqNums);
}
- private Task GetSenderAsync(EventRegistration reg, CancellationToken cancellationToken)
+ private Task GetSenderAsync(EventRegistration reg, bool deadletter, CancellationToken cancellationToken)
{
- Task creator(Type _, CancellationToken ct)
+ Task creator((Type, bool) _, CancellationToken ct)
{
- var sender = inMemoryClient.CreateSender(name: reg.EventName!, broadcast: reg.EntityKind == EntityKind.Broadcast);
+ var sender = client.CreateSender(name: reg.EventName!, broadcast: reg.EntityKind == EntityKind.Broadcast);
return Task.FromResult(sender);
}
- return sendersCache.GetOrAddAsync(reg.EventType, creator, cancellationToken);
+ return sendersCache.GetOrAddAsync((reg.EventType, deadletter), creator, cancellationToken);
}
private Task GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken)
@@ -308,13 +308,13 @@ Task creator(string _, CancellationToken ct)
{
// Create the processor for the Queue
Logger.CreatingQueueProcessor(queueName: topicName);
- processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo); // TODO: support deadletter
+ processor = client.CreateProcessor(queueName: topicName, options: inpo);
}
else
{
// Create the processor for the Subscription
Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName);
- processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo); // TODO: support deadletter
+ processor = client.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo);
}
return Task.FromResult(processor);
@@ -376,15 +376,18 @@ private async Task OnMessageReceivedAsync(EventRegistration r
scope: scope,
cancellationToken: cancellationToken).ConfigureAwait(false);
- if (successful)
- {
- // Add to Consumed list
- consumed.Add(context);
- }
- else
+ // Add to Consumed/Failed list
+ if (successful) consumed.Add(context);
+ else failed.Add(context);
+
+ // dead-letter cannot be dead-lettered again, what else can we do?
+ if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails
+
+ if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter)
{
- // Add to failed list
- failed.Add(context);
+ // get the client for the dead letter queue and send the message there
+ var sender = await GetSenderAsync(reg, deadletter: true, cancellationToken).ConfigureAwait(false);
+ await sender.SendMessageAsync(new(message), cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs
index 8a13611d..e95a31cd 100644
--- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs
@@ -31,17 +31,6 @@ public override void PostConfigure(string? name, InMemoryTransportOptions option
// Ensure the entity type is allowed
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);
-
- // This does not support dead-letter yet
- foreach (var ecr in reg.Consumers)
- {
- if (ecr.Deadletter)
- {
- throw new InvalidOperationException($"ConsumerName '{ecr.ConsumerName}' is setup for dead-letter but the InMemory "
- + "implementation doesn't yet support it.");
- }
- }
-
}
}
}
diff --git a/tests/Tingle.EventBus.Transports.InMemory.Tests/SampleEventConsumerTests.cs b/tests/Tingle.EventBus.Transports.InMemory.Tests/SampleEventConsumerTests.cs
index 8fccf314..9a63a1e8 100644
--- a/tests/Tingle.EventBus.Transports.InMemory.Tests/SampleEventConsumerTests.cs
+++ b/tests/Tingle.EventBus.Transports.InMemory.Tests/SampleEventConsumerTests.cs
@@ -1,17 +1,27 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
using SimpleConsumer;
using Tingle.EventBus.Transports.InMemory;
+using Xunit.Abstractions;
namespace Tingle.EventBus.Tests.InMemory;
public class SampleEventConsumerTests
{
+ private readonly ITestOutputHelper outputHelper;
+
+ public SampleEventConsumerTests(ITestOutputHelper outputHelper)
+ {
+ this.outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper));
+ }
+
[Fact]
public async Task ConsumerWorksAsync()
{
var counter = new EventCounter();
var host = Host.CreateDefaultBuilder()
+ .ConfigureLogging((context, builder) => builder.AddXUnit(outputHelper))
.ConfigureServices((context, services) =>
{
services.AddSingleton(counter);