Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge dev into main for v2.13.0 release #2670

Merged
merged 8 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
pr:
- main
- dev
- feature/*

jobs:

- job: FunctionsV1Tests
Expand Down
15 changes: 14 additions & 1 deletion release_notes.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
# Release Notes

## Microsoft.Azure.Functions.Worker.Extensions.DurableTask <version>

### New Features

### Bug Fixes

### Breaking Changes

### Dependency Updates

## Microsoft.Azure.WebJobs.Extensions.DurableTask <version>

### New Features

### Bug Fixes

- Fix support for distributed tracing v2 in dotnet-isolated and Java (https://github.com/Azure/azure-functions-durable-extension/pull/2634)
- Fix issue where json token input (not a json object) was unwrapped before sending to an out-of-proc worker. This could then lead to deserialization issues as the wrapping quotes were missing. (Applies to dotnet-isolated and java only)
- Fix failed orchestration/entities not showing up as function invocation failures. (Applies to dotnet-isolated and java only)

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
Expand All @@ -19,6 +20,7 @@
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using AzureStorage = DurableTask.AzureStorage;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -54,8 +56,6 @@ public AzureStorageDurabilityProvider(
this.logger = logger;
}

public override bool SupportsEntities => true;

public override bool CheckStatusBeforeRaiseEvent => true;

/// <summary>
Expand Down Expand Up @@ -98,6 +98,29 @@ public async override Task<IList<OrchestrationState>> GetAllOrchestrationStatesW

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
EntityBackendQueries entityBackendQueries = (this.serviceClient as IEntityOrchestrationService)?.EntityBackendQueries;

if (entityBackendQueries != null) // entity queries are natively supported
{
var entity = await entityBackendQueries.GetEntityAsync(new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), cancellation: default);

if (entity == null)
{
return null;
}
else
{
return entity.Value.SerializedState;
}
}
else // fall back to old implementation
{
return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings);
}
}

private async Task<string> LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
IList<OrchestrationState> stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactor
private readonly AzureStorageOptions azureStorageOptions;
private readonly INameResolver nameResolver;
private readonly ILoggerFactory loggerFactory;
private readonly bool useSeparateQueueForEntityWorkItems;
private readonly bool inConsumption; // If true, optimize defaults for consumption
private AzureStorageDurabilityProvider defaultStorageProvider;

Expand Down Expand Up @@ -56,6 +57,7 @@ public AzureStorageDurabilityProviderFactory(
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000;

if (this.inConsumption)
Expand All @@ -71,9 +73,18 @@ public AzureStorageDurabilityProviderFactory(
}
}

WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
{
this.useSeparateQueueForEntityWorkItems = true;
}

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
this.options.MaxConcurrentEntityFunctions = this.options.MaxConcurrentEntityFunctions ?? maxConcurrentEntitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;

// Override the configuration defaults with user-provided values in host.json, if any.
Expand Down Expand Up @@ -188,6 +199,7 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
WorkItemQueueVisibilityTimeout = this.azureStorageOptions.WorkItemQueueVisibilityTimeout,
MaxConcurrentTaskOrchestrationWorkItems = this.options.MaxConcurrentOrchestratorFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskEntityWorkItems = this.options.MaxConcurrentEntityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentEntityFunctions)} needs a default value"),
ExtendedSessionsEnabled = this.options.ExtendedSessionsEnabled,
ExtendedSessionIdleTimeout = extendedSessionTimeout,
MaxQueuePollingInterval = this.azureStorageOptions.MaxQueuePollingInterval,
Expand All @@ -202,6 +214,9 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
LoggerFactory = this.loggerFactory,
UseLegacyPartitionManagement = this.azureStorageOptions.UseLegacyPartitionManagement,
UseTablePartitionManagement = this.azureStorageOptions.UseTablePartitionManagement,
UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems,
EntityMessageReorderWindowInMinutes = this.options.EntityMessageReorderWindowInMinutes,
MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize,
};

if (this.inConsumption)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -57,6 +58,8 @@ public EntityTriggerAttributeBindingProvider(

private class EntityTriggerBinding : ITriggerBinding
{
private static readonly IReadOnlyDictionary<string, object?> EmptyBindingData = new Dictionary<string, object?>(capacity: 0);

private readonly DurableTaskExtension config;
private readonly ParameterInfo parameterInfo;
private readonly FunctionName entityName;
Expand All @@ -75,7 +78,10 @@ public EntityTriggerBinding(
this.BindingDataContract = GetBindingDataContract(parameterInfo);
}

public Type TriggerValueType => typeof(IDurableEntityContext);
// Out-of-proc V2 uses a different trigger value type
public Type TriggerValueType => this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough ?
typeof(RemoteEntityContext) :
typeof(IDurableEntityContext);

public IReadOnlyDictionary<string, Type> BindingDataContract { get; }

Expand All @@ -95,31 +101,52 @@ private static IReadOnlyDictionary<string, Type> GetBindingDataContract(Paramete

public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
var entityContext = (DurableEntityContext)value;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
if (value is DurableEntityContext entityContext)
{
convertedValue = entityContext;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
{
convertedValue = entityContext;
#if !FUNCTIONS_V1
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
#endif
}
else if (destinationType == typeof(string))
{
convertedValue = EntityContextToString(entityContext);
}

var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);
var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);

var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;
var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;

var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#if FUNCTIONS_V3_OR_GREATER
else if (value is RemoteEntityContext remoteContext)
{
// Generate a byte array which is the serialized protobuf payload
// https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization
var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest();

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
string encodedRequest = ProtobufUtils.Base64Encode(entityBatchRequest);
var contextValueProvider = new ObjectValueProvider(encodedRequest, typeof(string));
var triggerData = new TriggerData(contextValueProvider, EmptyBindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#endif
else
{
throw new ArgumentException($"Don't know how to bind to {value?.GetType().Name ?? "null"}.", nameof(value));
}
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
InstanceId = remoteContext.InstanceId,
PastEvents = { remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) },
NewEvents = { remoteContext.NewEvents.Select(ProtobufUtils.ToHistoryEventProto) },
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
};

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal class DurableActivityContext : IDurableActivityContext,
private readonly string instanceId;
private readonly MessagePayloadDataConverter messageDataConverter;
private readonly bool inputsAreArrays;
private readonly bool rawInput;

private JToken parsedJsonInput;
private string serializedOutput;
Expand All @@ -35,6 +36,9 @@ internal DurableActivityContext(DurableTaskExtension config, string instanceId,
this.inputsAreArrays =
config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.OrchestratorShim ||
config.PlatformInformationService.GetWorkerRuntimeType() == WorkerRuntimeType.DotNetIsolated;

// Do not manipulate JSON input when using middleware passthrough.
this.rawInput = config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough;
}

/// <inheritdoc />
Expand Down Expand Up @@ -116,8 +120,7 @@ internal object GetInput(Type destinationType)
return null;
}

var value = jToken as JValue;
if (value != null)
if (!this.rawInput && jToken is JValue value)
{
return value.ToObject(destinationType);
}
Expand All @@ -129,7 +132,7 @@ internal object GetInput(Type destinationType)
// MessagePayloadDataConverter to throw an exception. This is a workaround for that case. All other
// inputs with destination System.String (in-proc: JSON and not JSON; out-of-proc: not-JSON) inputs with
// destination System.String should cast to JValues and be handled above.)
if (destinationType.Equals(typeof(string)))
if (this.rawInput)
{
return serializedValue;
}
Expand Down
Loading
Loading