From 83594966cbfd3ad447259b091ee04d51d6eaf0fb Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Fri, 15 Nov 2024 07:56:15 -0800 Subject: [PATCH 1/4] Make scalers into per-task hub singletons --- .../AzureStorageDurabilityProvider.cs | 61 +++++++++++++------ .../Listener/DurableTaskMetricsProvider.cs | 10 +-- .../Listener/DurableTaskScaleMonitor.cs | 18 +++--- .../Listener/DurableTaskTargetScaler.cs | 16 +++-- 4 files changed, 68 insertions(+), 37 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs index 2920a0f37..ea5350463 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider private readonly JObject storageOptionsJson; private readonly ILogger logger; + private readonly object initLock = new object(); + +#if !FUNCTIONS_V1 + private DurableTaskScaleMonitor singletonScaleMonitor; +#endif + +#if FUNCTIONS_V3_OR_GREATER + private DurableTaskTargetScaler singletonTargetScaler; +#endif + public AzureStorageDurabilityProvider( AzureStorageOrchestrationService service, IStorageAccountProvider storageAccountProvider, @@ -226,12 +236,11 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC #if !FUNCTIONS_V1 internal DurableTaskMetricsProvider GetMetricsProvider( - string functionName, string hubName, CloudStorageAccount storageAccount, ILogger logger) { - return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount); + return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccount); } /// @@ -242,16 +251,22 @@ public override bool TryGetScaleMonitor( string connectionName, out IScaleMonitor scaleMonitor) { - CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(); - DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger); - scaleMonitor = new DurableTaskScaleMonitor( - functionId, - functionName, - hubName, - storageAccount, - this.logger, - metricsProvider); - return true; + lock (this.initLock) + { + if (this.singletonScaleMonitor == null) + { + CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(); + DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger); + this.singletonScaleMonitor = new DurableTaskScaleMonitor( + hubName, + storageAccount, + this.logger, + metricsProvider); + } + + scaleMonitor = this.singletonScaleMonitor; + return true; + } } #endif @@ -263,11 +278,23 @@ public override bool TryGetTargetScaler( string connectionName, out ITargetScaler targetScaler) { - // This is only called by the ScaleController, it doesn't run in the Functions Host process. - CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(); - DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger); - targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger); - return true; + lock (this.initLock) + { + if (this.singletonTargetScaler == null) + { + // This is only called by the ScaleController, it doesn't run in the Functions Host process. + CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(); + DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger); + + // Scalers in Durable Functions are shared for all functions in the same task hub. + // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. + string id = $"DurableTask-AzureStorage:{hubName ?? "default"}"; + this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger); + } + + targetScaler = this.singletonTargetScaler; + return true; + } } #endif } diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs index e3565d169..999821abc 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs @@ -13,16 +13,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal class DurableTaskMetricsProvider { - private readonly string functionName; private readonly string hubName; private readonly ILogger logger; private readonly CloudStorageAccount storageAccount; private DisconnectedPerformanceMonitor performanceMonitor; - public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount) + public DurableTaskMetricsProvider( + string hubName, + ILogger logger, + DisconnectedPerformanceMonitor performanceMonitor, + CloudStorageAccount storageAccount) { - this.functionName = functionName; this.hubName = hubName; this.logger = logger; this.performanceMonitor = performanceMonitor; @@ -42,7 +44,7 @@ public virtual async Task GetMetricsAsync() } catch (StorageException e) { - this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName); + this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName); } if (heartbeat != null) diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs index 4c05b3df0..912210158 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs @@ -16,8 +16,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal sealed class DurableTaskScaleMonitor : IScaleMonitor { - private readonly string functionId; - private readonly string functionName; private readonly string hubName; private readonly CloudStorageAccount storageAccount; private readonly ScaleMonitorDescriptor scaleMonitorDescriptor; @@ -27,16 +25,12 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor GetScaleResultAsync(TargetScalerContext co // and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto. var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; - var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " + + var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " + metricsLog; // target worker count should never be negative @@ -85,7 +89,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co // We want to augment the exception with metrics information for investigation purposes var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " + $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}"; - var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog; + var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog; throw new Exception(errorLog, ex); } } From 14a6a2011ab92daf302849e2b5125384d6e39210 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Fri, 15 Nov 2024 13:38:28 -0800 Subject: [PATCH 2/4] Fix unit tests --- .../Listener/DurableTaskScaleMonitor.cs | 7 ++----- test/FunctionsV2/DurableTaskScaleMonitorTests.cs | 7 +------ 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs index 912210158..c24762e06 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs @@ -37,18 +37,15 @@ public DurableTaskScaleMonitor( this.performanceMonitor = performanceMonitor; this.durableTaskMetricsProvider = durableTaskMetricsProvider; + string id = $"DurableTaskTrigger-{this.hubName}".ToLower(); #if FUNCTIONS_V3_OR_GREATER // Scalers in Durable Functions are shared for all functions in the same task hub. // So instead of using a function ID, we use the task hub name as the basis for the descriptor ID. - string id = $"DurableTask-AzureStorage:{hubName ?? "default"}"; this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id); #else -#pragma warning disable CS0618 // Type or member is obsolete. - // We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2. // Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4. - this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-{this.hubName}".ToLower()); -#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1 + this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id); #endif } diff --git a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs index 57566da29..330567dda 100644 --- a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs +++ b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs @@ -20,8 +20,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests { public class DurableTaskScaleMonitorTests { - private readonly string functionId = "DurableTaskTriggerFunctionId"; - private readonly FunctionName functionName = new FunctionName("DurableTaskTriggerFunctionName"); private readonly string hubName = "DurableTaskTriggerHubName"; private readonly CloudStorageAccount storageAccount = CloudStorageAccount.Parse(TestHelpers.GetStorageConnectionString()); private readonly ITestOutputHelper output; @@ -41,15 +39,12 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output) this.traceHelper = new EndToEndTraceHelper(logger, false); this.performanceMonitor = new Mock(MockBehavior.Strict, this.storageAccount, this.hubName, (int?)null); var metricsProvider = new DurableTaskMetricsProvider( - this.functionName.Name, this.hubName, logger, this.performanceMonitor.Object, this.storageAccount); this.scaleMonitor = new DurableTaskScaleMonitor( - this.functionId, - this.functionName.Name, this.hubName, this.storageAccount, logger, @@ -61,7 +56,7 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output) [Trait("Category", PlatformSpecificHelpers.TestCategory)] public void ScaleMonitorDescriptor_ReturnsExpectedValue() { - Assert.Equal($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id); + Assert.Equal($"DurableTaskTrigger-{this.hubName}".ToLower(), this.scaleMonitor.Descriptor.Id); } [Fact] From 59c643daf2e71d31f223b6a5a20544a56d885762 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Wed, 20 Nov 2024 10:40:54 -0800 Subject: [PATCH 3/4] Fix unit test --- test/FunctionsV2/DurableTaskListenerTests.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/FunctionsV2/DurableTaskListenerTests.cs b/test/FunctionsV2/DurableTaskListenerTests.cs index 3be412bd9..996a336cd 100644 --- a/test/FunctionsV2/DurableTaskListenerTests.cs +++ b/test/FunctionsV2/DurableTaskListenerTests.cs @@ -2,13 +2,10 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; -using System.Linq; -using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; -using Moq; using Xunit; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue() IScaleMonitor scaleMonitor = this.listener.GetMonitor(); Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType()); - Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id); + Assert.Equal($"DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id); - var scaleMonitor2 = this.listener.GetMonitor(); + IScaleMonitor scaleMonitor2 = this.listener.GetMonitor(); Assert.Same(scaleMonitor, scaleMonitor2); } From 112c7427dbdc100d5254e8abd7e4b377a5857341 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Wed, 20 Nov 2024 10:58:06 -0800 Subject: [PATCH 4/4] Fix mocks that were broken by change --- test/FunctionsV2/DurableTaskTargetScalerTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/FunctionsV2/DurableTaskTargetScalerTests.cs b/test/FunctionsV2/DurableTaskTargetScalerTests.cs index cc2272e16..0fcae7ec6 100644 --- a/test/FunctionsV2/DurableTaskTargetScalerTests.cs +++ b/test/FunctionsV2/DurableTaskTargetScalerTests.cs @@ -47,7 +47,6 @@ public DurableTaskTargetScalerTests(ITestOutputHelper output) CloudStorageAccount nullCloudStorageAccountMock = null; this.metricsProviderMock = new Mock( MockBehavior.Strict, - "FunctionName", "HubName", logger, nullPerformanceMonitorMock,