diff --git a/src/NServiceBus.AcceptanceTests/PubSub/When_publishing_an_event_using_a_broker_transport_with_centralized_routing.cs b/src/NServiceBus.AcceptanceTests/PubSub/When_publishing_an_event_using_a_broker_transport_with_centralized_routing.cs index 1924860173..22bcdf8e11 100644 --- a/src/NServiceBus.AcceptanceTests/PubSub/When_publishing_an_event_using_a_broker_transport_with_centralized_routing.cs +++ b/src/NServiceBus.AcceptanceTests/PubSub/When_publishing_an_event_using_a_broker_transport_with_centralized_routing.cs @@ -8,16 +8,20 @@ public class When_publishing_an_event_using_a_broker_transport_with_centralized_routing : NServiceBusAcceptanceTest { - [Test, Ignore("Not reliable!")] + [Test, Ignore] // Ignore because, test this test is unreliable. Passed on the build server without the core fix! public void Should_be_delivered_to_allsubscribers_without_the_need_for_config() { Scenario.Define() - .WithEndpoint(b => b.When(c => c.EndpointsStarted, (bus, context) => - { - bus.Publish(new MyEvent()); - })) - .WithEndpoint() - .WithEndpoint() + .WithEndpoint + (b => b.When(c => c.IsSubscriptionProcessedForSub1 && c.IsSubscriptionProcessedForSub2, bus => bus.Publish(new MyEvent()))) + .WithEndpoint(b => b.Given((bus, context) => + { + context.IsSubscriptionProcessedForSub1 = true; + })) + .WithEndpoint(b => b.Given((bus, context) => + { + context.IsSubscriptionProcessedForSub2 = true; + })) .Done(c => c.Subscriber1GotTheEvent && c.Subscriber2GotTheEvent) .Repeat(r => r.For()) .Should(c => @@ -32,8 +36,10 @@ public void Should_be_delivered_to_allsubscribers_without_the_need_for_config() public class Context : ScenarioContext { public bool Subscriber1GotTheEvent { get; set; } - public bool Subscriber2GotTheEvent { get; set; } + + public bool IsSubscriptionProcessedForSub1 { get; set; } + public bool IsSubscriptionProcessedForSub2 { get; set; } } public class CentralizedPublisher : EndpointConfigurationBuilder diff --git a/src/NServiceBus.AcceptanceTests/Sagas/When_a_saga_is_started_by_an_event_published_by_another_saga.cs b/src/NServiceBus.AcceptanceTests/Sagas/When_a_saga_is_started_by_an_event_published_by_another_saga.cs index 6eb27d46c8..c3f1ec8b62 100644 --- a/src/NServiceBus.AcceptanceTests/Sagas/When_a_saga_is_started_by_an_event_published_by_another_saga.cs +++ b/src/NServiceBus.AcceptanceTests/Sagas/When_a_saga_is_started_by_an_event_published_by_another_saga.cs @@ -46,6 +46,39 @@ public class Context : ScenarioContext public bool IsEventSubscriptionReceived { get; set; } } + + [Test] + public void Should_start_the_saga_when_set_up_to_start_for_the_base_event() + { + Scenario.Define() + .WithEndpoint(b => + b.Given( + (bus, context) => + Subscriptions.OnEndpointSubscribed(s => + { + if (s.SubscriberReturnAddress.Queue.Contains("SagaThatIsStartedByABaseEvent")) + { + context.IsEventSubscriptionReceived = true; + } + })) + .When(c => c.IsEventSubscriptionReceived, + bus => + bus.Publish(m=> { m.DataId = Guid.NewGuid(); })) + ) + .WithEndpoint( + b => b.Given((bus, context) => bus.Subscribe())) + .Done(c => c.DidSagaComplete) + .Repeat(r => r.For(Transports.Default)) + .Should(c => Assert.True(c.DidSagaComplete)) + .Run(); + } + + public class SagaContext : ScenarioContext + { + public bool IsEventSubscriptionReceived { get; set; } + public bool DidSagaComplete { get; set; } + } + public class SagaThatPublishesAnEvent : EndpointConfigurationBuilder { public SagaThatPublishesAnEvent() @@ -125,13 +158,45 @@ public class Saga2Timeout } } + public class SagaThatIsStartedByABaseEvent : EndpointConfigurationBuilder + { + public SagaThatIsStartedByABaseEvent() + { + EndpointSetup(c => Configure.Features.Disable()) + .AddMapping(typeof(SagaThatPublishesAnEvent)); + } + + public class SagaStartedByBaseEvent : Saga, IAmStartedByMessages + { + public SagaContext Context { get; set; } + + public void Handle(BaseEvent message) + { + Data.DataId = message.DataId; + MarkAsComplete(); + Context.DidSagaComplete = true; + } + + public class SagaData : ContainSagaData + { + [Unique] + public virtual Guid DataId { get; set; } + } + } + } + [Serializable] public class StartSaga : ICommand { public Guid DataId { get; set; } } - public interface SomethingHappenedEvent : IEvent + public interface SomethingHappenedEvent : BaseEvent + { + + } + + public interface BaseEvent : IEvent { Guid DataId { get; set; } } diff --git a/src/NServiceBus.Core.Tests/Fakes/FakeCentralizedPubSubTransportDefinition.cs b/src/NServiceBus.Core.Tests/Fakes/FakeCentralizedPubSubTransportDefinition.cs new file mode 100644 index 0000000000..94d2a60ae1 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Fakes/FakeCentralizedPubSubTransportDefinition.cs @@ -0,0 +1,13 @@ +namespace NServiceBus.Core.Tests.Fakes +{ + using Transports; + + public class FakeCentralizedPubSubTransportDefinition : TransportDefinition + { + public FakeCentralizedPubSubTransportDefinition() + { + HasNativePubSubSupport = true; + HasSupportForCentralizedPubSub = true; + } + } +} diff --git a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj index 8a03d34d9f..7afc366799 100644 --- a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj +++ b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj @@ -132,6 +132,7 @@ + diff --git a/src/NServiceBus.Core.Tests/Unicast/Contexts/using_the_unicastbus.cs b/src/NServiceBus.Core.Tests/Unicast/Contexts/using_the_unicastbus.cs index 363923a35c..e616cc88d0 100644 --- a/src/NServiceBus.Core.Tests/Unicast/Contexts/using_the_unicastbus.cs +++ b/src/NServiceBus.Core.Tests/Unicast/Contexts/using_the_unicastbus.cs @@ -53,6 +53,7 @@ public class using_a_configured_unicastBus protected StaticMessageRouter router; protected MessageHandlerRegistry handlerRegistry; + protected TransportDefinition transportDefinition; PipelineFactory pipelineFactory; @@ -60,7 +61,7 @@ public class using_a_configured_unicastBus public void SetUp() { - + transportDefinition = new Msmq(); LicenseManager.Verify(); HandlerInvocationCache.Clear(); @@ -134,6 +135,8 @@ public void SetUp() FuncBuilder.Register(() => new CreatePhysicalMessageBehavior()); FuncBuilder.Register(() => pipelineFactory); + FuncBuilder.Register(() => transportDefinition); + var messagePublisher = new StorageDrivenPublisher { diff --git a/src/NServiceBus.Core.Tests/Unicast/Subscriptions.cs b/src/NServiceBus.Core.Tests/Unicast/Subscriptions.cs index 7572348f20..016867f1f7 100644 --- a/src/NServiceBus.Core.Tests/Unicast/Subscriptions.cs +++ b/src/NServiceBus.Core.Tests/Unicast/Subscriptions.cs @@ -2,8 +2,9 @@ namespace NServiceBus.Unicast.Tests { using System; using Contexts; + using Core.Tests.Fakes; using NUnit.Framework; - + [TestFixture] public class When_subscribing_to_messages : using_the_unicastBus { @@ -16,6 +17,7 @@ public class When_subscribing_to_messages : using_the_unicastBus { router.RegisterMessageRoute(typeof(TestMessage), addressToOwnerOfTestMessage); } + [Test] public void Should_send_the_assemblyQualified_name_as_subscription_type() { @@ -38,24 +40,43 @@ public void Should_set_the_message_intent_to_subscribe() } [TestFixture] - public class When_subscribing_to_a_message_that_has_no_configured_address : using_the_unicastBus + public class When_using_a_non_centralized_pub_sub_transport : using_the_unicastBus { [Test] - public void Should_throw() + public void Should_throw_when_subscribing_to_a_message_that_has_no_configured_address() { Assert.Throws(() => bus.Subscribe()); } + + [Test] + public void Should_throw_when_unsubscribing_to_a_message_that_has_no_configured_address() + { + Assert.Throws(() => bus.Unsubscribe()); + } } [TestFixture] - public class When_unsubscribing_to_a_message_that_has_no_configured_address : using_the_unicastBus + public class When_using_a_centralized_pub_sub_transport : using_the_unicastBus { + [SetUp] + public new void SetUp() + { + transportDefinition = new FakeCentralizedPubSubTransportDefinition(); + } + [Test] - public void Should_throw() + public void Should_not_throw_when_subscribing_to_a_message_that_has_no_configured_address() { - Assert.Throws(() => bus.Unsubscribe()); + Assert.DoesNotThrow(() => bus.Subscribe()); + } + + [Test] + public void Should_not_throw_When_unsubscribing_to_a_message_that_has_no_configured_address() + { + Assert.DoesNotThrow(() => bus.Unsubscribe()); } } + [TestFixture] public class When_subscribing_to_command_messages : using_the_unicastBus { diff --git a/src/NServiceBus.Core/ConfigureQueueCreation.cs b/src/NServiceBus.Core/ConfigureQueueCreation.cs index cbf7dcfc67..a7c2611bf7 100644 --- a/src/NServiceBus.Core/ConfigureQueueCreation.cs +++ b/src/NServiceBus.Core/ConfigureQueueCreation.cs @@ -1,5 +1,7 @@ namespace NServiceBus { + using System.ComponentModel; + /// /// Contains extension methods to NServiceBus.Configure. /// @@ -15,6 +17,10 @@ public static Configure DoNotCreateQueues(this Configure config) return config; } - internal static bool DontCreateQueues { get; private set; } + /// + /// Gets whether or not queues should be created + /// + [EditorBrowsable(EditorBrowsableState.Advanced)] + public static bool DontCreateQueues { get; private set; } } } diff --git a/src/NServiceBus.Core/Licensing/LicenseExpiredForm.cs b/src/NServiceBus.Core/Licensing/LicenseExpiredForm.cs index 9c772fa542..5ba945b9a5 100644 --- a/src/NServiceBus.Core/Licensing/LicenseExpiredForm.cs +++ b/src/NServiceBus.Core/Licensing/LicenseExpiredForm.cs @@ -12,7 +12,13 @@ partial class LicenseExpiredForm : Form static ILog Logger = LogManager.GetLogger(typeof(LicenseExpiredForm)); public LicenseExpiredForm() { - InitializeComponent(); + InitializeComponent(); + } + + protected override void OnLoad(EventArgs e) + { + base.OnLoad(e); + Visible = true; } void browseButton_Click(object sender, EventArgs e) diff --git a/src/NServiceBus.Core/Licensing/LicenseExpiredForm.resx b/src/NServiceBus.Core/Licensing/LicenseExpiredForm.resx index 33632b4bd0..b278a1e02e 100644 --- a/src/NServiceBus.Core/Licensing/LicenseExpiredForm.resx +++ b/src/NServiceBus.Core/Licensing/LicenseExpiredForm.resx @@ -118,11 +118,11 @@ System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 - Your license has enxpired and a new license is needed to continue + Your license has expired and a new license is needed to continue. -Please purchase a license online or browse for a license file +Please purchase a license online or browse for a license file. -If you close this dialog NServiceBus will fall back to running in basic mode +If you close this dialog NServiceBus will fall back to running in basic mode. diff --git a/src/NServiceBus.Core/NServiceBus.Core.csproj b/src/NServiceBus.Core/NServiceBus.Core.csproj index bea72dede0..f0df10729c 100644 --- a/src/NServiceBus.Core/NServiceBus.Core.csproj +++ b/src/NServiceBus.Core/NServiceBus.Core.csproj @@ -149,6 +149,7 @@ + diff --git a/src/NServiceBus.Core/Sagas/Sagas.cs b/src/NServiceBus.Core/Sagas/Sagas.cs index 612f9a3a93..342121bd04 100644 --- a/src/NServiceBus.Core/Sagas/Sagas.cs +++ b/src/NServiceBus.Core/Sagas/Sagas.cs @@ -122,13 +122,17 @@ public static bool ShouldMessageStartSaga(Type sagaType, Type messageType) if (messageTypes == null) return false; - return messageTypes.Contains(messageType); + if (messageTypes.Contains(messageType)) + return true; + + return messageTypes.Any(msgTypeHandleBySaga => msgTypeHandleBySaga.IsAssignableFrom(messageType)); } /// /// Gets the saga type to instantiate and invoke if an existing saga couldn't be found by /// the given finder using the given message. /// + [ObsoleteEx(RemoveInVersion = "5.0", TreatAsErrorFromVersion = "4.4")] public static Type GetSagaTypeToStartIfMessageNotFoundByFinder(object message, IFinder finder) { Type sagaEntityType; diff --git a/src/NServiceBus.Core/Transports/ConfigureTransport.cs b/src/NServiceBus.Core/Transports/ConfigureTransport.cs new file mode 100644 index 0000000000..db764e2264 --- /dev/null +++ b/src/NServiceBus.Core/Transports/ConfigureTransport.cs @@ -0,0 +1,53 @@ +namespace NServiceBus.Transports +{ + using System; + using Features; + using Settings; + using Unicast.Transport; + + public abstract class ConfigureTransport : Feature, IConfigureTransport where T : TransportDefinition + { + public void Configure(Configure config) + { + var connectionString = TransportConnectionString.GetConnectionStringOrNull(); + + if (connectionString == null && RequiresConnectionString) + { + throw new InvalidOperationException(String.Format(Message, GetConfigFileIfExists(), typeof(T).Name, ExampleConnectionStringForErrorMessage)); + } + + SettingsHolder.Set("NServiceBus.Transport.ConnectionString", connectionString); + + var selectedTransportDefinition = Activator.CreateInstance(); + SettingsHolder.Set("NServiceBus.Transport.SelectedTransport", selectedTransportDefinition); + config.Configurer.RegisterSingleton(selectedTransportDefinition); + InternalConfigure(config); + } + + protected abstract void InternalConfigure(Configure config); + + protected abstract string ExampleConnectionStringForErrorMessage { get; } + + protected virtual bool RequiresConnectionString + { + get { return true; } + } + + + static string GetConfigFileIfExists() + { + return AppDomain.CurrentDomain.SetupInformation.ConfigurationFile ?? "App.config"; + } + + const string Message = + @"No default connection string found in your config file ({0}) for the {1} Transport. + +To run NServiceBus with {1} Transport you need to specify the database connectionstring. +Here is an example of what is required: + + + + "; + + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Transports/IConfigureTransport.cs b/src/NServiceBus.Core/Transports/IConfigureTransport.cs index a60d5d5a5b..b0f3542003 100644 --- a/src/NServiceBus.Core/Transports/IConfigureTransport.cs +++ b/src/NServiceBus.Core/Transports/IConfigureTransport.cs @@ -1,10 +1,5 @@ namespace NServiceBus.Transports { - using System; - using Features; - using Settings; - using Unicast.Transport; - /// /// Configures the given transport using the default settings /// @@ -18,48 +13,4 @@ public interface IConfigureTransport /// The generic counterpart to IConfigureTransports /// public interface IConfigureTransport : IConfigureTransport where T : TransportDefinition { } - - public abstract class ConfigureTransport : Feature, IConfigureTransport where T : TransportDefinition - { - public void Configure(Configure config) - { - var connectionString = TransportConnectionString.GetConnectionStringOrNull(); - - if (connectionString == null && RequiresConnectionString) - { - throw new InvalidOperationException(String.Format(Message, GetConfigFileIfExists(), typeof(T).Name, ExampleConnectionStringForErrorMessage)); - } - - SettingsHolder.Set("NServiceBus.Transport.ConnectionString", connectionString); - SettingsHolder.Set("NServiceBus.Transport.SelectedTransport", Activator.CreateInstance()); - - InternalConfigure(config); - } - - protected abstract void InternalConfigure(Configure config); - - protected abstract string ExampleConnectionStringForErrorMessage { get; } - - protected virtual bool RequiresConnectionString - { - get { return true; } - } - - - static string GetConfigFileIfExists() - { - return AppDomain.CurrentDomain.SetupInformation.ConfigurationFile ?? "App.config"; - } - - const string Message = - @"No default connection string found in your config file ({0}) for the {1} Transport. - -To run NServiceBus with {1} Transport you need to specify the database connectionstring. -Here is an example of what is required: - - - - "; - - } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Unicast/UnicastBus.cs b/src/NServiceBus.Core/Unicast/UnicastBus.cs index bf584b6acc..0f076901f7 100644 --- a/src/NServiceBus.Core/Unicast/UnicastBus.cs +++ b/src/NServiceBus.Core/Unicast/UnicastBus.cs @@ -18,6 +18,7 @@ namespace NServiceBus.Unicast using Routing; using Satellites; using Serialization; + using Settings; using Subscriptions; using Subscriptions.MessageDrivenSubscriptions.SubcriberSideFiltering; using Support; @@ -368,6 +369,13 @@ public virtual void Subscribe(Type messageType, Predicate condition) throw new InvalidOperationException("No subscription manager is available"); } + if (TransportDefinition.HasSupportForCentralizedPubSub) + { + // We are dealing with a brokered transport wired for auto pub/sub. + SubscriptionManager.Subscribe(messageType, null); + return; + } + var addresses = GetAddressForMessageType(messageType); if (addresses.Count == 0) { @@ -418,6 +426,13 @@ public virtual void Unsubscribe(Type messageType) throw new InvalidOperationException("No subscription manager is available"); } + if (TransportDefinition.HasSupportForCentralizedPubSub) + { + // We are dealing with a brokered transport wired for auto pub/sub. + SubscriptionManager.Unsubscribe(messageType, null); + return; + } + var addresses = GetAddressForMessageType(messageType); if (addresses.Count == 0) { @@ -1153,5 +1168,15 @@ LogicalMessageFactory LogicalMessageFactory return Builder.Build(); } } + + TransportDefinition TransportDefinition + { + get + { + return Builder.Build(); + } + } + + } } diff --git a/src/NServiceBus/NServiceBusVersion.cs b/src/NServiceBus/NServiceBusVersion.cs index f7403394e4..fe91fdcdf6 100644 --- a/src/NServiceBus/NServiceBusVersion.cs +++ b/src/NServiceBus/NServiceBusVersion.cs @@ -8,6 +8,6 @@ public static class NServiceBusVersion /// /// The semver version of NServiceBus /// - public const string Version = "4.3.0"; + public const string Version = "4.3.1"; } }