From 97c562f1fe333d87ccba40d640037525b1472ef7 Mon Sep 17 00:00:00 2001 From: shlomii Date: Thu, 22 Mar 2012 17:37:04 +0200 Subject: [PATCH] Allow V3.0.1 to be compatible with V2.6 Return messages. Closes #236 --- src/core/NServiceBus/Headers.cs | 10 ++++ .../CompletionMessage.cs | 2 +- .../IncomingReturnMessageMutator.cs | 52 +++++++++++++++++++ ... => IncomingSubscriptionMessageMutator.cs} | 14 ++--- ...ceBus.Unicast.BackwardCompatibility.csproj | 5 +- ... => OutgoingSubscriptionMessageMutator.cs} | 19 ++++--- src/unicast/NServiceBus.Unicast/UnicastBus.cs | 10 ++-- .../ControlMessage.cs | 13 ++--- 8 files changed, 95 insertions(+), 30 deletions(-) create mode 100644 src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingReturnMessageMutator.cs rename src/unicast/NServiceBus.Unicast.BackwardCompatibility/{MutateTransportIncomingSubscriptionMessages.cs => IncomingSubscriptionMessageMutator.cs} (64%) rename src/unicast/NServiceBus.Unicast.BackwardCompatibility/{MutateTransportOutgoingSubscriptionMessages.cs => OutgoingSubscriptionMessageMutator.cs} (62%) diff --git a/src/core/NServiceBus/Headers.cs b/src/core/NServiceBus/Headers.cs index 4fcafb4861..1bc867969b 100644 --- a/src/core/NServiceBus/Headers.cs +++ b/src/core/NServiceBus/Headers.cs @@ -67,5 +67,15 @@ public static class Headers /// Header telling the NServiceBus Version (beginning NServiceBus V3.0.1). /// public const string NServiceBusVersion = "NServiceBus.Version"; + + /// + /// Used in a header when doing a callback (bus.return) + /// + public const string ReturnMessageErrorCodeHeader = "NServiceBus.ReturnMessage.ErrorCode"; + + /// + /// Header that tells if this transport message is a control message + /// + public const string ControlMessageHeader = "NServiceBus.ControlMessage"; } } diff --git a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/CompletionMessage.cs b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/CompletionMessage.cs index bac2fc5a42..dc602619b0 100644 --- a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/CompletionMessage.cs +++ b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/CompletionMessage.cs @@ -1,5 +1,5 @@ using System; -// Completion message is used by a V3.X subsciber with a 2.6 publisher. +// Completion message is used by a V3.X subscriber with a 2.6 publisher. // Do no change the namespace namespace namespace NServiceBus.Unicast.Transport diff --git a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingReturnMessageMutator.cs b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingReturnMessageMutator.cs new file mode 100644 index 0000000000..1f20fb484c --- /dev/null +++ b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingReturnMessageMutator.cs @@ -0,0 +1,52 @@ +using System.Globalization; +using Common.Logging; +using NServiceBus.Config; +using NServiceBus.MessageMutator; +using NServiceBus.Unicast.Transport; + +namespace NServiceBus.Unicast.BackwardCompatibility +{ + /// + /// If this is a V26 message, extract completion message return error code and place it in the transport headers + /// + public class IncomingReturnMessageMutator : IMutateIncomingMessages, INeedInitialization + { + /// + /// Reference to the BUS to get a hold of the current TransportMessage + /// + public IBus Bus { get; set; } + + /// + /// If this is a completion message from a 2.6 sender, copy the error code. + /// + /// Message to copy ErrorCode from. + /// Same message as received. + public object MutateIncoming(object message) + { + var completionMessage = message as CompletionMessage; + if (completionMessage == null) + return message; + + if(!Bus.CurrentMessageContext.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader)) + Bus.CurrentMessageContext.Headers.Add(Headers.ReturnMessageErrorCodeHeader, + completionMessage.ErrorCode.ToString(CultureInfo.InvariantCulture)); + + //Change to Transport to be a Control Message so no need to find a handler for that. + if(!Bus.CurrentMessageContext.Headers.ContainsKey(Headers.ControlMessageHeader)) + Bus.CurrentMessageContext.Headers.Add(Headers.ControlMessageHeader, true.ToString(CultureInfo.InvariantCulture)); + + return message; + } + + /// + /// Register the IncomingReturnMessageMutator + /// + public void Init() + { + Configure.Instance.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerCall); + Log.Debug("Configured IncomingReturnMessageMutator"); + } + + private readonly static ILog Log = LogManager.GetLogger(typeof(IncomingSubscriptionMessageMutator)); + } +} diff --git a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportIncomingSubscriptionMessages.cs b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingSubscriptionMessageMutator.cs similarity index 64% rename from src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportIncomingSubscriptionMessages.cs rename to src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingSubscriptionMessageMutator.cs index 2edf1a38c6..ee2bd7d4e2 100644 --- a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportIncomingSubscriptionMessages.cs +++ b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingSubscriptionMessageMutator.cs @@ -5,10 +5,12 @@ namespace NServiceBus.Unicast.BackwardCompatibility { - class MutateTransportIncomingSubscriptionMessages : IMutateIncomingTransportMessages, INeedInitialization + class IncomingSubscriptionMessageMutator : IMutateIncomingTransportMessages, INeedInitialization { /// - /// Re-Adjust V3.0.0 subscribe & UnSubscribe messages. Version 3.0.0 Subs/Unsubs/Publish no NServiceBus.Version set it the headers. + /// Re-Adjust V3.0.0 subscribe and unsubscribe messages. + /// Version 3.0.0 subscribe and unsubscribe message have no NServiceBus.Version set it the headers. + /// Version 3.0.0 Send message have it with "3.0.0" set as value. /// Do nothing If it is a V2.6 message (contains EnclosedMessageTypes key). /// /// @@ -24,14 +26,14 @@ public void MutateIncoming(TransportMessage transportMessage) } /// - /// Register the MutateTransportIncomingSubscriptionMessages mutator + /// Register the IncomingSubscriptionMessageMutator mutator /// public void Init() { - Configure.Instance.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerCall); - Log.Debug("Configured Transport Incoming Message Mutator: MutateTransportIncomingSubscriptionMessages"); + Configure.Instance.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerCall); + Log.Debug("Configured Transport Incoming Message Mutator: IncomingSubscriptionMessageMutator"); } - private readonly static ILog Log = LogManager.GetLogger(typeof(MutateTransportIncomingSubscriptionMessages)); + private readonly static ILog Log = LogManager.GetLogger(typeof(IncomingSubscriptionMessageMutator)); } } diff --git a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/NServiceBus.Unicast.BackwardCompatibility.csproj b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/NServiceBus.Unicast.BackwardCompatibility.csproj index a8d7f57e17..694804d3e0 100644 --- a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/NServiceBus.Unicast.BackwardCompatibility.csproj +++ b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/NServiceBus.Unicast.BackwardCompatibility.csproj @@ -57,8 +57,9 @@ - - + + + diff --git a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportOutgoingSubscriptionMessages.cs b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/OutgoingSubscriptionMessageMutator.cs similarity index 62% rename from src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportOutgoingSubscriptionMessages.cs rename to src/unicast/NServiceBus.Unicast.BackwardCompatibility/OutgoingSubscriptionMessageMutator.cs index 68a148002f..504fa5ac26 100644 --- a/src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportOutgoingSubscriptionMessages.cs +++ b/src/unicast/NServiceBus.Unicast.BackwardCompatibility/OutgoingSubscriptionMessageMutator.cs @@ -1,5 +1,4 @@ using System.IO; -using System.Threading; using Common.Logging; using NServiceBus.Config; using NServiceBus.MessageMutator; @@ -11,7 +10,7 @@ namespace NServiceBus.Unicast.BackwardCompatibility /// /// Allow for a V3.X subscriber to subscribe/unsubscribe to a V2.6 publisher /// - public class MutateTransportOutgoingSubscriptionMessages : IMutateOutgoingTransportMessages, INeedInitialization + public class OutgoingSubscriptionMessageMutator : IMutateOutgoingTransportMessages, INeedInitialization { /// /// Allow for a V3.X subscriber to subscribe/unsubscribe to a V2.6 publisher @@ -23,26 +22,32 @@ public class MutateTransportOutgoingSubscriptionMessages : IMutateOutgoingTransp public void MutateOutgoing(object[] messages, TransportMessage transportMessage) { if ((transportMessage.IsControlMessage() && - ((transportMessage.MessageIntent == MessageIntentEnum.Subscribe) || (transportMessage.MessageIntent == MessageIntentEnum.Unsubscribe)))) + ((transportMessage.MessageIntent == MessageIntentEnum.Subscribe) || + (transportMessage.MessageIntent == MessageIntentEnum.Unsubscribe) || + (transportMessage.MessageIntent == MessageIntentEnum.Send)))) { var stream = new MemoryStream(); - MessageSerializer.Serialize(new object[] { new CompletionMessage() }, stream); + var completionMessage = new CompletionMessage(); + if (transportMessage.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader)) + completionMessage.ErrorCode = int.Parse(transportMessage.Headers[Headers.ReturnMessageErrorCodeHeader]); + + MessageSerializer.Serialize(new object[] { completionMessage }, stream); transportMessage.Body = stream.ToArray(); Log.Debug("Added Completion message and sending message intent: " + transportMessage.MessageIntent); } } /// - /// Register the MutateTransportOutgoingSubscriptionMessages mutator + /// Register the OutgoingSubscriptionMessageMutator mutator /// public void Init() { - Configure.Instance.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerCall); + Configure.Instance.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerCall); } /// /// Gets or sets the message serializer /// public IMessageSerializer MessageSerializer { get; set; } - private readonly static ILog Log = LogManager.GetLogger(typeof(MutateTransportOutgoingSubscriptionMessages)); + private readonly static ILog Log = LogManager.GetLogger(typeof(OutgoingSubscriptionMessageMutator)); } } diff --git a/src/unicast/NServiceBus.Unicast/UnicastBus.cs b/src/unicast/NServiceBus.Unicast/UnicastBus.cs index 3093d9a80b..8b1775fac8 100644 --- a/src/unicast/NServiceBus.Unicast/UnicastBus.cs +++ b/src/unicast/NServiceBus.Unicast/UnicastBus.cs @@ -32,8 +32,6 @@ public class UnicastBus : IUnicastBus, IStartableBus /// public const string SubscriptionMessageType = "SubscriptionMessageType"; - private const string ReturnMessageErrorCodeHeader = "NServiceBus.ReturnMessage.ErrorCode"; - #region config properties private bool autoSubscribe = true; @@ -480,10 +478,12 @@ void IBus.Return(T errorCode) { var returnMessage = ControlMessage.Create(); - returnMessage.Headers[ReturnMessageErrorCodeHeader] = errorCode.GetHashCode().ToString(); + returnMessage.Headers[Headers.ReturnMessageErrorCodeHeader] = errorCode.GetHashCode().ToString(); returnMessage.CorrelationId = _messageBeingHandled.IdForCorrelation; returnMessage.MessageIntent = MessageIntentEnum.Send; + InvokeOutgoingTransportMessagesMutators(new object[] { }, returnMessage); + MessageSender.Send(returnMessage, _messageBeingHandled.ReplyToAddress); } @@ -1111,8 +1111,8 @@ void HandleCorellatedMessage(TransportMessage msg, object[] messages) var statusCode = int.MinValue; - if (msg.IsControlMessage() && msg.Headers.ContainsKey(ReturnMessageErrorCodeHeader)) - statusCode = int.Parse(msg.Headers[ReturnMessageErrorCodeHeader]); + if (msg.IsControlMessage() && msg.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader)) + statusCode = int.Parse(msg.Headers[Headers.ReturnMessageErrorCodeHeader]); busAsyncResult.Complete(statusCode, messages); } diff --git a/src/unicastTransport/NServiceBus.Unicast.Transport/ControlMessage.cs b/src/unicastTransport/NServiceBus.Unicast.Transport/ControlMessage.cs index 0a1a360c5f..67e3b4bc31 100644 --- a/src/unicastTransport/NServiceBus.Unicast.Transport/ControlMessage.cs +++ b/src/unicastTransport/NServiceBus.Unicast.Transport/ControlMessage.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; /// - /// Helper for creating controll messages + /// Helper for creating control messages /// public static class ControlMessage { @@ -20,15 +20,10 @@ public static TransportMessage Create() Recoverable = true, MessageIntent = MessageIntentEnum.Send }; - transportMessage.Headers.Add(ControlMessageHeader, true.ToString()); + transportMessage.Headers.Add(Headers.ControlMessageHeader, true.ToString()); return transportMessage; } - - /// - /// Header which tells that this transportmessage is a controll message - /// - public static string ControlMessageHeader = "NServiceBus.ControlMessage"; } /// @@ -37,14 +32,14 @@ public static TransportMessage Create() public static class TransportMessageExtensions { /// - /// True if the transportmessage is a control message + /// True if the transport message is a control message /// /// /// public static bool IsControlMessage(this TransportMessage transportMessage) { return transportMessage.Headers != null && - transportMessage.Headers.ContainsKey(ControlMessage.ControlMessageHeader); + transportMessage.Headers.ContainsKey(Headers.ControlMessageHeader); } }