Skip to content

Commit

Permalink
Added more logging to Azure QueueStorage EventBus
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell committed Dec 21, 2020
1 parent a1450b3 commit 8bc353b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public override async Task<bool> CheckHealthAsync(CancellationToken cancellation
public override Task StartAsync(CancellationToken cancellationToken)
{
var registrations = BusOptions.GetConsumerRegistrations();
logger.StartingBus(registrations.Count);
foreach (var reg in registrations)
{
_ = ReceiveAsync(reg);
Expand All @@ -71,6 +72,7 @@ public override Task StartAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override Task StopAsync(CancellationToken cancellationToken)
{
logger.StoppingBus();
receiveCancellationTokenSource.Cancel();
// TODO: figure out a way to wait for notification of termination in all receivers
return Task.CompletedTask;
Expand Down Expand Up @@ -98,6 +100,7 @@ public override async Task<string> PublishAsync<TEvent>(EventContext<TEvent> @ev
// get the queue client and send the message
var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken);
var message = Encoding.UTF8.GetString(ms.ToArray());
logger.LogInformation("Sending {EventId} to '{QueueName}'", @event.EventId, queueClient.Name);
var response = await queueClient.SendMessageAsync(messageText: message,
visibilityTimeout: visibilityTimeout,
timeToLive: ttl,
Expand Down Expand Up @@ -134,6 +137,7 @@ public override async Task<IList<string>> PublishAsync<TEvent>(IList<EventContex

// send the message
var message = Encoding.UTF8.GetString(ms.ToArray());
logger.LogInformation("Sending {EventId} to '{QueueName}'", @event.EventId, queueClient.Name);
var response = await queueClient.SendMessageAsync(messageText: message,
visibilityTimeout: visibilityTimeout,
timeToLive: ttl,
Expand All @@ -157,8 +161,10 @@ public override async Task CancelAsync<TEvent>(string id, CancellationToken canc
var reg = BusOptions.GetOrCreateEventRegistration<TEvent>();
var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken);
var parts = id.Split(SequenceNumberSeparator);
await queueClient.DeleteMessageAsync(messageId: parts[0],
popReceipt: parts[1],
string messageId = parts[0], popReceipt = parts[1];
logger.LogInformation("Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'", messageId, popReceipt, queueClient.Name);
await queueClient.DeleteMessageAsync(messageId: messageId,
popReceipt: popReceipt,
cancellationToken: cancellationToken);
}

Expand All @@ -183,6 +189,7 @@ public override async Task CancelAsync<TEvent>(IList<string> ids, CancellationTo
var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken);
foreach (var (messageId, popReceipt) in splits)
{
logger.LogInformation("Cancelling '{MessageId}|{PopReceipt}' on '{QueueName}'", messageId, popReceipt, queueClient.Name);
await queueClient.DeleteMessageAsync(messageId: messageId,
popReceipt: popReceipt,
cancellationToken: cancellationToken);
Expand Down Expand Up @@ -211,7 +218,8 @@ private async Task<QueueClient> GetQueueClientAsync(EventRegistration reg, bool
MessageEncoding = QueueMessageEncoding.Base64,
});

// ensure queue is created
// ensure queue is created if it does not exist
logger.LogInformation("Ensuring queue '{QueueName}' exists", name);
await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);

queueClientsCache[(reg.EventType, deadletter)] = queueClient;
Expand Down Expand Up @@ -242,10 +250,13 @@ private async Task ReceiveAsync(ConsumerRegistration reg)
// if the response is empty, introduce a delay
if (messages.Length == 0)
{
await Task.Delay(TransportOptions.EmptyResultsDelay, cancellationToken);
var delay = TransportOptions.EmptyResultsDelay;
logger.LogTrace("No messages on '{QueueName}', delaying check for {Delay}", queueClient.Name, delay);
await Task.Delay(delay, cancellationToken);
}
else
{
logger.LogDebug("Received {MessageCount} messages on '{QueueName}'", messages.Length, queueClient.Name);
foreach (var message in messages)
{
await (Task)method.Invoke(this, new object[] { reg, queueClient, message, cancellationToken, });
Expand All @@ -268,12 +279,17 @@ private async Task OnMessageReceivedAsync<TEvent, TConsumer>(ConsumerRegistratio

try
{
logger.LogDebug("Processing '{MessageId}|{PopReceipt}'", message.MessageId, message.PopReceipt);
using var ms = new MemoryStream(Encoding.UTF8.GetBytes(message.MessageText));
var contentType = new ContentType("*/*");
var context = await DeserializeAsync<TEvent>(body: ms,
contentType: contentType,
registration: reg,
cancellationToken: cancellationToken);
logger.LogInformation("Received message: '{MessageId}|{PopReceipt}' containing Event '{EventId}'",
message.MessageId,
message.PopReceipt,
context.EventId);
await PushToConsumerAsync<TEvent, TConsumer>(context, cancellationToken);
}
catch (Exception ex)
Expand All @@ -286,6 +302,10 @@ private async Task OnMessageReceivedAsync<TEvent, TConsumer>(ConsumerRegistratio
}

// always delete the message from the current queue
logger.LogTrace("Deleting '{MessageId}|{PopReceipt}' on '{QueueName}'",
message.MessageId,
message.PopReceipt,
queueClient.Name);
await queueClient.DeleteMessageAsync(messageId: message.MessageId,
popReceipt: message.PopReceipt,
cancellationToken: cancellationToken);
Expand Down
26 changes: 26 additions & 0 deletions src/Tingle.EventBus/Logging/ILoggerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;

namespace Microsoft.Extensions.Logging
{
/// <summary>
/// Extensions on <see cref="ILogger"/> for the EventBus
/// </summary>
internal static class ILoggerExtensions
{
private static readonly Action<ILogger, int, Exception> _startingBus
= LoggerMessage.Define<int>(
eventId: new EventId(1, nameof(StartingBus)),
logLevel: LogLevel.Debug,
formatString: "Starting bus receivers. Consumers: '{ConsumersCount}'");

private static readonly Action<ILogger, Exception> _stoppingBus
= LoggerMessage.Define(
eventId: new EventId(2, nameof(StoppingBus)),
logLevel: LogLevel.Debug,
formatString: "Stopping bus receivers.");

public static void StartingBus(this ILogger logger, int consumersCount) => _startingBus(logger, consumersCount, null);

public static void StoppingBus(this ILogger logger) => _stoppingBus(logger, null);
}
}

0 comments on commit 8bc353b

Please sign in to comment.