Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Messaging abstractions for building testable applications.

License

Notifications You must be signed in to change notification settings

EnableSoftware/Enable.Extensions.Messaging

Repository files navigation

Enable.Extensions.Messaging

Build status

Messaging abstractions for building testable applications.

Message buses provide asynchronous message communication. They allow applications to easily scale out workloads to multiple processors, improve the resiliency of applications to sudden peaks in workloads and naturally lead to loosely coupled components or applications.

Enable.Extensions.Messaging currently provides three messaging implementations:

In addition to these packages, an additional Enable.Extensions.Messaging.Abstractions package is available. This contains the basic abstractions that the implementations listed above build upon. Use Enable.Extensions.Messaging.Abstractions to implement your own messaging provider.

Package name NuGet version
Enable.Extensions.Messaging.Abstractions NuGet
Enable.Extensions.Messaging.InMemory NuGet
Enable.Extensions.Messaging.RabbitMQ NuGet
Enable.Extensions.Messaging.AzureServiceBus NuGet

Queues vs. message buses

There are two types of messages, and therefore two types of messaging queues, that we may want to leverage, depending on the intent of the publisher of a message. The first type of messages are commands or jobs that must be processed. The second are events that notify of something that has already happened. In the former case, we would need an actor to action the command, and we would want a single actor to do this, potentially reporting back the outcome of the processing. In the latter case, we are simply notifying any listeners of something that has already happened, and we don't have any expectation of how that event is to be handled.

Enable.Extensions.Messaging provides the second of these types of messaging queues. Use any of the queue implementations provided to queue up work items, and have one out of any number of consumers process each individual message.

Examples

The following example demonstrates publishing a message and then retrieving the same message. This uses a single application to both send and receive messages. In production systems you'll more likely have (multiple) different components publishing and consuming messages.

Here we're using the RabbitMQ queue provider. How we work with messages is the same across any of the available queue implementations. The only differences are in the options that are passed in when constructing the queue factory.

using System;
using System.Threading.Tasks;
using Enable.Extensions.Messaging.Abstractions;
using Enable.Extensions.Messaging.RabbitMQ;

namespace MessagingSamples
{
    public class Program
    {
        public static void Main() => MainAsync().GetAwaiter().GetResult();

        public static async Task MainAsync()
        {
            var options = new RabbitMQQueueClientFactoryOptions
            {
                HostName = "localhost",
                Port = 5672,
                VirtualHost = "/"
                UserName = "guest",
                Password = "guest",
            };

            // Using new RabbitMQQuorumQueueMessagingClientFactory(options) can be used to
            // create a quorum queue if the queue does not already exist. 
            var queueFactory = new RabbitMQMessagingClientFactory(options);

            // A topic is a essentially a queue that we want to publish
            // messages to.
            var topicName = "your-topic-name";

            // We start by getting a publisher to the topic that you want to
            // work with. The queue will be automatically created if it doesn't
            // exist.
            using (var publisher = queueFactory.GetMessagePublisher(topicName))
            {
                // Add a new item to the queue.
                // Here we're using a `string`, but any type of message can
                // be used as long as it can be serialised to a byte array.
                await publisher.EnqueueAsync("Hello, World!");
            }

            // Publishing messages to a topic and subscribing to messages on
            // that topic are spearated out into different interfaces. We
            // therefore now need to a subscriber to our topic. Each unique
            // subscription gets a unique name. A message published to a topic
            // will be delievered once to each unique subscription.
            var subscriptionName = "your-subscription-name";
            
            using (var subscriber = queueFactory.GetMessageSubscriber(topicName, subscriptionName))
            {
                // Retrieve our message from the queue.
                // We get this back as a type of `IMessage` (note that
                // we could get `null` back here, if there are no messages
                // in the queue).
                var message = await subscriber.DequeueAsync();

                // The contents of the message that we sent are available
                // from `IMessage.Body`. This property is our original
                // message, "Hello, World!", stored as a byte array. This
                // might not be that useful, so you can use the
                // `IMessage.GetBody<T>` method to deserialise this back
                // to your original type.
                // In this case, we want to get our `string` back:
                var payload = message.GetBody<string>();

                // The following will print `Hello, World!`.
                Console.WriteLine(payload);

                // Now, do some clever work with the message.
                // Here's your chance to shine…

                // Finally, we acknowledge the message once we're done.
                // We can either "complete" the message, which means that
                // we've successfully processed it. This will remove the
                // message from our queue.
                await subscriber.CompleteAsync(message);

                // Or if something goes wrong and we can't process the
                // message, we can "abandon" it (but don't call both
                // `CompleteAsync` and `AbandonAsync`!).
                // await subscriber.AbandonAsync(message);
            }
        }
    }
}

About

Messaging abstractions for building testable applications.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages