Skip to content

Commit

Permalink
v1.3.2 FreeRedis.OpenTelemetry #205 #196
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Sep 8, 2024
1 parent 196d3e2 commit e5c0bf7
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<AssemblyName>FreeRedis.DistributedCache</AssemblyName>
<PackageId>FreeRedis.DistributedCache</PackageId>
<RootNamespace>FreeRedis.DistributedCache</RootNamespace>
<Version>1.3.0</Version>
<Version>1.3.2</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageProjectUrl>https://github.com/2881099/FreeRedis</PackageProjectUrl>
<Description>分布式缓存 FreeRedis 实现 Microsoft.Extensions.Caching</Description>
Expand Down
122 changes: 63 additions & 59 deletions src/FreeRedis.OpenTelemetry/DiagnosticListener.cs
Original file line number Diff line number Diff line change
@@ -1,77 +1,81 @@
using OpenTelemetry.Trace;
using System;
using System.Collections.Generic;
using System.Diagnostics;

namespace FreeRedis.OpenTelemetry;

public class DiagnosticListener : IObserver<KeyValuePair<string, object?>>
namespace FreeRedis.OpenTelemetry
{
public const string SourceName = "FreeRedis.OpenTelemetry";

private static readonly ActivitySource ActivitySource = new(SourceName, "1.0.0");

/// <summary>Notifies the observer that the provider has finished sending push-based notifications.</summary>
public void OnCompleted()
public class DiagnosticListener : IObserver<KeyValuePair<string, object?>>
{
}
public const string SourceName = "FreeRedis.OpenTelemetry";

/// <summary>Notifies the observer that the provider has experienced an error condition.</summary>
/// <param name="error">An object that provides additional information about the error.</param>
public void OnError(Exception error)
{
}
private static readonly ActivitySource ActivitySource = new(SourceName, "1.0.0");

/// <summary>Provides the observer with new data.</summary>
/// <param name="evt">The current notification information.</param>
public void OnNext(KeyValuePair<string, object?> evt)
{
//https://opentelemetry.io/docs/specs/semconv/database/redis/
switch (evt.Key)
/// <summary>Notifies the observer that the provider has finished sending push-based notifications.</summary>
public void OnCompleted()
{
case FreeRedisDiagnosticListenerNames.NoticeCallBefore:
{
var eventData = (InterceptorBeforeEventArgs)evt.Value!;
var activity = ActivitySource.StartActivity("redis command execute: " + eventData.Command);
if (activity != null)
{
activity.SetTag("db.system", "redis");
activity.SetTag("db.operation.name", eventData.Command._command);
activity.SetTag("db.query.text", eventData.Command);
//Activity.Current?.SetTag("network.peer.address", ip);
//Activity.Current?.SetTag("network.peer.port", port);
}

activity.AddEvent(new ActivityEvent("redis command execute start",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
}
}
break;
case FreeRedisDiagnosticListenerNames.NoticeCallAfter:
{
var eventData = (InterceptorAfterEventArgs)evt.Value!;
var writeTarget = eventData.Command.WriteTarget;
if (!string.IsNullOrEmpty(writeTarget))
/// <summary>Notifies the observer that the provider has experienced an error condition.</summary>
/// <param name="error">An object that provides additional information about the error.</param>
public void OnError(Exception error)
{
}

/// <summary>Provides the observer with new data.</summary>
/// <param name="evt">The current notification information.</param>
public void OnNext(KeyValuePair<string, object?> evt)
{
//https://opentelemetry.io/docs/specs/semconv/database/redis/
switch (evt.Key)
{
case FreeRedisDiagnosticListenerNames.NoticeCallBefore:
{
var parts = writeTarget.Split([':', '/'], StringSplitOptions.RemoveEmptyEntries);
var ip = parts[0];
var port = int.Parse(parts[1]);
var dbIndex = int.Parse(parts[2]);
var eventData = (InterceptorBeforeEventArgs)evt.Value!;
var activity = ActivitySource.StartActivity("redis command execute: " + eventData.Command);
if (activity != null)
{
activity.SetTag("db.system", "redis");
activity.SetTag("db.operation.name", eventData.Command._command);
activity.SetTag("db.query.text", eventData.Command);
//Activity.Current?.SetTag("network.peer.address", ip);
//Activity.Current?.SetTag("network.peer.port", port);

Activity.Current?.SetTag("server.address", ip);
Activity.Current?.SetTag("server.port", port);
Activity.Current?.SetTag("db.namespace", dbIndex);
activity.AddEvent(new ActivityEvent("redis command execute start",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
}
}
var tags = new ActivityTagsCollection { new("free_redis.duration", eventData.ElapsedMilliseconds) };
if (eventData.Exception != null)
break;
case FreeRedisDiagnosticListenerNames.NoticeCallAfter:
{
Activity.Current?.SetStatus(Status.Error.WithDescription(eventData.Exception.Message));
tags.Add(new("error.type", eventData.Exception.Message));
}
Activity.Current?.AddEvent(new ActivityEvent("redis command executed",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), tags)
);
var eventData = (InterceptorAfterEventArgs)evt.Value!;
var writeTarget = eventData.Command.WriteTarget;
if (!string.IsNullOrEmpty(writeTarget))
{
var parts = writeTarget.Split(new[] { ':', '/' }, StringSplitOptions.RemoveEmptyEntries);
var ip = parts[0];
var port = int.Parse(parts[1]);
var dbIndex = int.Parse(parts[2]);

Activity.Current?.Stop();
}
break;
Activity.Current?.SetTag("server.address", ip);
Activity.Current?.SetTag("server.port", port);
Activity.Current?.SetTag("db.namespace", dbIndex);
}
var tags = new ActivityTagsCollection { new("free_redis.duration", eventData.ElapsedMilliseconds) };
if (eventData.Exception != null)
{
Activity.Current?.SetStatus(Status.Error.WithDescription(eventData.Exception.Message));
tags.Add(new("error.type", eventData.Exception.Message));
}
Activity.Current?.AddEvent(new ActivityEvent("redis command executed",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value), tags)
);

Activity.Current?.Stop();
}
break;
}
}
}
}
130 changes: 68 additions & 62 deletions src/FreeRedis.OpenTelemetry/DiagnosticSourceSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,85 +1,91 @@
namespace FreeRedis.OpenTelemetry;
using System;
using System.Collections.Generic;
using System.Threading;

public class DiagnosticSourceSubscriber : IDisposable, IObserver<System.Diagnostics.DiagnosticListener>
namespace FreeRedis.OpenTelemetry
{
private readonly Func<System.Diagnostics.DiagnosticListener, bool> _diagnosticSourceFilter;
private readonly Func<string, DiagnosticListener> _handlerFactory;
private readonly Func<string, object?, object?, bool>? _isEnabledFilter;
private readonly List<IDisposable> _listenerSubscriptions;
private IDisposable? _allSourcesSubscription;
private long _disposed;

public DiagnosticSourceSubscriber(
DiagnosticListener handler,
Func<string, object?, object?, bool>? isEnabledFilter)
: this(_ => handler,
value => FreeRedisDiagnosticListenerNames.DiagnosticListenerName == value.Name,
isEnabledFilter)
public class DiagnosticSourceSubscriber : IDisposable, IObserver<System.Diagnostics.DiagnosticListener>
{
}
private readonly Func<System.Diagnostics.DiagnosticListener, bool> _diagnosticSourceFilter;
private readonly Func<string, DiagnosticListener> _handlerFactory;
private readonly Func<string, object?, object?, bool>? _isEnabledFilter;
private readonly List<IDisposable> _listenerSubscriptions;
private IDisposable? _allSourcesSubscription;
private long _disposed;

public DiagnosticSourceSubscriber(
Func<string, DiagnosticListener> handlerFactory,
Func<System.Diagnostics.DiagnosticListener, bool> diagnosticSourceFilter,
Func<string, object?, object?, bool>? isEnabledFilter)
{
_listenerSubscriptions = new List<IDisposable>();
_handlerFactory = handlerFactory ?? throw new ArgumentNullException(nameof(handlerFactory));
_diagnosticSourceFilter = diagnosticSourceFilter;
_isEnabledFilter = isEnabledFilter;
}
public DiagnosticSourceSubscriber(
DiagnosticListener handler,
Func<string, object?, object?, bool>? isEnabledFilter)
: this(_ => handler,
value => FreeRedisDiagnosticListenerNames.DiagnosticListenerName == value.Name,
isEnabledFilter)
{
}

/// <inheritdoc />
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
public DiagnosticSourceSubscriber(
Func<string, DiagnosticListener> handlerFactory,
Func<System.Diagnostics.DiagnosticListener, bool> diagnosticSourceFilter,
Func<string, object?, object?, bool>? isEnabledFilter)
{
_listenerSubscriptions = new List<IDisposable>();
_handlerFactory = handlerFactory ?? throw new ArgumentNullException(nameof(handlerFactory));
_diagnosticSourceFilter = diagnosticSourceFilter;
_isEnabledFilter = isEnabledFilter;
}

public void OnNext(System.Diagnostics.DiagnosticListener value)
{
if (Interlocked.Read(ref _disposed) == 0 && _diagnosticSourceFilter(value))
/// <inheritdoc />
public void Dispose()
{
var handler = _handlerFactory(value.Name);
var subscription = _isEnabledFilter == null
? value.Subscribe(handler)
: value.Subscribe(handler, _isEnabledFilter);
Dispose(true);
GC.SuppressFinalize(this);
}

lock (_listenerSubscriptions)
public void OnNext(System.Diagnostics.DiagnosticListener value)
{
if (Interlocked.Read(ref _disposed) == 0 && _diagnosticSourceFilter(value))
{
_listenerSubscriptions.Add(subscription);
var handler = _handlerFactory(value.Name);
var subscription = _isEnabledFilter == null
? value.Subscribe(handler)
: value.Subscribe(handler, _isEnabledFilter);

lock (_listenerSubscriptions)
{
_listenerSubscriptions.Add(subscription);
}
}
}
}

public void OnCompleted()
{
}

public void OnError(Exception error)
{
}
public void OnCompleted()
{
}

public void Subscribe()
{
_allSourcesSubscription ??= System.Diagnostics.DiagnosticListener.AllListeners.Subscribe(this);
}
public void OnError(Exception error)
{
}

protected virtual void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) return;
public void Subscribe()
{
_allSourcesSubscription ??= System.Diagnostics.DiagnosticListener.AllListeners.Subscribe(this);
}

lock (_listenerSubscriptions)
protected virtual void Dispose(bool disposing)
{
foreach (var listenerSubscription in _listenerSubscriptions)
if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) return;

lock (_listenerSubscriptions)
{
listenerSubscription?.Dispose();
foreach (var listenerSubscription in _listenerSubscriptions)
{
listenerSubscription?.Dispose();
}

_listenerSubscriptions.Clear();
}

_listenerSubscriptions.Clear();
_allSourcesSubscription?.Dispose();
_allSourcesSubscription = null;
}

_allSourcesSubscription?.Dispose();
_allSourcesSubscription = null;
}
}
43 changes: 31 additions & 12 deletions src/FreeRedis.OpenTelemetry/FreeRedis.OpenTelemetry.csproj
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>
<TargetFrameworks>net80</TargetFrameworks>
<AssemblyName>FreeRedis.OpenTelemetry</AssemblyName>
<PackageId>FreeRedis.OpenTelemetry</PackageId>
<RootNamespace>FreeRedis.OpenTelemetry</RootNamespace>
<Version>1.3.2</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageProjectUrl>https://github.com/2881099/FreeRedis</PackageProjectUrl>
<RepositoryUrl>https://github.com/2881099/FreeRedis</RepositoryUrl>
<PackageTags>FreeRedis redis-trib cluster rediscluster sentinel OpenTelemetry</PackageTags>
<RepositoryType>git</RepositoryType>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<Title>$(AssemblyName)</Title>
<IsPackable>true</IsPackable>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<WarningLevel>3</WarningLevel>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>key.snk</AssemblyOriginatorKeyFile>
<DelaySign>false</DelaySign>
<PackageReadmeFile>readme.md</PackageReadmeFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry" Version="1.9.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="OpenTelemetry" Version="1.9.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\FreeRedis\FreeRedis.csproj" />
</ItemGroup>
<ItemGroup>
<None Include="../../readme.md" Pack="true" PackagePath="\"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\FreeRedis\FreeRedis.csproj" />
</ItemGroup>

</Project>
</Project>
26 changes: 15 additions & 11 deletions src/FreeRedis.OpenTelemetry/FreeRedisInstrumentation.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
namespace FreeRedis.OpenTelemetry;
using System;

public class FreeRedisInstrumentation : IDisposable
namespace FreeRedis.OpenTelemetry
{
private readonly DiagnosticSourceSubscriber? _diagnosticSourceSubscriber;

public FreeRedisInstrumentation(DiagnosticListener diagnosticListener)
public class FreeRedisInstrumentation : IDisposable
{
_diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(diagnosticListener, null);
_diagnosticSourceSubscriber.Subscribe();
}
private readonly DiagnosticSourceSubscriber? _diagnosticSourceSubscriber;

/// <inheritdoc />
public void Dispose()
{
_diagnosticSourceSubscriber?.Dispose();
public FreeRedisInstrumentation(DiagnosticListener diagnosticListener)
{
_diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(diagnosticListener, null);
_diagnosticSourceSubscriber.Subscribe();
}

/// <inheritdoc />
public void Dispose()
{
_diagnosticSourceSubscriber?.Dispose();
}
}
}
Loading

0 comments on commit e5c0bf7

Please sign in to comment.