diff --git a/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptionManager.cs b/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptionManager.cs index 77cc2df0da..ef3bab6922 100644 --- a/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptionManager.cs +++ b/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptionManager.cs @@ -10,7 +10,7 @@ using Queuing; using Transport; using Transports; - + /// /// Implements message driven subscriptions for transports that doesn't have native support for it (MSMQ , SqlServer, Azure Queues etc) /// @@ -23,6 +23,7 @@ public class MessageDrivenSubscriptionManager : IManageSubscriptions, public IBuilder Builder { get; set; } public ISubscriptionStorage SubscriptionStorage { get; set; } public IAuthorizeSubscriptions SubscriptionAuthorizer { get { return subscriptionAuthorizer ?? (subscriptionAuthorizer = new NoopSubscriptionAuthorizer()); } set { subscriptionAuthorizer = value; } } + public Address DistributorDataAddress { get; set; } public void Subscribe(Type eventType, Address publisherAddress) { @@ -33,6 +34,10 @@ public void Subscribe(Type eventType, Address publisherAddress) var subscriptionMessage = CreateControlMessage(eventType); subscriptionMessage.MessageIntent = MessageIntentEnum.Subscribe; + if (DistributorDataAddress != null) + { + subscriptionMessage.ReplyToAddress = DistributorDataAddress; + } ThreadPool.QueueUserWorkItem(state => SendSubscribeMessageWithRetries(publisherAddress, subscriptionMessage, eventType.AssemblyQualifiedName)); @@ -49,6 +54,11 @@ public void Unsubscribe(Type eventType, Address publisherAddress) var subscriptionMessage = CreateControlMessage(eventType); subscriptionMessage.MessageIntent = MessageIntentEnum.Unsubscribe; + if (DistributorDataAddress != null) + { + subscriptionMessage.ReplyToAddress = DistributorDataAddress; + } + MessageSender.Send(subscriptionMessage, publisherAddress); } @@ -144,7 +154,6 @@ void SendSubscribeMessageWithRetries(Address destination, TransportMessage subsc { try { - MessageSender.Send(subscriptionMessage, destination); } catch (QueueNotFoundException ex) diff --git a/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptions.cs b/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptions.cs index 1bb9e556fc..ade798c334 100644 --- a/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptions.cs +++ b/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageDrivenSubscriptions.cs @@ -7,9 +7,14 @@ public class MessageDrivenSubscriptions : Feature { public override void Initialize() { - Configure.Component(DependencyLifecycle.SingleInstance); Configure.Component(DependencyLifecycle.InstancePerCall); Configure.Component(DependencyLifecycle.SingleInstance); + + var masterNodeAddress = Configure.Instance.GetMasterNodeAddress(); + Configure.Component( + DependencyLifecycle.SingleInstance) + .ConfigureProperty(r => r.DistributorDataAddress, masterNodeAddress); + } } } \ No newline at end of file diff --git a/src/NServiceBus/NServiceBusVersion.cs b/src/NServiceBus/NServiceBusVersion.cs index 7e549231fc..864196557c 100644 --- a/src/NServiceBus/NServiceBusVersion.cs +++ b/src/NServiceBus/NServiceBusVersion.cs @@ -8,6 +8,6 @@ public static class NServiceBusVersion /// /// The semver version of NServiceBus /// - public const string Version = "4.3.3"; + public const string Version = "4.3.4"; } }