From 025d144ec5093d4271a94df30ab1326939795046 Mon Sep 17 00:00:00 2001 From: Maxwell Weru Date: Thu, 6 Jul 2023 10:57:31 +0300 Subject: [PATCH] Fix sharing of processors for Azure transports that messes up deadletters (#529) --- .../AzureEventHubsTransport.cs | 5 +++-- .../AzureServiceBusTransport.cs | 5 +++-- .../InMemoryTransport.cs | 7 ++++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index ea6ff6a5..7d3f0aad 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -300,7 +300,8 @@ Task creator((Type, bool) _, CancellationToken ct) private Task GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken) { var name = reg.EventName; - if (ecr.Deadletter) name += Options.DeadLetterSuffix; + var deadletter = ecr.Deadletter; + if (deadletter) name += Options.DeadLetterSuffix; // For events configured as sourced from IoT Hub, // 1. The event hub name is in the metadata @@ -360,7 +361,7 @@ async Task creator(string key, CancellationToken ct) return processor; } - var key = $"{eventHubName}/{consumerGroup}"; + var key = $"{eventHubName}/{consumerGroup}/{deadletter}"; return processorsCache.GetOrAddAsync(key, creator, cancellationToken); } diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs index 9f61b35d..e2a9b58e 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs @@ -322,6 +322,7 @@ private Task GetProcessorAsync(EventRegistration reg, Event { var topicName = reg.EventName!; var subscriptionName = ecr.ConsumerName!; + var deadletter = ecr.Deadletter; async Task creator(string key, CancellationToken ct) { @@ -343,7 +344,7 @@ async Task creator(string key, CancellationToken ct) PrefetchCount = Options.DefaultPrefetchCount, // Set the sub-queue to be used - SubQueue = ecr.Deadletter ? SubQueue.DeadLetter : SubQueue.None, + SubQueue = deadletter ? SubQueue.DeadLetter : SubQueue.None, }; // Allow for the defaults to be overridden @@ -376,7 +377,7 @@ await CreateSubscriptionIfNotExistsAsync(ecr: ecr, } } - var key = $"{topicName}/{subscriptionName}"; + var key = $"{topicName}/{subscriptionName}/{deadletter}"; return processorsCache.GetOrAddAsync(key, creator, cancellationToken); } diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs index bf5aa040..2e1f61a3 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs @@ -294,8 +294,9 @@ private Task GetProcessorAsync(EventRegistration reg, EventCo { var topicName = reg.EventName!; var subscriptionName = ecr.ConsumerName!; + var deadletter = ecr.Deadletter; - var key = $"{topicName}/{subscriptionName}"; + var key = $"{topicName}/{subscriptionName}/{deadletter}"; Task creator(string _, CancellationToken ct) { // Create the processor options @@ -307,13 +308,13 @@ Task creator(string _, CancellationToken ct) { // Create the processor for the Queue Logger.CreatingQueueProcessor(queueName: topicName); - processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo); + processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo); // TODO: support deadletter } else { // Create the processor for the Subscription Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName); - processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo); + processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo); // TODO: support deadletter } return Task.FromResult(processor);