Skip to content

Commit

Permalink
Improved extensibility of gateway to allow for dropping in an assembl…
Browse files Browse the repository at this point in the history
…y which will be notified of messages processed by the gateway.
  • Loading branch information
udidahan committed Jan 12, 2011
1 parent 1baf7ff commit b650d76
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/gateway/NServiceBus.Gateway/App.config
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="NumberOfWorkerThreads" value="10"/>
<add key="NumberOfWorkerThreads" value="1"/>

<add key="InputQueue" value="gateway"/>
<add key="ErrorQueue" value="error"/>
Expand Down
11 changes: 9 additions & 2 deletions src/gateway/NServiceBus.Gateway/EndpointConfig.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Configuration;
using System.Net;
using System.Threading;
using NServiceBus.Unicast.Transport;
using NServiceBus.Unicast.Transport.Msmq;

namespace NServiceBus.Gateway
Expand Down Expand Up @@ -36,9 +37,14 @@ public void Init()
NumberOfWorkerThreads = numberOfWorkerThreads
};

notifier = new MessageNotifier();

NServiceBus.Configure.Instance.Configurer.RegisterSingleton<ITransport>(transport);
NServiceBus.Configure.Instance.Configurer.RegisterSingleton<INotifyAboutMessages>(notifier);

transport.TransportMessageReceived += (s, e) =>
{
new MsmqHandler(listenUrl).Handle(e.Message);
new MsmqHandler(listenUrl, notifier).Handle(e.Message);
if (!string.IsNullOrEmpty(audit))
transport.Send(e.Message, audit);
Expand All @@ -57,7 +63,7 @@ public void Run()
{
HttpListenerContext context = listener.GetContext();
ThreadPool.QueueUserWorkItem(
o => new HttpRequestHandler(requireMD5FromClient).Handle(o as HttpListenerContext, transport, outputQueue),
o => new HttpRequestHandler(requireMD5FromClient, notifier).Handle(o as HttpListenerContext, transport, outputQueue),
context);
}
}
Expand All @@ -69,6 +75,7 @@ public void Stop()

private static HttpListener listener;
private static MsmqTransport transport;
private static MessageNotifier notifier;
private static bool requireMD5FromClient = true;
private static string outputQueue;

Expand Down
8 changes: 6 additions & 2 deletions src/gateway/NServiceBus.Gateway/HttpRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@

namespace NServiceBus.Gateway
{
public class HttpRequestHandler
internal class HttpRequestHandler
{
private const int maximumBytesToRead = 100000;
private bool requireMD5FromClient = true;
private readonly IMessageNotifier notifier;

public HttpRequestHandler(bool requireMD5)
public HttpRequestHandler(bool requireMD5, IMessageNotifier notifier)
{
requireMD5FromClient = requireMD5;
this.notifier = notifier;
}

public void Handle(HttpListenerContext ctx, MsmqTransport transport, string queue)
Expand Down Expand Up @@ -96,6 +98,8 @@ public void Handle(HttpListenerContext ctx, MsmqTransport transport, string queu
transport.Send(msg, header.Value);
else
transport.Send(msg, queue);

notifier.RaiseMessageProcessed(TransportTypeEnum.FromHttpToMsmq, msg);
}

if (hash != null)
Expand Down
34 changes: 34 additions & 0 deletions src/gateway/NServiceBus.Gateway/INotifyAboutMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using NServiceBus.Unicast.Transport;

namespace NServiceBus.Gateway
{
public interface INotifyAboutMessages
{
event EventHandler<MessageTransportArgs> MessageProcessed;
}

public class MessageTransportArgs : EventArgs
{
public TransportTypeEnum TransportType { get; set;}
public TransportMessage Message { get; set; }
}

public enum TransportTypeEnum { FromHttpToMsmq, FromMsmqToHttp }

internal interface IMessageNotifier : INotifyAboutMessages
{
void RaiseMessageProcessed(TransportTypeEnum transportType, TransportMessage message);
}

internal class MessageNotifier : IMessageNotifier
{
public event EventHandler<MessageTransportArgs> MessageProcessed;

void IMessageNotifier.RaiseMessageProcessed(TransportTypeEnum transportType, TransportMessage message)
{
if (MessageProcessed != null)
MessageProcessed(this, new MessageTransportArgs { TransportType = transportType, Message = message });
}
}
}
11 changes: 9 additions & 2 deletions src/gateway/NServiceBus.Gateway/MsmqHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

namespace NServiceBus.Gateway
{
public class MsmqHandler
internal class MsmqHandler
{
private readonly string from;
public MsmqHandler(string listenUrl)
private readonly IMessageNotifier notifier;

public MsmqHandler(string listenUrl, IMessageNotifier notifier)
{
from = listenUrl;
this.notifier = notifier;
}

public void Handle(TransportMessage msg)
{
var header = msg.Headers.Find(h => h.Key == NServiceBus.Headers.HttpTo);
Expand Down Expand Up @@ -60,7 +64,10 @@ public void Handle(TransportMessage msg)
}

if (md5 == hash)
{
Logger.Debug("Message transferred successfully.");
notifier.RaiseMessageProcessed(TransportTypeEnum.FromMsmqToHttp, msg);
}
else
{
Logger.Info(Headers.ContentMd5Key + " header received from client not the same as that sent. Message not transferred successfully. Trying again...");
Expand Down
1 change: 1 addition & 0 deletions src/gateway/NServiceBus.Gateway/NServiceBus.Gateway.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<Compile Include="Hasher.cs" />
<Compile Include="HeaderMapper.cs" />
<Compile Include="HttpRequestHandler.cs" />
<Compile Include="INotifyAboutMessages.cs" />
<Compile Include="MsmqHandler.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
Expand Down
Binary file modified src/gateway/gateway.suo
Binary file not shown.

0 comments on commit b650d76

Please sign in to comment.