Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correlate client and orchestrator functions for DF .NET Isolated Distributed Tracing #2998

Merged
merged 16 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Correlation/Schema.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
#nullable enable

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
{
internal static class Schema
{
internal static class Task
{
internal const string Type = "durabletask.type";
internal const string Name = "durabletask.task.name";
internal const string Version = "durabletask.task.version";
internal const string InstanceId = "durabletask.task.instance_id";
internal const string ExecutionId = "durabletask.task.execution_id";
internal const string Status = "durabletask.task.status";
internal const string TaskId = "durabletask.task.task_id";
internal const string EventTargetInstanceId = "durabletask.event.target_instance_id";
internal const string FireAt = "durabletask.fire_at";
}

internal static class Status
{
internal const string Code = "otel.status_code";
internal const string Description = "otel.status_description";
}

internal static class SpanNames
{
internal static string CreateOrchestration(string name, string? version)
=> FormatName(TraceActivityConstants.CreateOrchestration, name, version);

private static string FormatName(string prefix, string name, string? version)
=> string.IsNullOrEmpty(version) ? $"{prefix}:{name}" : $"{prefix}:{name}@{version}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,28 @@ public TelemetryActivator(IOptions<DurableTaskOptions> options, INameResolver na

internal IAsyncDisposable TelemetryModule { get; set; }

internal IAsyncDisposable WebJobsTelemetryModule { get; set; }

/// <inheritdoc/>
public ValueTask DisposeAsync()
{
return this.TelemetryModule?.DisposeAsync() ?? default;
if (this.TelemetryModule != null)
{
this.TelemetryModule.DisposeAsync();
}

if (this.WebJobsTelemetryModule != null)
{
this.WebJobsTelemetryModule.DisposeAsync();
}

return default;
}

/// <inheritdoc/>
public void Dispose()
bachuv marked this conversation as resolved.
Show resolved Hide resolved
{
this.TelemetryModule?.DisposeAsync().AsTask().GetAwaiter().GetResult();
this.DisposeAsync().AsTask().GetAwaiter().GetResult();
}

/// <summary>
Expand All @@ -65,6 +77,10 @@ public void Initialize(TelemetryConfiguration configuration)
DurableTelemetryModule module = new DurableTelemetryModule();
module.Initialize(configuration);
this.TelemetryModule = module;

WebJobsTelemetryModule webJobsModule = new WebJobsTelemetryModule();
bachuv marked this conversation as resolved.
Show resolved Hide resolved
webJobsModule.Initialize(configuration);
this.WebJobsTelemetryModule = webJobsModule;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
{
internal class TraceActivityConstants
bachuv marked this conversation as resolved.
Show resolved Hide resolved
{
public const string Client = "client";
public const string Orchestration = "orchestration";
public const string Activity = "activity";
public const string Event = "event";
public const string Timer = "timer";

public const string CreateOrchestration = "create_orchestration";
public const string OrchestrationEvent = "orchestration_event";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.ApplicationInsights.Extensibility.Implementation;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
{
internal class WebJobsTelemetryModule : ITelemetryModule, IAsyncDisposable
{
private TelemetryClient telemetryClient;
private ActivityListener listener;

/// <inheritdoc/>
public void Initialize(TelemetryConfiguration configuration)
{
this.telemetryClient = new TelemetryClient(configuration);

// ActivitySamplingResult.AllData means that the ActivityListener is going to collect all of the data
// for any Activity that's sent to the "DurableTask" source. It isn't going to exclude any data.
this.listener = new ActivityListener
{
ShouldListenTo = source => source.Name.StartsWith("WebJobs.Extensions.DurableTask"),
ActivityStopped = this.OnEndActivity,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
};

ActivitySource.AddActivityListener(this.listener);
}

private void OnEndActivity(Activity activity)
{
if (!activity.IsAllDataRequested)
{
return;
}

OperationTelemetry telemetry = CreateTelemetry(activity);
this.telemetryClient.Track(telemetry);
}

private static OperationTelemetry CreateTelemetry(Activity activity)
{
OperationTelemetry telemetry;
ActivityStatusCode status = activity.Status;
switch (activity.Kind)
{
case ActivityKind.Consumer or ActivityKind.Server:
RequestTelemetry request = CreateTelemetryCore<RequestTelemetry>(activity);
request.Success = status != ActivityStatusCode.Error;

if (string.IsNullOrEmpty(request.ResponseCode))
{
request.ResponseCode = (bool)request.Success ? "200" : "500";
}

telemetry = request;
break;
default:
DependencyTelemetry dependency = CreateTelemetryCore<DependencyTelemetry>(activity);
dependency.Success = status != ActivityStatusCode.Error;
dependency.Type = activity.Kind is ActivityKind.Internal ? "InProc" : "DurableTask";
telemetry = dependency;
break;
}

// telemetry.Properties["otel.status_description"] = description;
return telemetry;
}

private static T CreateTelemetryCore<T>(Activity activity)
where T : OperationTelemetry, new()
{
T telemetry = new ()
{
Name = activity.DisplayName,
Id = activity.SpanId.ToString(),
Timestamp = activity.StartTimeUtc,
Duration = activity.Duration,
};

telemetry.Context.Operation.Id = activity.RootId;
ActivitySpanId parentId = activity.ParentSpanId;
if (parentId != default)
{
telemetry.Context.Operation.ParentId = parentId.ToString();
}

foreach (KeyValuePair<string, string> item in activity.Baggage)
{
telemetry.Properties[item.Key] = item.Value;
}

foreach (KeyValuePair<string, object> item in activity.TagObjects)
{
telemetry.Properties[item.Key] = item.Value.ToString();
}

return telemetry;
}

async ValueTask IAsyncDisposable.DisposeAsync()
{
this.listener?.Dispose();
if (this.telemetryClient != null)
{
using CancellationTokenSource cts = new (millisecondsDelay: 5000);
try
{
await this.telemetryClient.FlushAsync(cts.Token);
}
catch
{
// Ignore for now; potentially log this in the future.
}
}
}
}
}
24 changes: 24 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# WebJobs Extension Protobuf

This directory contains the protobuf definitions for the WebJobs Extension SDK, which are used to generate the C# source code for the gRPC service contracts. The official protobuf definitions are maintained in the [Durable Task Protobuf repository](https://github.com/microsoft/durabletask-protobuf).

## Updating the Protobuf Definitions

To update the protobuf definitions in this directory, follow these steps:

1. Make sure you have [PowerShell](https://learn.microsoft.com/powershell/scripting/install/installing-powershell) installed on your machine.
2. Run the following command to download the latest protobuf definitions from the Durable Task Protobuf repository:

```powershell
.\refresh-protos.ps1
```

This script will download the latest protobuf definitions from the `https://github.com/microsoft/durabletask-protobuf` repository and copy them to this directory.

By default, the latest versions of the protobufs are downloaded from the `main` branch. To specify an alternative branch, use the `-branch` parameter:

```powershell
.\refresh-protos.ps1 -branch <branch-name>
```

The `versions.txt` file in this directory contains the list of protobuf files and their commit hashes that were last downloaded. It is updated automatically by the `refresh-protos.ps1` script.
Loading
Loading