Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MaxQueuePollingInterval default for Flex Consumption apps #2953

Merged
merged 7 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eng/ci/code-mirror.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ trigger:
# Keep this set limited as appropriate (don't mirror individual user branches).
- main
- dev
- v3.x

resources:
repositories:
Expand Down
1 change: 1 addition & 0 deletions eng/ci/official-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ trigger:
include:
- main
- dev
- v3.x

# CI only, does not trigger on PRs.
pr: none
Expand Down
2 changes: 2 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

### New Features

- Update MaxQueuePollingInterval default for Flex Consumption apps #2953

### Bug Fixes

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

#if !FUNCTIONS_V1
private DurableTaskScaleMonitor singletonScaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private DurableTaskTargetScaler singletonTargetScaler;
#endif

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageAccountProvider storageAccountProvider,
Expand Down Expand Up @@ -226,12 +236,11 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccount);
}

/// <inheritdoc/>
Expand All @@ -242,16 +251,22 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccount,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
if (this.singletonScaleMonitor == null)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
hubName,
storageAccount,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

#endif
Expand All @@ -263,11 +278,23 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly CloudStorageAccount storageAccount;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
CloudStorageAccount storageAccount)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -42,7 +44,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (StorageException e)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly CloudStorageAccount storageAccount;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
Expand All @@ -27,31 +25,27 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTrigger
private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccount = storageAccount;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

string id = $"DurableTaskTrigger-{this.hubName}".ToLower();
#if FUNCTIONS_V3_OR_GREATER
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
#pragma warning disable CS0618 // Type or member is obsolete.

// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id);
#endif
}

Expand Down Expand Up @@ -150,9 +144,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -68,7 +72,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -85,7 +89,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class AzureStorageOptions
private const int MaxTaskHubNameSize = 45;
private const int MinTaskHubNameSize = 3;
private const string TaskHubPadding = "Hub";
private TimeSpan maxQueuePollingInterval;

/// <summary>
/// Gets or sets the name of the Azure Storage connection information used to manage the underlying Azure Storage resources.
Expand Down Expand Up @@ -160,9 +161,36 @@ public string TrackingStoreConnectionStringName

/// <summary>
/// Gets or sets the maximum queue polling interval.
/// We update the default value to 1 second for the Flex Consumption SKU
/// because of a known cold start latency with Flex Consumption
/// and Durable Functions.
/// The default value is 30 seconds for all other SKUs.
/// </summary>
/// <value>Maximum interval for polling control and work-item queues.</value>
public TimeSpan MaxQueuePollingInterval { get; set; } = TimeSpan.FromSeconds(30);
public TimeSpan MaxQueuePollingInterval
{
get
bachuv marked this conversation as resolved.
Show resolved Hide resolved
{
if (this.maxQueuePollingInterval == TimeSpan.Zero)
{
if (string.Equals(Environment.GetEnvironmentVariable("WEBSITE_SKU"), "FlexConsumption", StringComparison.OrdinalIgnoreCase))
{
this.maxQueuePollingInterval = TimeSpan.FromSeconds(1);
}
else
{
this.maxQueuePollingInterval = TimeSpan.FromSeconds(30);
}
}

return this.maxQueuePollingInterval;
}

set
{
this.maxQueuePollingInterval = value;
}
}

/// <summary>
/// Determines whether or not to use the old partition management strategy, or the new
Expand Down
59 changes: 59 additions & 0 deletions test/FunctionsV2/AzureStorageOptionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class AzureStorageOptionsTests
{
#if !FUNCTIONS_V1
[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void MaxQueuePollingInterval_NonFlexConsumption_DefaultValue()
{
Environment.SetEnvironmentVariable("WEBSITE_SKU", "Free");

var options = new AzureStorageOptions();
Assert.Equal(TimeSpan.FromSeconds(30), options.MaxQueuePollingInterval);
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void MaxQueuePollingInterval_NonFlexConsumption_SetCustomValue()
{
Environment.SetEnvironmentVariable("WEBSITE_SKU", "Free");

var options = new AzureStorageOptions();
options.MaxQueuePollingInterval = TimeSpan.FromSeconds(4);
Assert.Equal(TimeSpan.FromSeconds(4), options.MaxQueuePollingInterval);
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void MaxQueuePollingInterval_FlexConsumption_DefaultValue()
{
Environment.SetEnvironmentVariable("WEBSITE_SKU", "FlexConsumption");

var options = new AzureStorageOptions();
Assert.Equal(TimeSpan.FromSeconds(1), options.MaxQueuePollingInterval);
}

[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public void MaxQueuePollingInterval_FlexConsumption_SetCustomValue()
{
Environment.SetEnvironmentVariable("WEBSITE_SKU", "FlexConsumption");

var options = new AzureStorageOptions();
options.MaxQueuePollingInterval = TimeSpan.FromSeconds(6);
Assert.Equal(TimeSpan.FromSeconds(6), options.MaxQueuePollingInterval);
}
#endif
}
}
7 changes: 2 additions & 5 deletions test/FunctionsV2/DurableTaskListenerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Linq;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
Expand Down Expand Up @@ -40,9 +37,9 @@ public void GetMonitor_ReturnsExpectedValue()
IScaleMonitor scaleMonitor = this.listener.GetMonitor();

Assert.Equal(typeof(DurableTaskScaleMonitor), scaleMonitor.GetType());
Assert.Equal($"{this.functionId}-DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);
Assert.Equal($"DurableTaskTrigger-DurableTaskHub".ToLower(), scaleMonitor.Descriptor.Id);

var scaleMonitor2 = this.listener.GetMonitor();
IScaleMonitor scaleMonitor2 = this.listener.GetMonitor();

Assert.Same(scaleMonitor, scaleMonitor2);
}
Expand Down
Loading
Loading