diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisConfigureOptions.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisConfigureOptions.cs
index 646d6aa1..4ff3ecfd 100644
--- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisConfigureOptions.cs
@@ -53,7 +53,7 @@ public override void PostConfigure(string? name, AmazonKinesisTransportOptions o
// Consumer names become Queue names and they should not be longer than 128 characters
// See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 128)
{
diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsConfigureOptions.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsConfigureOptions.cs
index 16c0bb87..16e597ef 100644
--- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsConfigureOptions.cs
@@ -50,7 +50,7 @@ public override void PostConfigure(string? name, AmazonSqsTransportOptions optio
// Consumer names become Queue names and they should not be longer than 80 characters
// See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 80)
{
diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
index 0c3622d1..1bfbdedb 100644
--- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
+++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
@@ -54,7 +54,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
var queueUrl = await GetQueueUrlAsync(reg: reg,
ecr: ecr,
diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsConfigureOptions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsConfigureOptions.cs
index dacebfd2..8377057a 100644
--- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsConfigureOptions.cs
@@ -119,7 +119,7 @@ public override void PostConfigure(string? name, AzureEventHubsTransportOptions
}
// Consumer names become Consumer Group names and they should not be longer than 256 characters
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 256)
{
diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs
index de5b27da..ea6ff6a5 100644
--- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs
+++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs
@@ -45,7 +45,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);
diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs
index 2ab6776d..12699335 100644
--- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs
+++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs
@@ -49,7 +49,7 @@ protected override Task StartCoreAsync(CancellationToken cancellationToken)
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
var t = ReceiveAsync(reg: reg, ecr: ecr, cancellationToken: stoppingCts.Token);
receiverTasks.Add(t);
diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusConfigureOptions.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusConfigureOptions.cs
index 681bf20b..53efb059 100644
--- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusConfigureOptions.cs
@@ -73,7 +73,7 @@ public override void PostConfigure(string? name, AzureServiceBusTransportOptions
// When not using Queues, ConsumerName -> SubscriptionName does not happen
if (reg.EntityKind == EntityKind.Broadcast)
{
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 50)
{
diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs
index f63868c6..9f61b35d 100644
--- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs
+++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs
@@ -76,7 +76,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);
diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
index 2a89b7b1..bf5aa040 100644
--- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
@@ -70,7 +70,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);
diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs
index c2313461..8a13611d 100644
--- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransportConfigureOptions.cs
@@ -33,7 +33,7 @@ public override void PostConfigure(string? name, InMemoryTransportOptions option
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);
// This does not support dead-letter yet
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
if (ecr.Deadletter)
{
diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaConfigureOptions.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaConfigureOptions.cs
index 1692ae03..7d9e2bf6 100644
--- a/src/Tingle.EventBus.Transports.Kafka/KafkaConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.Kafka/KafkaConfigureOptions.cs
@@ -74,7 +74,7 @@ public override void PostConfigure(string? name, KafkaTransportOptions options)
+ "Kafka does not allow more than 255 characters for Topic names.");
}
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
// Consumer names become Consumer Group IDs and they should not be longer than 255 characters
if (ecr.ConsumerName!.Length > 255)
diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
index 9c4c71cc..11307029 100644
--- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
+++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
@@ -251,7 +251,7 @@ private async Task ProcessAsync(CancellationToken cancellationToken)
var reg = GetRegistrations().Single(r => r.EventName == topic);
// form the generic method
- var ecr = reg.Consumers.Values.Single(); // only one consumer per event
+ var ecr = reg.Consumers.Single(); // only one consumer per event
var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType);
await ((Task)method.Invoke(this, new object[] { reg, ecr, result, cancellationToken, })!).ConfigureAwait(false);
}
diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqConfigureOptions.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqConfigureOptions.cs
index 013399c2..b25272c7 100644
--- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqConfigureOptions.cs
+++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqConfigureOptions.cs
@@ -78,7 +78,7 @@ public override void PostConfigure(string? name, RabbitMqTransportOptions option
+ "RabbitMQ does not allow more than 255 characters for Exchange names.");
}
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
// Consumer names become Queue names and they should not be longer than 255 characters
if (ecr.ConsumerName!.Length > 255)
diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
index 4c7c0318..e35bae15 100644
--- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
+++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
@@ -263,7 +263,7 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken)
foreach (var reg in registrations)
{
var exchangeName = reg.EventName!;
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
// queue names must be unique so add the exchange name so that we can tell to whom the queue belongs
var queueName = BusOptions.Naming.Join(ecr.ConsumerName!, exchangeName);
diff --git a/src/Tingle.EventBus/Configuration/DefaultEventConfigurator.cs b/src/Tingle.EventBus/Configuration/DefaultEventConfigurator.cs
index b7213e5d..cdcba04e 100644
--- a/src/Tingle.EventBus/Configuration/DefaultEventConfigurator.cs
+++ b/src/Tingle.EventBus/Configuration/DefaultEventConfigurator.cs
@@ -28,7 +28,7 @@ public void Configure(EventRegistration registration, EventBusOptions options)
// bind from IConfiguration
var configuration = configurationProvider.Configuration.GetSection($"Events:{registration.EventType.FullName}");
configuration.Bind(registration);
- foreach (var ecr in registration.Consumers.Values)
+ foreach (var ecr in registration.Consumers)
{
configuration.GetSection($"Consumers:{ecr.ConsumerType.FullName}").Bind(ecr);
}
@@ -117,7 +117,7 @@ internal void ConfigureConsumerNames(EventRegistration reg, EventBusNamingOption
// prefix is either the one provided or the application name
var prefix = options.ConsumerNamePrefix ?? environment.ApplicationName;
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
// set the consumer name, if not set
if (string.IsNullOrWhiteSpace(ecr.ConsumerName))
diff --git a/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs b/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs
index 76bfcba4..f93d10cd 100644
--- a/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs
+++ b/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs
@@ -3,15 +3,17 @@
///
/// Represents a registration for a consumer of an event.
///
-public class EventConsumerRegistration : IEquatable
+public class EventConsumerRegistration : IEquatable
{
///
/// Creates an instance of .
///
/// The type of consumer handling the event.
- public EventConsumerRegistration(Type consumerType)
+ /// Whether the consumer should be connected to the dead-letter entity.
+ public EventConsumerRegistration(Type consumerType, bool deadletter)
{
ConsumerType = consumerType ?? throw new ArgumentNullException(nameof(consumerType));
+ Deadletter = deadletter;
}
///
@@ -20,17 +22,17 @@ public EventConsumerRegistration(Type consumerType)
public Type ConsumerType { get; }
///
- /// The name generated for the consumer.
+ /// Gets or sets a value indicating if the consumer should be connected to the dead-letter entity.
+ /// For transports that do not support dead-letter entities, a separate queue is created.
+ /// When set to , you must use
+ /// to consume events.
///
- public string? ConsumerName { get; set; }
+ public bool Deadletter { get; }
///
- /// Gets or sets a value indicating if the consumer should be connected to the dead-letter sub-queue.
- /// For transports that do not support dead-letter sub-queues, a separate queue is created.
- /// When set to , you must use
- /// to consume events.
+ /// The name generated for the consumer.
///
- public bool Deadletter { get; internal set; }
+ public string? ConsumerName { get; set; }
///
/// The behaviour for unhandled errors when consuming events via the
@@ -83,21 +85,22 @@ public EventConsumerRegistration OnError(UnhandledConsumerErrorBehaviour? behavi
///
public bool Equals(EventConsumerRegistration? other)
{
- return other is not null && EqualityComparer.Default.Equals(ConsumerType, other.ConsumerType);
+ return other is not null &&
+ EqualityComparer.Default.Equals(ConsumerType, other.ConsumerType) &&
+ Deadletter == other.Deadletter;
}
///
- public override int GetHashCode() => ConsumerType.GetHashCode();
+ public override int GetHashCode() => HashCode.Combine(ConsumerType, Deadletter);
///
- public static bool operator ==(EventConsumerRegistration left, EventConsumerRegistration right)
+ public static bool operator ==(EventConsumerRegistration? left, EventConsumerRegistration? right)
{
- return EqualityComparer.Default.Equals(left, right);
+ return EqualityComparer.Default.Equals(left, right);
}
///
- public static bool operator !=(EventConsumerRegistration left, EventConsumerRegistration right) => !(left == right);
+ public static bool operator !=(EventConsumerRegistration? left, EventConsumerRegistration? right) => !(left == right);
#endregion
-
}
diff --git a/src/Tingle.EventBus/Configuration/EventRegistration.cs b/src/Tingle.EventBus/Configuration/EventRegistration.cs
index 25be7fdb..5dfc44c0 100644
--- a/src/Tingle.EventBus/Configuration/EventRegistration.cs
+++ b/src/Tingle.EventBus/Configuration/EventRegistration.cs
@@ -81,7 +81,7 @@ public EventRegistration(Type eventType)
///
/// This is backed by a to ensure no duplicates.
///
- public Dictionary Consumers { get; } = new();
+ public HashSet Consumers { get; } = new();
///
/// Gets a key/value collection that can be used to organize and share data across components
@@ -103,20 +103,21 @@ public EventRegistration(Type eventType)
///
public bool Equals(EventRegistration? other)
{
- return other is not null && EqualityComparer.Default.Equals(EventType, other.EventType);
+ return other is not null &&
+ EqualityComparer.Default.Equals(EventType, other.EventType);
}
///
- public override int GetHashCode() => EventType.GetHashCode();
+ public override int GetHashCode() => HashCode.Combine(EventType);
///
- public static bool operator ==(EventRegistration left, EventRegistration right)
+ public static bool operator ==(EventRegistration? left, EventRegistration? right)
{
- return EqualityComparer.Default.Equals(left, right);
+ return EqualityComparer.Default.Equals(left, right);
}
///
- public static bool operator !=(EventRegistration left, EventRegistration right) => !(left == right);
+ public static bool operator !=(EventRegistration? left, EventRegistration? right) => !(left == right);
#endregion
}
diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
index 4945fa3b..864450ba 100644
--- a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
+++ b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
@@ -187,12 +187,9 @@ public EventBusBuilder AddConsumer(Action new EventRegistration(t));
- // get or create a simple ConsumerRegistration
- if (!reg.Consumers.TryGetValue(consumerType, out var ecr))
- {
- ecr = new EventConsumerRegistration(consumerType) { Deadletter = deadletter, };
- reg.Consumers.Add(consumerType, ecr);
- }
+ // create a simple ConsumerRegistration (HashSet removes duplicates)
+ var ecr = new EventConsumerRegistration(consumerType, deadletter);
+ reg.Consumers.Add(ecr);
// call the configuration function
configure?.Invoke(reg, ecr);
@@ -224,7 +221,7 @@ public EventBusBuilder RemoveConsumer() where TConsumer : class, IEve
var ct = typeof(TConsumer);
foreach (var registration in options.Registrations.Values)
{
- registration.Consumers.Remove(ct);
+ registration.Consumers.RemoveWhere(creg => creg.ConsumerType == ct);
}
});
}
diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusConfigureOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusConfigureOptions.cs
index 50d106d6..475f668c 100644
--- a/src/Tingle.EventBus/DependencyInjection/EventBusConfigureOptions.cs
+++ b/src/Tingle.EventBus/DependencyInjection/EventBusConfigureOptions.cs
@@ -113,11 +113,13 @@ public ValidateOptionsResult Validate(string? name, EventBusOptions options)
// Ensure there are no consumers with the same name per event
foreach (var evr in registrations)
{
- var conflict = evr.Consumers.Values.GroupBy(ecr => ecr.ConsumerName).FirstOrDefault(kvp => kvp.Count() > 1);
+ var conflict = evr.Consumers.GroupBy(ecr => (ecr.ConsumerName, ecr.Deadletter)).FirstOrDefault(kvp => kvp.Count() > 1);
if (conflict != null)
{
var names = conflict.Select(r => r.ConsumerType.FullName);
- return ValidateOptionsResult.Fail($"The consumer name '{conflict.Key}' cannot be used more than once on '{evr.EventType.Name}'."
+ var id = conflict.Key.ConsumerName;
+ if (conflict.Key.Deadletter) id += " [dead-letter]";
+ return ValidateOptionsResult.Fail($"The consumer name '({id})' cannot be used more than once on '{evr.EventType.Name}'."
+ $" Types:\r\n- {string.Join("\r\n- ", names)}");
}
}
diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
index 6bc56b17..6d648335 100644
--- a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
+++ b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
@@ -134,42 +134,47 @@ public ICollection GetRegistrations(string transportName)
}
///
- /// Get the consumer registration in a given event type.
+ /// Get the consumer registrations in a given event type.
///
/// The event type from which to retrieve a for.
/// The consumer to configure.
///
- /// When this method returns, contains the event registration associated with the specified event type,
+ /// When this method returns, contains the event registrations associated with the specified event type,
/// if the event type is found; otherwise, is returned.
/// This parameter is passed uninitialized.
///
- ///
- /// When this method returns, contains the consumer registration associated with the specified event type,
+ ///
+ /// When this method returns, contains the consumer registrations associated with the specified event type,
/// if the event type is found; otherwise, is returned.
/// This parameter is passed uninitialized.
///
/// if there's a consumer registered for the given event type; otherwise, false.
- internal bool TryGetConsumerRegistration([NotNullWhen(true)] out EventRegistration? reg,
- [NotNullWhen(true)] out EventConsumerRegistration? ecr)
+ internal bool TryGetConsumerRegistrations([NotNullWhen(true)] out EventRegistration? reg,
+ [NotNullWhen(true)] out List? ecrs)
{
- ecr = default;
- return Registrations.TryGetValue(typeof(TEvent), out reg) && reg.Consumers.TryGetValue(typeof(TConsumer), out ecr);
+ ecrs = default;
+ if (Registrations.TryGetValue(typeof(TEvent), out reg))
+ {
+ ecrs = reg.Consumers.Where(r => r.ConsumerType == typeof(TConsumer)).ToList();
+ return false;
+ }
+ return false;
}
///
- /// Get the consumer registration in a given event type.
+ /// Get the consumer registrations in a given event type.
///
/// The event type from which to retrieve a for.
/// The consumer to configure.
- ///
- /// When this method returns, contains the consumer registration associated with the specified event type,
+ ///
+ /// When this method returns, contains the consumer registrations associated with the specified event type,
/// if the event type is found; otherwise, is returned.
/// This parameter is passed uninitialized.
///
/// if there's a consumer registered for the given event type; otherwise, false.
- public bool TryGetConsumerRegistration([NotNullWhen(true)] out EventConsumerRegistration? registration)
+ public bool TryGetConsumerRegistrations([NotNullWhen(true)] out List? registrations)
{
- return TryGetConsumerRegistration(out _, out registration);
+ return TryGetConsumerRegistrations(out _, out registrations);
}
///
@@ -202,9 +207,9 @@ public EventBusOptions ConfigureConsumer(Action(out var reg, out var ecr) && ecr is not null)
+ if (TryGetConsumerRegistrations(out var reg, out var ecrs) && ecrs is not null)
{
- configure(reg, ecr);
+ foreach (var ecr in ecrs) configure(reg, ecr);
}
return this;
diff --git a/src/Tingle.EventBus/Transports/EventBusTransport.cs b/src/Tingle.EventBus/Transports/EventBusTransport.cs
index 88c21a18..364a3860 100644
--- a/src/Tingle.EventBus/Transports/EventBusTransport.cs
+++ b/src/Tingle.EventBus/Transports/EventBusTransport.cs
@@ -86,7 +86,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
// Combine the retry policies
PollyHelper.CombineIfNeeded(BusOptions, Options, reg);
- foreach (var ecr in reg.Consumers.Values)
+ foreach (var ecr in reg.Consumers)
{
// Set unhandled error behaviour
ecr.UnhandledErrorBehaviour ??= Options.DefaultUnhandledConsumerErrorBehaviour;
diff --git a/tests/Tingle.EventBus.Tests/Configurator/DefaultEventConfiguratorTests.cs b/tests/Tingle.EventBus.Tests/Configurator/DefaultEventConfiguratorTests.cs
index 92d25d31..3d0ed5e1 100644
--- a/tests/Tingle.EventBus.Tests/Configurator/DefaultEventConfiguratorTests.cs
+++ b/tests/Tingle.EventBus.Tests/Configurator/DefaultEventConfiguratorTests.cs
@@ -152,9 +152,9 @@ public void SetConsumerName_Works(Type eventType,
options.Naming.ConsumerNamePrefix = prefix;
var registration = new EventRegistration(eventType);
- registration.Consumers.Add(consumerType, new EventConsumerRegistration(consumerType));
+ registration.Consumers.Add(new EventConsumerRegistration(consumerType, false));
- var creg = Assert.Single(registration.Consumers.Values);
+ var creg = Assert.Single(registration.Consumers);
configurator.ConfigureEventName(registration, options.Naming);
configurator.ConfigureConsumerNames(registration, options.Naming);
Assert.Equal(expected, creg.ConsumerName);
diff --git a/tests/Tingle.EventBus.Tests/EventBusBuilderTests.cs b/tests/Tingle.EventBus.Tests/EventBusBuilderTests.cs
index 18eb68bc..42175149 100644
--- a/tests/Tingle.EventBus.Tests/EventBusBuilderTests.cs
+++ b/tests/Tingle.EventBus.Tests/EventBusBuilderTests.cs
@@ -11,14 +11,14 @@ namespace Tingle.EventBus.Tests;
public class EventBusBuilderTests
{
[Fact]
- public void DeadletterConsumerIsRegistered()
+ public void DeadletterConsumerIsRegistered_DifferentEventTypes()
{
var services = new ServiceCollection();
services.AddSingleton(new FakeHostEnvironment("app1"));
services.AddSingleton(new ConfigurationBuilder().Build());
services.AddEventBus(builder =>
{
- builder.AddConsumer();
+ builder.AddConsumer();
builder.Configure(o => o.AddTransport("Dummy", null));
});
@@ -27,17 +27,60 @@ public void DeadletterConsumerIsRegistered()
Assert.Equal(2, options.Registrations.Count);
var reg = Assert.Contains(typeof(TestEvent1), (IDictionary)options.Registrations);
- var ecr = Assert.Single(reg.Consumers).Value;
- Assert.Equal(typeof(DummyConsumer), ecr.ConsumerType);
+ var ecr = Assert.Single(reg.Consumers);
+ Assert.Equal(typeof(DummyConsumer1), ecr.ConsumerType);
Assert.False(ecr.Deadletter);
reg = Assert.Contains(typeof(TestEvent2), (IDictionary)options.Registrations);
- ecr = Assert.Single(reg.Consumers).Value;
- Assert.Equal(typeof(DummyConsumer), ecr.ConsumerType);
+ ecr = Assert.Single(reg.Consumers);
+ Assert.Equal(typeof(DummyConsumer1), ecr.ConsumerType);
Assert.True(ecr.Deadletter);
}
- internal class DummyConsumer : IEventConsumer, IDeadLetteredEventConsumer
+ [Fact]
+ public void DeadletterConsumerIsRegistered_SameEventType()
+ {
+ var services = new ServiceCollection();
+ services.AddSingleton(new FakeHostEnvironment("app1"));
+ services.AddSingleton(new ConfigurationBuilder().Build());
+ services.AddEventBus(builder =>
+ {
+ builder.AddConsumer();
+ builder.Configure(o => o.AddTransport("Dummy", null));
+ });
+
+ var provider = services.BuildServiceProvider();
+ var options = provider.GetRequiredService>().Value;
+ var reg = Assert.Single(options.Registrations).Value;
+ Assert.Equal(2, reg.Consumers.Count);
+ Assert.All(reg.Consumers, ecr => Assert.Equal(typeof(DummyConsumer2), ecr.ConsumerType));
+ Assert.False(reg.Consumers.ElementAt(0).Deadletter);
+ Assert.True(reg.Consumers.ElementAt(1).Deadletter);
+ }
+
+ [Fact]
+ public void DuplicateConsumersFails()
+ {
+ var services = new ServiceCollection();
+ services.AddSingleton(new FakeHostEnvironment("app1"));
+ services.AddSingleton(new ConfigurationBuilder().Build());
+ services.AddEventBus(builder =>
+ {
+ builder.AddConsumer();
+ builder.AddConsumer();
+ builder.Configure(o =>
+ {
+ o.Naming.ConsumerNameSource = ConsumerNameSource.Prefix;
+ o.AddTransport("Dummy", null);
+ });
+ });
+
+ var provider = services.BuildServiceProvider();
+ var ex = Assert.Throws(() => provider.GetRequiredService>().Value);
+ Assert.Equal("The consumer name '(app1 [dead-letter])' cannot be used more than once on 'TestEvent2'. Types:\r\n- Tingle.EventBus.Tests.EventBusBuilderTests+DummyConsumer1\r\n- Tingle.EventBus.Tests.EventBusBuilderTests+DummyConsumer2", ex.Message);
+ }
+
+ internal class DummyConsumer1 : IEventConsumer, IDeadLetteredEventConsumer
{
public Task ConsumeAsync(EventContext context, CancellationToken cancellationToken = default)
{
@@ -49,6 +92,18 @@ public Task ConsumeAsync(DeadLetteredEventContext context, Cancellat
}
}
+ internal class DummyConsumer2 : IEventConsumer, IDeadLetteredEventConsumer
+ {
+ public Task ConsumeAsync(EventContext context, CancellationToken cancellationToken = default)
+ {
+ throw new NotImplementedException();
+ }
+ public Task ConsumeAsync(DeadLetteredEventContext context, CancellationToken cancellationToken = default)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
internal class DummyTransport : IEventBusTransport
{
public string Name => throw new NotImplementedException();