Skip to content

Commit

Permalink
Logging updates (#376)
Browse files Browse the repository at this point in the history
Change logs to use LoggerMessage for higher performance and better management in a single file.
  • Loading branch information
mburumaxwell authored Dec 23, 2021
1 parent 47259f0 commit 1e8a091
Show file tree
Hide file tree
Showing 18 changed files with 594 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
// log warning when trying to publish scheduled message
if (scheduled != null)
{
Logger.LogWarning("Amazon Kinesis does not support delay or scheduled publish");
Logger.SchedulingNotSupported();
}

using var scope = CreateScope();
Expand All @@ -81,7 +81,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
cancellationToken: cancellationToken);

// prepare the record
var streamName = registration.EventName;
var streamName = registration.EventName!;
var request = new PutRecordRequest
{
Data = body.ToMemoryStream(),
Expand All @@ -90,7 +90,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
};

// send the event
Logger.LogInformation("Sending {Id} to '{StreamName}'. Scheduled: {Scheduled}", @event.Id, streamName, scheduled);
Logger.SendingToStream(eventId: @event.Id, streamName: streamName, scheduled: scheduled);
var response = await kinesisClient.PutRecordAsync(request, cancellationToken);
response.EnsureSuccess();

Expand All @@ -107,7 +107,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
// log warning when trying to publish scheduled message
if (scheduled != null)
{
Logger.LogWarning("Amazon Kinesis does not support delay or scheduled publish");
Logger.SchedulingNotSupported();
}

using var scope = CreateScope();
Expand All @@ -130,19 +130,15 @@ public override async Task StopAsync(CancellationToken cancellationToken)
}

// prepare the request
var streamName = registration.EventName;
var streamName = registration.EventName!;
var request = new PutRecordsRequest
{
StreamName = streamName,
Records = records,
};

// send the events
Logger.LogInformation("Sending {EventsCount} messages to '{StreamName}'. Scheduled: {Scheduled}. Events:\r\n- {Ids}",
events.Count,
streamName,
scheduled,
string.Join("\r\n- ", events.Select(e => e.Id)));
Logger.SendingEventsToStream(events, streamName, scheduled);
var response = await kinesisClient.PutRecordsAsync(request, cancellationToken);
response.EnsureSuccess();

Expand Down
34 changes: 34 additions & 0 deletions src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using Tingle.EventBus;

namespace Microsoft.Extensions.Logging;

/// <summary>
/// Extensions on <see cref="ILogger"/> for the EventBus
/// </summary>
internal static partial class ILoggerExtensions
{
[LoggerMessage(100, LogLevel.Information, "Sending {EventId} to '{StreamName}'. Scheduled: {Scheduled}.")]
public static partial void SendingToStream(this ILogger logger, string? eventId, string streamName, DateTimeOffset? scheduled);

[LoggerMessage(101, LogLevel.Information, "Sending {EventsCount} messages to '{StreamName}'. Scheduled: {Scheduled}. Events:\r\n- {EventIds}")]
private static partial void SendingEventsToStream(this ILogger logger, int eventsCount, string streamName, DateTimeOffset? scheduled, string eventIds);

public static void SendingEventsToStream(this ILogger logger, IList<string?> eventIds, string streamName, DateTimeOffset? scheduled)
{
if (!logger.IsEnabled(LogLevel.Information)) return;
logger.SendingEventsToStream(eventsCount: eventIds.Count,
streamName: streamName,
scheduled: scheduled,
eventIds: string.Join("\r\n- ", eventIds));
}

public static void SendingEventsToStream<T>(this ILogger logger, IList<EventContext<T>> events, string entityPath, DateTimeOffset? scheduled = null)
where T : class
{
if (!logger.IsEnabled(LogLevel.Information)) return;
logger.SendingEventsToStream(events.Select(e => e.Id).ToList(), entityPath, scheduled);
}

[LoggerMessage(102, LogLevel.Warning, "Amazon Kinesis does not support delay or scheduled publish.")]
public static partial void SchedulingNotSupported(this ILogger logger);
}
23 changes: 10 additions & 13 deletions src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
// log warning when trying to publish scheduled message
if (scheduled != null)
{
Logger.LogWarning("Amazon SNS does not support delay or scheduled publish");
Logger.SchedulingNotSupported();
}

using var scope = CreateScope();
Expand All @@ -120,7 +120,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
.SetAttribute(MetadataNames.RequestId, @event.RequestId)
.SetAttribute(MetadataNames.InitiatorId, @event.InitiatorId)
.SetAttribute(MetadataNames.ActivityId, Activity.Current?.Id);
Logger.LogInformation("Sending {Id} to '{TopicArn}'. Scheduled: {Scheduled}", @event.Id, topicArn, scheduled);
Logger.SendingToTopic(eventId: @event.Id, topicArn: topicArn, scheduled: scheduled);
var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken);
response.EnsureSuccess();

Expand All @@ -135,12 +135,12 @@ public override async Task StopAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken = default)
{
// log warning when doing batch
Logger.LogWarning("Amazon SNS does not support batching. The events will be looped through one by one");
Logger.BatchingNotSupported();

// log warning when trying to publish scheduled message
if (scheduled != null)
{
Logger.LogWarning("Amazon SNS does not support delay or scheduled publish");
Logger.SchedulingNotSupported();
}

using var scope = CreateScope();
Expand All @@ -162,7 +162,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
.SetAttribute(MetadataNames.RequestId, @event.RequestId)
.SetAttribute(MetadataNames.InitiatorId, @event.InitiatorId)
.SetAttribute(MetadataNames.ActivityId, Activity.Current?.Id);
Logger.LogInformation("Sending {Id} to '{TopicArn}'. Scheduled: {Scheduled}", @event.Id, topicArn, scheduled);
Logger.SendingToTopic(eventId: @event.Id, topicArn: topicArn, scheduled: scheduled);
var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken);
response.EnsureSuccess();

Expand Down Expand Up @@ -310,12 +310,12 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
if (messages.Count == 0)
{
var delay = TransportOptions.EmptyResultsDelay;
Logger.LogTrace("No messages on '{QueueUrl}', delaying check for {Delay}", queueUrl, delay);
Logger.NoMessages(queueUrl: queueUrl, delay: delay);
await Task.Delay(delay, cancellationToken);
}
else
{
Logger.LogDebug("Received {MessageCount} messages on '{QueueUrl}'", messages.Count, queueUrl);
Logger.ReceivedMessages(messagesCount: messages.Count, queueUrl: queueUrl);
using var scope = CreateScope(); // shared
foreach (var message in messages)
{
Expand Down Expand Up @@ -369,7 +369,7 @@ private async Task OnMessageReceivedAsync<TEvent, TConsumer>(EventRegistration r
activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue");
activity?.AddTag(ActivityTagNames.MessagingUrl, queueUrl);

Logger.LogDebug("Processing '{MessageId}' from '{QueueUrl}'", messageId, queueUrl);
Logger.ProcessingMessage(messageId: messageId, queueUrl: queueUrl);
message.TryGetAttribute("Content-Type", out var contentType_str);
var contentType = contentType_str == null ? null : new ContentType(contentType_str);

Expand All @@ -381,10 +381,7 @@ private async Task OnMessageReceivedAsync<TEvent, TConsumer>(EventRegistration r
identifier: messageId,
cancellationToken: cancellationToken);

Logger.LogInformation("Received message: '{MessageId}' containing Event '{Id}' from '{QueueUrl}'",
messageId,
context.Id,
queueUrl);
Logger.ReceivedMessage(messageId: messageId, eventId: context.Id, queueUrl: queueUrl);

var (successful, _) = await ConsumeAsync<TEvent, TConsumer>(ecr: ecr,
@event: context,
Expand All @@ -405,7 +402,7 @@ private async Task OnMessageReceivedAsync<TEvent, TConsumer>(EventRegistration r
}

// whether or not successful, always delete the message from the current queue
Logger.LogTrace("Deleting '{MessageId}' on '{QueueUrl}'", messageId, queueUrl);
Logger.DeletingMessage(messageId: messageId, queueUrl: queueUrl);
await sqsClient.DeleteMessageAsync(queueUrl: queueUrl,
receiptHandle: message.ReceiptHandle,
cancellationToken: cancellationToken);
Expand Down
31 changes: 31 additions & 0 deletions src/Tingle.EventBus.Transports.Amazon.Sqs/ILoggerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace Microsoft.Extensions.Logging;

/// <summary>
/// Extensions on <see cref="ILogger"/> for the EventBus
/// </summary>
internal static partial class ILoggerExtensions
{
[LoggerMessage(100, LogLevel.Information, "Sending {EventId} to '{TopicArn}'. Scheduled: {Scheduled}.")]
public static partial void SendingToTopic(this ILogger logger, string? eventId, string topicArn, DateTimeOffset? scheduled);

[LoggerMessage(101, LogLevel.Information, "Received message: '{MessageId}' containing Event '{EventId}' from '{QueueUrl}'")]
public static partial void ReceivedMessage(this ILogger logger, string messageId, string? eventId, string queueUrl);

[LoggerMessage(102, LogLevel.Warning, "Amazon SNS does not support delay or scheduled publish.")]
public static partial void SchedulingNotSupported(this ILogger logger);

[LoggerMessage(103, LogLevel.Warning, "Amazon SNS does not support batching. The events will be looped through one by one.")]
public static partial void BatchingNotSupported(this ILogger logger);

[LoggerMessage(104, LogLevel.Trace, "No messages on '{QueueUrl}', delaying check for {Delay}.")]
public static partial void NoMessages(this ILogger logger, string queueUrl, TimeSpan delay);

[LoggerMessage(105, LogLevel.Debug, "Received {MessagesCount} messages on '{QueueUrl}'")]
public static partial void ReceivedMessages(this ILogger logger, int messagesCount, string queueUrl);

[LoggerMessage(106, LogLevel.Debug, "Processing '{MessageId}' from '{QueueUrl}'")]
public static partial void ProcessingMessage(this ILogger logger, string messageId, string queueUrl);

[LoggerMessage(107, LogLevel.Trace, "Deleting '{MessageId}' from '{QueueUrl}'")]
public static partial void DeletingMessage(this ILogger logger, string messageId, string queueUrl);
}
Loading

0 comments on commit 1e8a091

Please sign in to comment.