Skip to content

Commit

Permalink
Merge pull request #1927 from Particular/fix-1925
Browse files Browse the repository at this point in the history
Relates to #1925
  • Loading branch information
SimonCropp authored and indualagarsamy committed Jan 31, 2014
1 parent f791acb commit 3bedcdf
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Queuing;
using Transport;
using Transports;

/// <summary>
/// Implements message driven subscriptions for transports that doesn't have native support for it (MSMQ , SqlServer, Azure Queues etc)
/// </summary>
Expand All @@ -23,6 +23,7 @@ public class MessageDrivenSubscriptionManager : IManageSubscriptions,
public IBuilder Builder { get; set; }
public ISubscriptionStorage SubscriptionStorage { get; set; }
public IAuthorizeSubscriptions SubscriptionAuthorizer { get { return subscriptionAuthorizer ?? (subscriptionAuthorizer = new NoopSubscriptionAuthorizer()); } set { subscriptionAuthorizer = value; } }
public Address DistributorDataAddress { get; set; }

public void Subscribe(Type eventType, Address publisherAddress)
{
Expand All @@ -33,6 +34,10 @@ public void Subscribe(Type eventType, Address publisherAddress)

var subscriptionMessage = CreateControlMessage(eventType);
subscriptionMessage.MessageIntent = MessageIntentEnum.Subscribe;
if (DistributorDataAddress != null)
{
subscriptionMessage.ReplyToAddress = DistributorDataAddress;
}

ThreadPool.QueueUserWorkItem(state =>
SendSubscribeMessageWithRetries(publisherAddress, subscriptionMessage, eventType.AssemblyQualifiedName));
Expand All @@ -49,6 +54,11 @@ public void Unsubscribe(Type eventType, Address publisherAddress)
var subscriptionMessage = CreateControlMessage(eventType);
subscriptionMessage.MessageIntent = MessageIntentEnum.Unsubscribe;

if (DistributorDataAddress != null)
{
subscriptionMessage.ReplyToAddress = DistributorDataAddress;
}

MessageSender.Send(subscriptionMessage, publisherAddress);
}

Expand Down Expand Up @@ -144,7 +154,6 @@ void SendSubscribeMessageWithRetries(Address destination, TransportMessage subsc
{
try
{

MessageSender.Send(subscriptionMessage, destination);
}
catch (QueueNotFoundException ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ public class MessageDrivenSubscriptions : Feature
{
public override void Initialize()
{
Configure.Component<MessageDrivenSubscriptionManager>(DependencyLifecycle.SingleInstance);
Configure.Component<FilteringMutator>(DependencyLifecycle.InstancePerCall);
Configure.Component<SubscriptionPredicatesEvaluator>(DependencyLifecycle.SingleInstance);

var masterNodeAddress = Configure.Instance.GetMasterNodeAddress();
Configure.Component<MessageDrivenSubscriptionManager>(
DependencyLifecycle.SingleInstance)
.ConfigureProperty(r => r.DistributorDataAddress, masterNodeAddress);

}
}
}

0 comments on commit 3bedcdf

Please sign in to comment.