Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 5.5.0
- Added Outbox pattern support

## 5.4.0
- Added Isolation feature V2

## 5.3.1
- Changed
- Fix usage of sent counter on receiver instead of received counter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ namespace Ev.ServiceBus.Abstractions;
[Serializable]
public class SenderNotFoundException : Exception
{
public SenderNotFoundException(ClientType clientType, string topicName)
public SenderNotFoundException(string resourceId)
: base(
$"The {clientType.ToString()} '{topicName}' you tried to retrieve was not found. "
+ $"Verify your configuration to make sure the {clientType.ToString()} is properly registered.")
$"The '{resourceId}' you tried to retrieve was not found. "
+ $"Verify your configuration to make sure the resource is properly registered.")
{
TopicName = topicName;
ResourceId = resourceId;
}

public string TopicName { get; }
public string ResourceId { get; }
}
7 changes: 4 additions & 3 deletions src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace Ev.ServiceBus.Abstractions;

Expand All @@ -9,21 +10,21 @@ public interface IMessagePublisher
/// </summary>
/// <param name="messageDto">The object to send through Service Bus</param>
/// <typeparam name="TMessagePayload">A type of object that is registered within Ev.ServiceBus</typeparam>
void Publish<TMessagePayload>(TMessagePayload messageDto);
Task Publish<TMessagePayload>(TMessagePayload messageDto);

/// <summary>
/// Temporarily stores the object to send through Service Bus until <see cref="IMessageDispatcher.ExecuteDispatches"/> is called.
/// </summary>
/// <param name="messageDto">The object to send through Service Bus</param>
/// <param name="sessionId">The sessionId to attach to the outgoing message</param>
/// <typeparam name="TMessagePayload">A type of object that is registered within Ev.ServiceBus</typeparam>
void Publish<TMessagePayload>(TMessagePayload messageDto, string sessionId);
Task Publish<TMessagePayload>(TMessagePayload messageDto, string sessionId);

/// <summary>
/// Temporarily stores the object to send through Service Bus until <see cref="IMessageDispatcher.ExecuteDispatches"/> is called.
/// </summary>
/// <param name="messageDto">The object to send through Service Bus</param>
/// <param name="messageContextConfiguration">Configurator of message context</param>
/// <typeparam name="TMessagePayload">A type of object that is registered within Ev.ServiceBus</typeparam>
void Publish<TMessagePayload>(TMessagePayload messageDto, Action<IDispatchContext> messageContextConfiguration);
Task Publish<TMessagePayload>(TMessagePayload messageDto, Action<IDispatchContext> messageContextConfiguration);
}
11 changes: 11 additions & 0 deletions src/Ev.ServiceBus.Abstractions/IsExternalInit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.ComponentModel;

namespace System.Runtime.CompilerServices;

/// <summary>
/// Reserved to be used by the compiler for tracking metadata.
/// This class should not be used by developers in source code.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
internal static class IsExternalInit {
}
217 changes: 16 additions & 201 deletions src/Ev.ServiceBus/Dispatch/DispatchSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,21 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Ev.ServiceBus.Abstractions;
using Ev.ServiceBus.Abstractions.Extensions;
using Ev.ServiceBus.Abstractions.MessageReception;
using Ev.ServiceBus.Diagnostics;
using Ev.ServiceBus.Management;
using Microsoft.Extensions.Options;

namespace Ev.ServiceBus.Dispatch;

public class DispatchSender : IDispatchSender
{
private const int MaxMessagePerSend = 100;
private readonly IMessagePayloadSerializer _messagePayloadSerializer;
private readonly ServiceBusRegistry _dispatchRegistry;
private readonly ServiceBusRegistry _registry;
private readonly IMessageMetadataAccessor _messageMetadataAccessor;
private readonly IEnumerable<IDispatchExtender> _dispatchCustomizers;
private readonly ServiceBusOptions _serviceBusOptions;
private readonly ServiceBusMessageFactory _messageFactory;
private readonly ServiceBusMessageSender _serviceBusMessageSender;

public DispatchSender(
ServiceBusRegistry registry,
IMessagePayloadSerializer messagePayloadSerializer,
ServiceBusRegistry dispatchRegistry,
IMessageMetadataAccessor messageMetadataAccessor,
IEnumerable<IDispatchExtender> dispatchCustomizers,
IOptions<ServiceBusOptions> serviceBusOptions)
ServiceBusMessageFactory messageFactory,
ServiceBusMessageSender serviceBusMessageSender)
{
_registry = registry;
_messagePayloadSerializer = messagePayloadSerializer;
_dispatchRegistry = dispatchRegistry;
_messageMetadataAccessor = messageMetadataAccessor;
_dispatchCustomizers = dispatchCustomizers;
_serviceBusOptions = serviceBusOptions.Value;
_messageFactory = messageFactory;
_serviceBusMessageSender = serviceBusMessageSender;
}

/// <inheritdoc />
Expand All @@ -50,20 +31,14 @@ public async Task SendDispatch(object messagePayload, CancellationToken token =
/// <inheritdoc />
public async Task SendDispatch(Abstractions.Dispatch messagePayload, CancellationToken token = default)
{
var dispatches = CreateMessagesToSend([messagePayload]);
var dispatches = _messageFactory.CreateMessagesToSend([messagePayload]);

foreach (var messagePerResource in dispatches)
{
var message = messagePerResource.Messages.Single();
var messagePerResource = dispatches.Single();

await messagePerResource.Sender.SendMessageAsync(message.Message, token);
ServiceBusMeter.IncrementSentCounter(
1,
messagePerResource.Sender.ClientType.ToString(),
messagePerResource.Sender.Name,
message.Message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString()
);
}
await _serviceBusMessageSender.SendMessages(
messagePerResource.ResourceId,
messagePerResource.Messages,
token);
}

/// <inheritdoc />
Expand All @@ -86,48 +61,10 @@ public async Task SendDispatches(IEnumerable<Abstractions.Dispatch> messagePaylo
throw new ArgumentNullException(nameof(messagePayloads));
}

var dispatches = CreateMessagesToSend(messagePayloads);
var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads);
foreach (var messagesPerResource in dispatches)
{
await BatchAndSendMessages(messagesPerResource, token, async (sender, batch) =>
{
await sender.SendMessagesAsync(batch, token);
});
}
}

private async Task BatchAndSendMessages(MessagesPerResource dispatches, CancellationToken token, Func<IMessageSender, ServiceBusMessageBatch, Task> senderAction)
{
var batches = new List<ServiceBusMessageBatch>();
var batch = await dispatches.Sender.CreateMessageBatchAsync(token);
batches.Add(batch);
foreach (var messageToSend in dispatches.Messages)
{
ServiceBusMeter.IncrementSentCounter(
1,
dispatches.Sender.ClientType.ToString(),
dispatches.Sender.Name,
messageToSend.Message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString()
);

if (batch.TryAddMessage(messageToSend.Message))
{
continue;
}
batch = await dispatches.Sender.CreateMessageBatchAsync(token);
batches.Add(batch);
if (batch.TryAddMessage(messageToSend.Message))
{
continue;
}

throw new ArgumentOutOfRangeException("A message is too big to fit in a single batch");
}

foreach (var pageMessages in batches)
{
await senderAction.Invoke(dispatches.Sender, pageMessages);
pageMessages.Dispose();
await _serviceBusMessageSender.SendMessages(messagesPerResource.ResourceId, messagesPerResource.Messages, token);
}
}

Expand All @@ -151,132 +88,10 @@ public async Task ScheduleDispatches(IEnumerable<Abstractions.Dispatch> messageP
throw new ArgumentNullException(nameof(messagePayloads));
}

var dispatches = CreateMessagesToSend(messagePayloads);
var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads);
foreach (var messagesPerResource in dispatches)
{
await PaginateAndSendMessages(messagesPerResource, async (sender, page) =>
{
await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, token);
});
}
}

private async Task PaginateAndSendMessages(MessagesPerResource dispatches, Func<IMessageSender, IEnumerable<ServiceBusMessage>, Task> senderAction)
{
var paginatedMessages = dispatches.Messages.Select(o => o.Message)
.Select((x, i) => new
{
Item = x,
Index = i
})
.GroupBy(x => x.Index / MaxMessagePerSend, x => x.Item);

foreach (var pageMessages in paginatedMessages)
{
foreach (var message in pageMessages)
{
ServiceBusMeter.IncrementSentCounter(
1,
dispatches.Sender.ClientType.ToString(),
dispatches.Sender.Name,
message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString()
);
}

await senderAction.Invoke(dispatches.Sender, pageMessages.Select(m => m).ToArray());
}
}

private class MessagesPerResource
{
public MessageToSend[] Messages { get; set; }
public ClientType ClientType { get; set; }
public string ResourceId { get; set; }
public IMessageSender Sender { get; set; }
}

private class MessageToSend
{
public MessageToSend(ServiceBusMessage message, MessageDispatchRegistration registration)
{
Message = message;
Registration = registration;
}

public ServiceBusMessage Message { get; }
public MessageDispatchRegistration Registration { get; }
}

private MessagesPerResource[] CreateMessagesToSend(IEnumerable<Abstractions.Dispatch> messagePayloads)
{
var dispatches =
(
from dispatch in messagePayloads
// the same dispatch can be published to several senders
let registrations = _dispatchRegistry.GetDispatchRegistrations(dispatch.Payload.GetType())
from eventPublicationRegistration in registrations
let message = CreateMessage(eventPublicationRegistration, dispatch)
select new MessageToSend(message, eventPublicationRegistration)
)
.ToArray();

var messagesPerResource = (
from dispatch in dispatches
group dispatch by new { dispatch.Registration.Options.ClientType, dispatch.Registration.Options.ResourceId } into gr
let sender = _registry.GetMessageSender(gr.Key.ClientType, gr.Key.ResourceId)
select new MessagesPerResource()
{
Messages = gr.ToArray(),
ClientType = gr.Key.ClientType,
ResourceId = gr.Key.ResourceId,
Sender = sender
}).ToArray();

return messagesPerResource;
}

private ServiceBusMessage CreateMessage(
MessageDispatchRegistration registration,
Abstractions.Dispatch dispatch)
{
var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload);
var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId);

dispatch.ApplicationProperties.Remove(UserProperties.PayloadTypeIdProperty);
foreach (var dispatchApplicationProperty in dispatch.ApplicationProperties)
{
message.ApplicationProperties[dispatchApplicationProperty.Key] = dispatchApplicationProperty.Value;
}

message.SessionId = dispatch.SessionId;

var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString();
message.CorrelationId = dispatch.CorrelationId ?? originalCorrelationId;

var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey();
message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationSettings.IsolationKey);

var originalIsolationApps = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationApps() ?? [];
message.SetIsolationApps(originalIsolationApps);

if (dispatch.DiagnosticId != null)
{
message.SetDiagnosticIdIfIsNot(dispatch.DiagnosticId);
}
if (!string.IsNullOrWhiteSpace(dispatch.MessageId))
{
message.MessageId = dispatch.MessageId;
}

foreach (var customizer in registration.OutgoingMessageCustomizers)
{
customizer?.Invoke(message, dispatch.Payload);
}

foreach (var dispatchCustomizer in _dispatchCustomizers)
{
dispatchCustomizer.ExtendDispatch(message, dispatch.Payload);
await _serviceBusMessageSender.ScheduleMessages(messagesPerResource, scheduledEnqueueTime, token);
}
return message;
}
}
12 changes: 9 additions & 3 deletions src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task ExecuteDispatches(CancellationToken token)
}

/// <inheritdoc />
public void Publish<TMessageDto>(TMessageDto messageDto)
public Task Publish<TMessageDto>(TMessageDto messageDto)
{
if (messageDto == null)
{
Expand All @@ -42,10 +42,12 @@ public void Publish<TMessageDto>(TMessageDto messageDto)
{
DiagnosticId = Activity.Current?.Id
});

return Task.CompletedTask;
}

/// <inheritdoc />
public void Publish<TMessagePayload>(TMessagePayload messageDto, string sessionId)
public Task Publish<TMessagePayload>(TMessagePayload messageDto, string sessionId)
{
if (messageDto == null)
{
Expand All @@ -62,10 +64,12 @@ public void Publish<TMessagePayload>(TMessagePayload messageDto, string sessionI
SessionId = sessionId,
DiagnosticId = Activity.Current?.Id
});

return Task.CompletedTask;
}

/// <inheritdoc />
public void Publish<TMessagePayload>(
public Task Publish<TMessagePayload>(
TMessagePayload messageDto,
Action<IDispatchContext> messageContextConfiguration)
{
Expand All @@ -90,5 +94,7 @@ public void Publish<TMessagePayload>(
MessageId = context.MessageId,
DiagnosticId = context.DiagnosticId ?? Activity.Current?.Id
});

return Task.CompletedTask;
}
}
12 changes: 12 additions & 0 deletions src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace Ev.ServiceBus.Dispatch.Outbox;

public interface IOutboxRepository
{
Task Add(string resourceId, ServiceBusMessage message, CancellationToken token);
Task AddScheduled(string resourceId, DateTimeOffset scheduledEnqueueTime, ServiceBusMessage message, CancellationToken token);
}
Loading
Loading