Skip to content

Commit

Permalink
Subscriber abstraction + RabbitMQ consumer service
Browse files Browse the repository at this point in the history
  • Loading branch information
fredimachado committed May 19, 2024
1 parent 161cd51 commit fececb3
Show file tree
Hide file tree
Showing 20 changed files with 348 additions and 171 deletions.
7 changes: 3 additions & 4 deletions src/Aspire/NCafe.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@
.WithReference(eventStore)
.WaitFor(eventStore);

var baristaProject = builder.AddProject<Projects.NCafe_Barista_Api>("barista-api")
var cashierProject = builder.AddProject<Projects.NCafe_Cashier_Api>("cashier-api")
.WithReference(eventStore)
.WithReference(rabbitMq)
.WaitFor(eventStore)
.WaitFor(rabbitMq);

var cashierProject = builder.AddProject<Projects.NCafe_Cashier_Api>("cashier-api")
var baristaProject = builder.AddProject<Projects.NCafe_Barista_Api>("barista-api")
.WithReference(eventStore)
.WithReference(rabbitMq)
.WaitFor(eventStore)
.WaitFor(rabbitMq);
.WaitFor(cashierProject);

var webUiProject = builder.AddProject<Projects.NCafe_Web>("web-ui")
.WithExternalHttpEndpoints();
Expand Down
93 changes: 0 additions & 93 deletions src/Barista/NCafe.Barista.Api/MessageBus/OrdersConsumerService.cs

This file was deleted.

26 changes: 23 additions & 3 deletions src/Barista/NCafe.Barista.Api/Program.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
using Microsoft.AspNetCore.ResponseCompression;
using Microsoft.AspNetCore.SignalR;
using NCafe.Barista.Api.Hubs;
using NCafe.Barista.Api.MessageBus;
using NCafe.Barista.Api.Projections;
using NCafe.Barista.Domain.Commands;
using NCafe.Barista.Domain.Queries;
using NCafe.Barista.Domain.ReadModels;
using NCafe.Core.Commands;
using NCafe.Core.MessageBus.Events;
using NCafe.Core.Queries;
using NCafe.Infrastructure;
using NCafe.Shared.Hubs;

var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();

builder.AddRabbitMQClient("rabbitmq");
builder.AddRabbitMQClient("rabbitmq", configureConnectionFactory: config =>
{
config.DispatchConsumersAsync = true;
});

// Add services to the container.
builder.Services.AddEventStoreRepository(builder.Configuration)
Expand All @@ -25,7 +30,7 @@
.AddEventStoreProjectionService<BaristaOrder>()
.AddHostedService<BaristaOrderProjectionService>();

builder.Services.AddHostedService<OrdersConsumerService>();
builder.Services.AddRabbitMqConsumerService(builder.Configuration);

builder.Services.AddEndpointsApiExplorer()
.AddSwaggerGen();
Expand All @@ -51,6 +56,21 @@

var app = builder.Build();

app.UseMessageSubscriber()
.Subscribe<OrderPlaced>(async (serviceProvider, message) =>
{
var commandDispatcher = serviceProvider.GetRequiredService<ICommandDispatcher>();
var hubContext = serviceProvider.GetRequiredService<IHubContext<OrderHub>>();
// Dispatch domain command
await commandDispatcher.DispatchAsync(new PlaceOrder(message.Id, message.ProductId, message.Quantity));
// Notify clients
await hubContext.Clients.All.SendAsync(
"ReceiveOrder",
new Order(message.Id, message.ProductId, message.Quantity));
});

app.MapDefaultEndpoints();

app.UseResponseCompression();
Expand Down
3 changes: 3 additions & 0 deletions src/Barista/NCafe.Barista.Api/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
"ConnectionStrings": {
"EventStore": "esdb://localhost:2113?tls=false",
"RabbitMq": "host=localhost;username=myuser;password=mypassword"
},
"RabbitMq": {
"queuePrefix": "barista-service"
}
}
3 changes: 3 additions & 0 deletions src/Cashier/NCafe.Cashier.Api/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
"ConnectionStrings": {
"EventStore": "esdb://localhost:2113?tls=false",
"RabbitMq": "host=localhost;username=myuser;password=mypassword"
},
"RabbitMq": {
"exchangeName": "cashier"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task GivenOrderSaved_ShouldPublishToMessageBus()

// Assert
exception.ShouldBeNull();
A.CallTo(() => _publisher.Publish("orders_queue", A<OrderPlaced>.That.Matches(o => o.ProductId == productId && o.Quantity == 1)))
A.CallTo(() => _publisher.Publish(A<OrderPlaced>.That.Matches(o => o.ProductId == productId && o.Quantity == 1)))
.MustHaveHappenedOnceExactly();
}
}
4 changes: 1 addition & 3 deletions src/Cashier/NCafe.Cashier.Domain/Commands/PlaceOrder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ internal sealed class PlaceOrderHandler(
private readonly IReadModelRepository<Product> _productReadRepository = productReadRepository;
private readonly IPublisher _publisher = publisher;

private const string Queue = "orders_queue";

public async Task HandleAsync(PlaceOrder command)
{
var product = _productReadRepository.GetById(command.ProductId) ?? throw new ProductNotFoundException(command.ProductId);
var order = new Order(Guid.NewGuid(), product.Id, command.Quantity);

await _repository.Save(order);

await _publisher.Publish(Queue, new OrderPlaced(order.Id, order.ProductId, order.Quantity));
await _publisher.Publish(new OrderPlaced(order.Id, order.ProductId, order.Quantity));
}
}
1 change: 1 addition & 0 deletions src/Common/NCafe.Core/MessageBus/Events/OrderPlaced.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
namespace NCafe.Core.MessageBus.Events;

[Message("cashier")]
public sealed record OrderPlaced(Guid Id, Guid ProductId, int Quantity) : IBusMessage;
7 changes: 7 additions & 0 deletions src/Common/NCafe.Core/MessageBus/IMessageSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace NCafe.Core.MessageBus;

public interface IMessageSubscriber
{
IReadOnlyDictionary<Type, Func<IServiceProvider, object, Task>> Handlers { get; }
IMessageSubscriber Subscribe<T>(Func<IServiceProvider, T, Task> handle) where T : class, IBusMessage;
}
2 changes: 1 addition & 1 deletion src/Common/NCafe.Core/MessageBus/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

public interface IPublisher
{
Task Publish<T>(string topicName, T message) where T : class, IBusMessage;
Task Publish<T>(T message) where T : class, IBusMessage;
}
7 changes: 7 additions & 0 deletions src/Common/NCafe.Core/MessageBus/MessageAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace NCafe.Core.MessageBus;

[AttributeUsage(AttributeTargets.Class, Inherited = false, AllowMultiple = false)]
public sealed class MessageAttribute(string exchangeName) : Attribute
{
public string ExchangeName { get; } = exchangeName;
}
48 changes: 44 additions & 4 deletions src/Common/NCafe.Infrastructure/DependencyRegistration.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using EventStore.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using NCafe.Core.Commands;
using NCafe.Core.MessageBus;
using NCafe.Core.Projections;
Expand All @@ -10,9 +12,10 @@
using NCafe.Infrastructure.Commands;
using NCafe.Infrastructure.EventStore;
using NCafe.Infrastructure.Logging;
using NCafe.Infrastructure.MessageBus;
using NCafe.Infrastructure.MessageBrokers.RabbitMQ;
using NCafe.Infrastructure.Queries;
using NCafe.Infrastructure.ReadModels;
using RabbitMQ.Client;

namespace NCafe.Infrastructure;

Expand All @@ -35,9 +38,7 @@ public static IServiceCollection AddEventStoreRepository(this IServiceCollection
public static IServiceCollection AddEventStoreProjectionService<TModel>(this IServiceCollection services)
where TModel : ReadModel
{
services.AddSingleton<IProjectionService<TModel>, EventStoreProjectionService<TModel>>();

return services;
return services.AddSingleton<IProjectionService<TModel>, EventStoreProjectionService<TModel>>();
}

public static IServiceCollection AddCommandHandlers<T>(this IServiceCollection services)
Expand Down Expand Up @@ -79,11 +80,50 @@ public static IServiceCollection AddRabbitMqPublisher(this IServiceCollection se
throw new InvalidOperationException("Invalid RabbitMq configuration.");
}

services.AddOptions<RabbitMqSettings>()
.Bind(configuration.GetSection(RabbitMqSettings.SectionName));

services.AddSingleton<IPublisher, RabbitMqPublisher>();

InitializeRabbitMqExchange(services);

return services;
}

public static IServiceCollection AddRabbitMqConsumerService(this IServiceCollection services, IConfiguration configuration)
{
services.AddOptions<RabbitMqSettings>()
.Bind(configuration.GetSection(RabbitMqSettings.SectionName));

return services.AddSingleton<IMessageSubscriber, RabbitMqMessageSubscriber>()
.AddHostedService<RabbitMqConsumerService>();
}

public static IMessageSubscriber UseMessageSubscriber(this IApplicationBuilder app)
{
var messageSubscriber = app.ApplicationServices.GetService<IMessageSubscriber>();

if (messageSubscriber is null)
{
throw new InvalidOperationException("Message Subscriber is not registered. Make sure to call Services.AddRabbitMqConsumerService.");
}

return messageSubscriber;
}

private static void InitializeRabbitMqExchange(IServiceCollection services)
{
var scope = services.BuildServiceProvider().CreateScope();
var connection = scope.ServiceProvider.GetRequiredService<IConnection>();
var settings = scope.ServiceProvider.GetRequiredService<IOptions<RabbitMqSettings>>();

var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: ExchangeNameProvider.Get(settings.Value.ExchangeName),
type: ExchangeType.Topic,
durable: true,
autoDelete: false);
}

public static IServiceCollection AddInMemoryReadModelRepository<T>(this IServiceCollection services) where T : ReadModel
{
services.AddSingleton<IReadModelRepository<T>, InMemoryReadModelRepository<T>>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Reflection;

namespace NCafe.Infrastructure.MessageBrokers.RabbitMQ;

internal static class ExchangeNameProvider
{
public static string Get(string exchange)
{
return !string.IsNullOrWhiteSpace(exchange) ? exchange : Assembly.GetEntryAssembly().GetName().Name;
}
}
Loading

0 comments on commit fececb3

Please sign in to comment.