Skip to content

Commit

Permalink
Set status and exception in Activity instances (#626)
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell authored Jun 6, 2024
1 parent d648321 commit 047dec6
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,12 @@ private async Task OnMessageReceivedAsync(IServiceScope scope, EventRegistration

Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueUrl: queueUrl);

var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// dead-letter cannot be dead-lettered again, what else can we do?
if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,12 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi
.SetPartitionContext(args.Partition)
.SetEventData(data);

var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// dead-letter cannot be dead-lettered again, what else can we do?
if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ private async Task OnMessageReceivedAsync(IServiceScope scope, EventRegistration
activity?.SetParentId(parentId: parentActivityId);
}

var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// dead-letter cannot be dead-lettered again, what else can we do?
if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,11 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
}

var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// Decide the action to execute then execute
var action = DecideAction(successful, ecr.UnhandledErrorBehaviour, processor.AutoCompleteMessages);
Expand Down
7 changes: 6 additions & 1 deletion src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,12 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
// set the extras
context.SetInMemoryReceivedMessage(message);

var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// Add to Consumed/Failed list
if (successful) consumed.Add(context);
Expand Down
7 changes: 6 additions & 1 deletion src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,12 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi
partition: result.Partition,
offset: result.Offset);

var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// dead-letter cannot be dead-lettered again, what else can we do?
if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails
Expand Down
5 changes: 5 additions & 0 deletions src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
messageId,
context.Id);
var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false);
if (ex != null)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);
}

// Decide the action to execute then execute
var action = DecideAction(successful, ecr.UnhandledErrorBehaviour);
Expand Down
61 changes: 61 additions & 0 deletions src/Tingle.EventBus/Diagnostics/ActivityExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System.Diagnostics;

namespace Tingle.EventBus.Diagnostics;

///
public static class ActivityExtensions
{
// Copied from https://github.com/dotnet/runtime/pull/102905/files
#if !NET9_0_OR_GREATER
///
public static Activity AddException(this Activity activity, Exception exception, TagList tags = default, DateTimeOffset timestamp = default)
{
if (activity is null) throw new ArgumentNullException(nameof(activity));
if (exception is null) throw new ArgumentNullException(nameof(exception));

TagList exceptionTags = tags;

const string ExceptionEventName = "exception";
const string ExceptionMessageTag = "exception.message";
const string ExceptionStackTraceTag = "exception.stacktrace";
const string ExceptionTypeTag = "exception.type";

bool hasMessage = false;
bool hasStackTrace = false;
bool hasType = false;

for (int i = 0; i < exceptionTags.Count; i++)
{
if (exceptionTags[i].Key == ExceptionMessageTag)
{
hasMessage = true;
}
else if (exceptionTags[i].Key == ExceptionStackTraceTag)
{
hasStackTrace = true;
}
else if (exceptionTags[i].Key == ExceptionTypeTag)
{
hasType = true;
}
}

if (!hasMessage)
{
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionMessageTag, exception.Message));
}

if (!hasStackTrace)
{
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionStackTraceTag, exception.ToString()));
}

if (!hasType)
{
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionTypeTag, exception.GetType().ToString()));
}

return activity.AddEvent(new ActivityEvent(ExceptionEventName, timestamp));
}
#endif
}
76 changes: 54 additions & 22 deletions src/Tingle.EventBus/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ public class EventBus(EventBusTransportProvider transportProvider,

private readonly IReadOnlyDictionary<string, IEventBusTransport> transports = transportProvider.GetTransports();

/// <summary>
/// Publish an event.
/// </summary>
/// <summary>Publish an event.</summary>
/// <typeparam name="TEvent">The event type.</typeparam>
/// <param name="event">The event to publish.</param>
/// <param name="scheduled">
Expand Down Expand Up @@ -74,15 +72,23 @@ public class EventBus(EventBusTransportProvider transportProvider,
activity?.AddTag(ActivityTagNames.MessagingConversationId, @event.CorrelationId);

// Publish on the transport
return await transport.PublishAsync(@event: @event,
registration: reg,
scheduled: scheduled,
cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
return await transport.PublishAsync(@event: @event,
registration: reg,
scheduled: scheduled,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);

throw;
}
}

/// <summary>
/// Publish a batch of events.
/// </summary>
/// <summary>Publish a batch of events.</summary>
/// <typeparam name="TEvent">The event type.</typeparam>
/// <param name="events">The events to publish.</param>
/// <param name="scheduled">
Expand Down Expand Up @@ -127,16 +133,24 @@ public class EventBus(EventBusTransportProvider transportProvider,
activity?.AddTag(ActivityTagNames.MessagingConversationId, string.Join(",", events.Select(e => e.CorrelationId)));

// Publish on the transport
return await transport.PublishAsync(events: events,
registration: reg,
scheduled: scheduled,
cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
return await transport.PublishAsync(events: events,
registration: reg,
scheduled: scheduled,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);

throw;
}
}


/// <summary>
/// Cancel a scheduled event.
/// </summary>
/// <summary>Cancel a scheduled event.</summary>
/// <typeparam name="TEvent">The event type.</typeparam>
/// <param name="id">The scheduling identifier of the scheduled event.</param>
/// <param name="cancellationToken"></param>
Expand All @@ -155,12 +169,20 @@ public class EventBus(EventBusTransportProvider transportProvider,
activity?.AddTag(ActivityTagNames.MessagingSystem, transport.Name);

// Cancel on the transport
await transport.CancelAsync<TEvent>(id: id, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
await transport.CancelAsync<TEvent>(id: id, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);

throw;
}
}

/// <summary>
/// Cancel a batch of scheduled events.
/// </summary>
/// <summary>Cancel a batch of scheduled events.</summary>
/// <typeparam name="TEvent">The event type.</typeparam>
/// <param name="ids">The scheduling identifiers of the scheduled events.</param>
/// <param name="cancellationToken"></param>
Expand All @@ -179,7 +201,17 @@ public class EventBus(EventBusTransportProvider transportProvider,
activity?.AddTag(ActivityTagNames.MessagingSystem, transport.Name);

// Cancel on the transport
await transport.CancelAsync<TEvent>(ids: ids, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
await transport.CancelAsync<TEvent>(ids: ids, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddException(ex);

throw;
}
}

///
Expand Down

0 comments on commit 047dec6

Please sign in to comment.