Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make scalers into per-task hub singletons #2967

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
private readonly JObject storageOptionsJson;
private readonly ILogger logger;

private readonly object initLock = new object();

#if !FUNCTIONS_V1
private DurableTaskScaleMonitor singletonScaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private DurableTaskTargetScaler singletonTargetScaler;
#endif

public AzureStorageDurabilityProvider(
AzureStorageOrchestrationService service,
IStorageAccountProvider storageAccountProvider,
Expand Down Expand Up @@ -226,12 +236,11 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccount);
}

/// <inheritdoc/>
Expand All @@ -242,16 +251,22 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
storageAccount,
this.logger,
metricsProvider);
return true;
lock (this.initLock)
{
cgillum marked this conversation as resolved.
Show resolved Hide resolved
if (this.singletonScaleMonitor == null)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);
this.singletonScaleMonitor = new DurableTaskScaleMonitor(
hubName,
storageAccount,
this.logger,
metricsProvider);
}

scaleMonitor = this.singletonScaleMonitor;
return true;
}
}

#endif
Expand All @@ -263,11 +278,23 @@ public override bool TryGetTargetScaler(
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
lock (this.initLock)
{
if (this.singletonTargetScaler == null)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(hubName, storageAccount, this.logger);

// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.singletonTargetScaler = new DurableTaskTargetScaler(id, metricsProvider, this, this.logger);
}

targetScaler = this.singletonTargetScaler;
return true;
}
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly CloudStorageAccount storageAccount;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount)
public DurableTaskMetricsProvider(
string hubName,
ILogger logger,
DisconnectedPerformanceMonitor performanceMonitor,
CloudStorageAccount storageAccount)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
Expand All @@ -42,7 +44,7 @@ public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
}
catch (StorageException e)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
this.logger.LogWarning("{details}. HubName: {hubName}.", e.ToString(), this.hubName);
}

if (heartbeat != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
{
private readonly string functionId;
private readonly string functionName;
private readonly string hubName;
private readonly CloudStorageAccount storageAccount;
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
Expand All @@ -27,30 +25,29 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTrigger
private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskScaleMonitor(
string functionId,
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger,
DurableTaskMetricsProvider durableTaskMetricsProvider,
DisconnectedPerformanceMonitor performanceMonitor = null)
{
this.functionId = functionId;
this.functionName = functionName;
this.hubName = hubName;
this.storageAccount = storageAccount;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.durableTaskMetricsProvider = durableTaskMetricsProvider;

#if FUNCTIONS_V3_OR_GREATER
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower(), this.functionId);
// Scalers in Durable Functions are shared for all functions in the same task hub.
// So instead of using a function ID, we use the task hub name as the basis for the descriptor ID.
string id = $"DurableTask-AzureStorage:{hubName ?? "default"}";
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor(id: id, functionId: id);
#else
#pragma warning disable CS0618 // Type or member is obsolete.

// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
#endif
}
Expand Down Expand Up @@ -150,9 +147,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
if (writeToUserLogs)
{
this.logger.LogInformation(
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
"Durable Functions Trigger Scale Decision for {TaskHub}: {Vote}, Reason: {Reason}",
this.hubName,
this.functionName);
scaleStatus.Vote,
scaleRecommendation?.Reason);
}

return scaleStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ internal class DurableTaskTargetScaler : ITargetScaler
private readonly TargetScalerResult scaleResult;
private readonly DurabilityProvider durabilityProvider;
private readonly ILogger logger;
private readonly string functionId;
private readonly string scaler;

public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
public DurableTaskTargetScaler(
string scalerId,
DurableTaskMetricsProvider metricsProvider,
DurabilityProvider durabilityProvider,
ILogger logger)
{
this.functionId = functionId;
this.scaler = scalerId;
this.metricsProvider = metricsProvider;
this.scaleResult = new TargetScalerResult();
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(this.scaler);
this.durabilityProvider = durabilityProvider;
this.logger = logger;
}
Expand Down Expand Up @@ -68,7 +72,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
var scaleControllerLog = $"Target worker count for '{this.scaler}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
Expand All @@ -85,7 +89,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
var errorLog = $"Error: target worker count for '{this.scaler}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
}
}
Expand Down
Loading