Skip to content

Commit b6dafe6

Browse files
Pre release cleanup (#68)
* WIP for metric tests, breaking OTel API * Custom observability tags * Fix custom default tags
1 parent f2365ae commit b6dafe6

File tree

21 files changed

+196
-96
lines changed

21 files changed

+196
-96
lines changed

src/Core/src/Eventuous.Producers/BaseProducer.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,7 @@ public abstract class BaseProducer : IEventProducer {
4848

4949
protected BaseProducer(ProducerTracingOptions? tracingOptions = null) {
5050
var options = tracingOptions ?? new ProducerTracingOptions();
51-
52-
DefaultTags = new[] {
53-
new KeyValuePair<string, object?>(Messaging.System, options.MessagingSystem),
54-
new KeyValuePair<string, object?>(Messaging.DestinationKind, options.DestinationKind),
55-
new KeyValuePair<string, object?>(Messaging.Operation, options.ProduceOperation)
56-
};
51+
DefaultTags = options.AllTags.Concat(EventuousDiagnostics.Tags).ToArray();
5752
}
5853

5954
protected abstract Task ProduceMessages(
Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1-
namespace Eventuous.Producers.Diagnostics;
1+
using Eventuous.Diagnostics;
2+
3+
namespace Eventuous.Producers.Diagnostics;
4+
5+
public delegate ProducerTracingOptions ConfigureProducerTracing(ProducerTracingOptions defaultOptions);
26

37
public record ProducerTracingOptions {
48
public string? MessagingSystem { get; init; }
59
public string? DestinationKind { get; init; }
610
public string? ProduceOperation { get; init; }
11+
12+
public KeyValuePair<string, object?>[] AllTags => new KeyValuePair<string, object?>[] {
13+
new(TelemetryTags.Messaging.System, MessagingSystem),
14+
new(TelemetryTags.Messaging.DestinationKind, DestinationKind),
15+
new(TelemetryTags.Messaging.Operation, ProduceOperation)
16+
};
717
}

src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionMetrics.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace Eventuous.Subscriptions.Diagnostics;
99

10-
public sealed class SubscriptionMetrics : IDisposable {
10+
public sealed class SubscriptionMetrics : IWithCustomTags, IDisposable {
1111
const string MetricPrefix = "eventuous";
1212
const string Category = "subscription";
1313

@@ -63,21 +63,30 @@ public SubscriptionMetrics(IEnumerable<GetSubscriptionGap> measures) {
6363

6464
IEnumerable<Measurement<double>> ObserveTimeValues()
6565
=> gaps?
66-
.Select(x => new Measurement<double>(x.TimeGap.TotalSeconds, SubTag(x.SubscriptionId)))
66+
.Select(x => Measure(x.TimeGap.TotalSeconds, x.SubscriptionId))
6767
?? Array.Empty<Measurement<double>>();
6868

6969
IEnumerable<Measurement<long>> ObserveGapValues(GetSubscriptionGap[] gapMeasure) {
7070
gaps = gapMeasure.Select(GetGap).Where(x => x != SubscriptionGap.Invalid);
7171

72-
return gaps.Select(x => new Measurement<long>((long)x.PositionGap, SubTag(x.SubscriptionId)));
72+
return gaps.Select(x => Measure((long)x.PositionGap, x.SubscriptionId));
73+
}
74+
75+
Measurement<T> Measure<T>(T value, string subscriptionId) where T : struct {
76+
if (_customTags == null) {
77+
return new Measurement<T>(value, SubTag(subscriptionId));
78+
}
79+
80+
var tags = new List<KeyValuePair<string, object?>>(_customTags) { SubTag(subscriptionId) };
81+
return new Measurement<T>(value, tags);
7382
}
7483
}
7584

7685
static KeyValuePair<string, object?> GetTag(string key, object? id) => new(key, id);
7786

7887
static KeyValuePair<string, object?> SubTag(object? id) => new(SubscriptionIdTag, id);
7988

80-
static void ActivityStopped(Histogram<double> histogram, Counter<long> errorCount, Activity activity) {
89+
void ActivityStopped(Histogram<double> histogram, Counter<long> errorCount, Activity activity) {
8190
if (activity.OperationName != TracingConstants.ConsumerOperation) return;
8291

8392
var subId = activity.GetTagItem(TelemetryTags.Eventuous.Subscription);
@@ -86,10 +95,17 @@ static void ActivityStopped(Histogram<double> histogram, Counter<long> errorCoun
8695
var subTag = SubTag(subId);
8796
var typeTag = GetTag(MessageTypeTag, activity.GetTagItem(TelemetryTags.Message.Type));
8897
var partitionTag = GetTag(PartitionIdTag, activity.GetTagItem(TelemetryTags.Eventuous.Partition));
89-
histogram.Record(activity.Duration.TotalMilliseconds, subTag, typeTag, partitionTag);
98+
99+
var tags = new TagList(_customTags) {
100+
subTag,
101+
typeTag,
102+
partitionTag
103+
};
104+
105+
histogram.Record(activity.Duration.TotalMilliseconds, tags);
90106

91107
if (activity.Status == ActivityStatusCode.Error) {
92-
errorCount.Add(1, subTag, typeTag);
108+
errorCount.Add(1, tags);
93109
}
94110
}
95111

@@ -110,10 +126,15 @@ static SubscriptionGap GetGap(GetSubscriptionGap gapMeasure) {
110126
readonly Meter _meter;
111127
readonly ActivityListener _listener;
112128
readonly Lazy<CheckpointCommitMetrics> _checkpointMetrics;
129+
KeyValuePair<string, object?>[] _customTags = EventuousDiagnostics.Tags;
113130

114131
public void Dispose() {
115132
_listener.Dispose();
116133
_meter.Dispose();
117134
if (_checkpointMetrics.IsValueCreated) _checkpointMetrics.Value.Dispose();
118135
}
136+
137+
public void SetCustomTags(TagList customTags) {
138+
_customTags = _customTags.Concat(customTags).ToArray();
139+
}
119140
}

src/Core/src/Eventuous.Subscriptions/EventSubscription.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Diagnostics;
22
using System.Runtime.CompilerServices;
3+
using Eventuous.Diagnostics;
34
using Eventuous.Subscriptions.Context;
45
using Eventuous.Subscriptions.Diagnostics;
56
using Eventuous.Subscriptions.Filters;
@@ -60,7 +61,9 @@ public async ValueTask Unsubscribe(OnUnsubscribed onUnsubscribed, CancellationTo
6061

6162
[MethodImpl(MethodImplOptions.AggressiveInlining)]
6263
protected async ValueTask Handler(IMessageConsumeContext context) {
63-
var activity = SubscriptionActivity.Create(TracingConstants.SubscriptionOperation, context);
64+
var activity = EventuousDiagnostics.Enabled
65+
? SubscriptionActivity.Create(TracingConstants.SubscriptionOperation, context, EventuousDiagnostics.Tags)
66+
: null;
6467
var delayed = context is DelayedAckConsumeContext;
6568
if (!delayed) activity?.Start();
6669

src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ namespace Eventuous.Subscriptions.Filters;
99
public class TracingFilter : ConsumeFilter {
1010
readonly KeyValuePair<string, object?>[] _defaultTags;
1111

12-
public TracingFilter(params KeyValuePair<string, object?>[] tags) => _defaultTags = tags;
12+
public TracingFilter(params KeyValuePair<string, object?>[] tags) {
13+
_defaultTags = tags.Concat(EventuousDiagnostics.Tags).ToArray();
14+
}
1315

1416
public override async ValueTask Send(
1517
IMessageConsumeContext context,

src/Diagnostics/src/Eventuous.Diagnostics.OpenTelemetry/Eventuous.Diagnostics.OpenTelemetry.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55
<ProjectReference Include="$(CoreRoot)\Eventuous\Eventuous.csproj" />
66
</ItemGroup>
77
<ItemGroup>
8-
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc8" />
8+
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9" />
99
</ItemGroup>
1010
</Project>
Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Diagnostics;
12
using Eventuous.Diagnostics.Metrics;
23
using Eventuous.Subscriptions.Diagnostics;
34
using Microsoft.Extensions.DependencyInjection;
@@ -11,29 +12,39 @@ public static class MeterProviderBuilderExtensions {
1112
/// Adds subscriptions metrics instrumentation
1213
/// </summary>
1314
/// <param name="builder"></param>
15+
/// <param name="customTags"></param>
1416
/// <returns></returns>
15-
public static MeterProviderBuilder AddEventuousSubscriptions(this MeterProviderBuilder builder)
17+
public static MeterProviderBuilder AddEventuousSubscriptions(this MeterProviderBuilder builder, TagList? customTags = null)
1618
=> Ensure.NotNull(builder)
1719
.AddMeter(SubscriptionMetrics.MeterName)
18-
.AddMetrics<SubscriptionMetrics>();
20+
.AddMetrics<SubscriptionMetrics>(customTags);
1921

2022
/// <summary>
2123
/// Adds metrics instrumentation for core components such as application service and event store
2224
/// </summary>
2325
/// <param name="builder"></param>
26+
/// <param name="customTags"></param>
2427
/// <returns></returns>
25-
public static MeterProviderBuilder AddEventuous(this MeterProviderBuilder builder)
28+
public static MeterProviderBuilder AddEventuous(this MeterProviderBuilder builder, TagList? customTags = null)
2629
=> Ensure.NotNull(builder)
2730
.AddMeter(EventuousMetrics.MeterName)
28-
.AddMetrics<EventuousMetrics>();
31+
.AddMetrics<EventuousMetrics>(customTags);
2932

30-
static MeterProviderBuilder AddMetrics<T>(this MeterProviderBuilder builder) where T : class {
33+
static MeterProviderBuilder AddMetrics<T>(this MeterProviderBuilder builder, TagList? customTags = null)
34+
where T : class, IWithCustomTags {
3135
builder.GetServices().AddSingleton<T>();
3236

3337
return builder is IDeferredMeterProviderBuilder deferredMeterProviderBuilder
3438
? deferredMeterProviderBuilder.Configure(
35-
(sp, b) =>
36-
b.AddInstrumentation(sp.GetRequiredService<T>)
37-
) : builder.AddInstrumentation<T>();
39+
(sp, b) => {
40+
b.AddInstrumentation(
41+
() => {
42+
var instrument = sp.GetRequiredService<T>();
43+
if (customTags != null) instrument.SetCustomTags(customTags.Value);
44+
return instrument;
45+
}
46+
);
47+
}
48+
) : builder;
3849
}
39-
}
50+
}

src/Diagnostics/src/Eventuous.Diagnostics/EventuousDiagnostics.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ public static class EventuousDiagnostics {
1414

1515
static ActivitySource? activitySource;
1616
static ActivityListener? listener;
17+
18+
public static KeyValuePair<string, object?>[] Tags { get; private set; } = Array.Empty<KeyValuePair<string, object?>>();
19+
20+
public static void AddDefaultTag(string key, object? value) {
21+
var tags = new List<KeyValuePair<string, object?>>(Tags) { new(key, value) };
22+
Tags = tags.ToArray();
23+
}
1724

1825
public static bool Enabled { get; }
1926

src/Diagnostics/src/Eventuous.Diagnostics/Metrics/EventuousMetrics.cs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44

55
namespace Eventuous.Diagnostics.Metrics;
66

7-
public sealed class EventuousMetrics : IDisposable {
7+
public sealed class EventuousMetrics : IWithCustomTags, IDisposable {
88
public static readonly string MeterName = EventuousDiagnostics.GetMeterName("core");
99

10-
readonly Meter _meter;
11-
readonly ActivityListener _listener;
10+
readonly Meter _meter;
11+
readonly ActivityListener _listener;
12+
KeyValuePair<string, object?>[]? _customTags;
1213

1314
public EventuousMetrics() {
14-
_meter = EventuousDiagnostics.GetMeter(MeterName);
15+
_meter = EventuousDiagnostics.GetMeter(MeterName);
1516

1617
var eventStoreMetric = _meter.CreateHistogram<double>(
1718
Constants.EventStorePrefix,
@@ -30,11 +31,13 @@ public EventuousMetrics() {
3031
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
3132
ActivityStopped = Record
3233
};
34+
3335
ActivitySource.AddActivityListener(_listener);
3436

3537
void Record(Activity activity) {
3638
if (activity.OperationName == Constants.HandleCommand) {
37-
appServiceMetric.Record(
39+
RecordWithTags(
40+
appServiceMetric,
3841
activity.Duration.TotalMilliseconds,
3942
new KeyValuePair<string, object?>("command", activity.GetTagItem(Constants.CommandTag))
4043
);
@@ -43,16 +46,29 @@ void Record(Activity activity) {
4346
}
4447

4548
if (activity.OperationName.StartsWith(Constants.EventStorePrefix)) {
46-
eventStoreMetric.Record(
49+
RecordWithTags(
50+
eventStoreMetric,
4751
activity.Duration.TotalMilliseconds,
4852
new KeyValuePair<string, object?>("operation", activity.OperationName)
4953
);
5054
}
5155
}
56+
57+
void RecordWithTags(Histogram<double> histogram, double value, KeyValuePair<string, object?> tag) {
58+
if (_customTags == null) {
59+
histogram.Record(value, tag);
60+
return;
61+
}
62+
63+
var tags = new TagList(_customTags) { tag };
64+
histogram.Record(value, tags);
65+
}
5266
}
5367

5468
public void Dispose() {
5569
_listener.Dispose();
5670
_meter.Dispose();
5771
}
58-
}
72+
73+
public void SetCustomTags(TagList customTags) => _customTags = customTags.ToArray();
74+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
using System.Diagnostics;
2+
3+
namespace Eventuous.Diagnostics;
4+
5+
public interface IWithCustomTags {
6+
void SetCustomTags(TagList customTags);
7+
}

0 commit comments

Comments
 (0)