Skip to content

Commit

Permalink
Merge branch 'hotfix-4.3.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
indualagarsamy committed Feb 2, 2014
2 parents f791acb + 6644c2c commit fcf57f3
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Queuing;
using Transport;
using Transports;

/// <summary>
/// Implements message driven subscriptions for transports that doesn't have native support for it (MSMQ , SqlServer, Azure Queues etc)
/// </summary>
Expand All @@ -23,6 +23,7 @@ public class MessageDrivenSubscriptionManager : IManageSubscriptions,
public IBuilder Builder { get; set; }
public ISubscriptionStorage SubscriptionStorage { get; set; }
public IAuthorizeSubscriptions SubscriptionAuthorizer { get { return subscriptionAuthorizer ?? (subscriptionAuthorizer = new NoopSubscriptionAuthorizer()); } set { subscriptionAuthorizer = value; } }
public Address DistributorDataAddress { get; set; }

public void Subscribe(Type eventType, Address publisherAddress)
{
Expand All @@ -33,6 +34,10 @@ public void Subscribe(Type eventType, Address publisherAddress)

var subscriptionMessage = CreateControlMessage(eventType);
subscriptionMessage.MessageIntent = MessageIntentEnum.Subscribe;
if (DistributorDataAddress != null)
{
subscriptionMessage.ReplyToAddress = DistributorDataAddress;
}

ThreadPool.QueueUserWorkItem(state =>
SendSubscribeMessageWithRetries(publisherAddress, subscriptionMessage, eventType.AssemblyQualifiedName));
Expand All @@ -49,6 +54,11 @@ public void Unsubscribe(Type eventType, Address publisherAddress)
var subscriptionMessage = CreateControlMessage(eventType);
subscriptionMessage.MessageIntent = MessageIntentEnum.Unsubscribe;

if (DistributorDataAddress != null)
{
subscriptionMessage.ReplyToAddress = DistributorDataAddress;
}

MessageSender.Send(subscriptionMessage, publisherAddress);
}

Expand Down Expand Up @@ -144,7 +154,6 @@ void SendSubscribeMessageWithRetries(Address destination, TransportMessage subsc
{
try
{

MessageSender.Send(subscriptionMessage, destination);
}
catch (QueueNotFoundException ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ public class MessageDrivenSubscriptions : Feature
{
public override void Initialize()
{
Configure.Component<MessageDrivenSubscriptionManager>(DependencyLifecycle.SingleInstance);
Configure.Component<FilteringMutator>(DependencyLifecycle.InstancePerCall);
Configure.Component<SubscriptionPredicatesEvaluator>(DependencyLifecycle.SingleInstance);

var masterNodeAddress = Configure.Instance.GetMasterNodeAddress();
Configure.Component<MessageDrivenSubscriptionManager>(
DependencyLifecycle.SingleInstance)
.ConfigureProperty(r => r.DistributorDataAddress, masterNodeAddress);

}
}
}
2 changes: 1 addition & 1 deletion src/NServiceBus/NServiceBusVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ public static class NServiceBusVersion
/// <summary>
/// The semver version of NServiceBus
/// </summary>
public const string Version = "4.3.3";
public const string Version = "4.3.4";
}
}

0 comments on commit fcf57f3

Please sign in to comment.