Skip to content

Commit

Permalink
Make scalers into per-task hub singletons (cherry pick #2967) (#2974)
Browse files Browse the repository at this point in the history
* Make scalers into per-task hub singletons (#2967)

This addresses an issue where the costs associated with polling storage resources increase substantially when an app has a large number of durable-trigger functions compared to apps that only have a small number.

* Remove unnecessary #ifdef

* Update nuget package versions
  • Loading branch information
cgillum authored Nov 26, 2024
1 parent 8ad9682 commit 860216c
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

private DurableTaskScaleMonitor singletonScaleMonitor;
private DurableTaskTargetScaler singletonTargetScaler;

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageServiceClientProviderFactory clientProviderFactory,
Expand Down Expand Up @@ -222,12 +227,11 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
}

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger)
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccountClientProvider);
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccountClientProvider);
}

/// <inheritdoc/>
Expand All @@ -238,16 +242,22 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccountClientProvider,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
if (this.singletonScaleMonitor == null)
{
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccountClientProvider, this.logger);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
hubName,
storageAccountClientProvider,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

public override bool TryGetTargetScaler(
Expand All @@ -257,11 +267,23 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccountClientProvider, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
StorageAccountClientProvider storageAccountClientProvider = this.clientProviderFactory.GetClientProvider(connectionName);
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccountClientProvider, this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly StorageAccountClientProvider storageAccountClientProvider;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, StorageAccountClientProvider storageAccountClientProvider)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
StorageAccountClientProvider storageAccountClientProvider)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -42,7 +44,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (Exception e) when (e.InnerException is RequestFailedException)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly StorageAccountClientProvider storageAccountClientProvider;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
Expand All @@ -27,23 +25,22 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTrigger
private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string hubName,
StorageAccountClientProvider storageAccountClientProvider,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccountClientProvider = storageAccountClientProvider;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
string id = $"DurableTaskTrigger-{this.hubName}".ToLower();
// Scalers in Durable Functions are shared for all functions in the same task hub.

Check warning on line 41 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Check warning on line 41 in src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
}

public ScaleMonitorDescriptor Descriptor
Expand Down Expand Up @@ -141,9 +138,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -67,7 +71,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting its own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -84,7 +88,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<RootNamespace>Microsoft.Azure.WebJobs.Extensions.DurableTask</RootNamespace>
<MajorVersion>3</MajorVersion>
<MinorVersion>0</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(MajorVersion).$(MinorVersion).$(PatchVersion)</FileVersion>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
Expand Down
2 changes: 1 addition & 1 deletion src/Worker.Extensions.DurableTask/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

// TODO: Find a way to generate this dynamically at build-time
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "3.0.0")]
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.DurableTask", "3.0.1")]
[assembly: InternalsVisibleTo("Worker.Extensions.DurableTask.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100cd1dabd5a893b40e75dc901fe7293db4a3caf9cd4d3e3ed6178d49cd476969abe74a9e0b7f4a0bb15edca48758155d35a4f05e6e852fff1b319d103b39ba04acbadd278c2753627c95e1f6f6582425374b92f51cca3deb0d2aab9de3ecda7753900a31f70a236f163006beefffe282888f85e3c76d1205ec7dfef7fa472a17b1")]
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<AssemblyOriginatorKeyFile>..\..\sign.snk</AssemblyOriginatorKeyFile>

<!-- Version information -->
<VersionPrefix>1.2.0</VersionPrefix>
<VersionPrefix>1.2.1</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(VersionPrefix).0</AssemblyVersion>
<!-- FileVersionRevision is expected to be set by the CI. -->
Expand Down
7 changes: 2 additions & 5 deletions test/FunctionsV2/DurableTaskListenerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Linq;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
Expand Down Expand Up @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue()
IScaleMonitor scaleMonitor = this.listener.GetMonitor();

Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType());
Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);

var scaleMonitor2 = this.listener.GetMonitor();
IScaleMonitor scaleMonitor2 = this.listener.GetMonitor();

Assert.Same(scaleMonitor, scaleMonitor2);
}
Expand Down
7 changes: 1 addition & 6 deletions test/FunctionsV2/DurableTaskScaleMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class DurableTaskScaleMonitorTests
{
private readonly string functionId = "DurableTaskTriggerFunctionId";
private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName");
private readonly string hubName = "DurableTaskTriggerHubName";
private readonly StorageAccountClientProvider clientProvider = new StorageAccountClientProvider(TestHelpers.GetStorageConnectionString());
private readonly ITestOutputHelper output;
Expand All @@ -45,15 +43,12 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
TaskHubName = this.hubName,
});
var metricsProvider = new DurableTaskMetricsProvider(
this.functionName.Name,
this.hubName,
logger,
this.performanceMonitor.Object,
this.clientProvider);

this.scaleMonitor = new DurableTaskScaleMonitor(
this.functionId,
this.functionName.Name,
this.hubName,
this.clientProvider,
logger,
Expand All @@ -65,7 +60,7 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void ScaleMonitorDescriptor_ReturnsExpectedValue()
{
Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id);
}

[Fact]
Expand Down
1 change: 0 additions & 1 deletion test/FunctionsV2/DurableTaskTargetScalerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output)
StorageAccountClientProvider storageAccountClientProvider = null;
this.metricsProviderMock = new Mock<DurableTaskMetricsProvider>(
MockBehavior.Strict,
"FunctionName",
"HubName",
logger,
nullPerformanceMonitorMock,
Expand Down

0 comments on commit 860216c

Please sign in to comment.